HDFS-5746. Add ShortCircuitSharedMemorySegment (cmccabe)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1563362 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Colin McCabe 2014-02-01 02:25:33 +00:00
parent 6e8c1bf200
commit 140246bad8
16 changed files with 1846 additions and 136 deletions

View File

@ -425,9 +425,9 @@ checkJavadocWarnings () {
echo ""
echo "There appear to be $javadocWarnings javadoc warnings generated by the patched build."
#There are 12 warnings that are caused by things that are caused by using sun internal APIs.
#There are 14 warnings that are caused by things that are caused by using sun internal APIs.
#There are 2 warnings that are caused by the Apache DS Dn class used in MiniKdc.
OK_JAVADOC_WARNINGS=14;
OK_JAVADOC_WARNINGS=16;
### if current warnings greater than OK_JAVADOC_WARNINGS
if [[ $javadocWarnings -ne $OK_JAVADOC_WARNINGS ]] ; then
JIRA_COMMENT="$JIRA_COMMENT

View File

@ -543,6 +543,7 @@
<javahClassName>org.apache.hadoop.io.compress.bzip2.Bzip2Decompressor</javahClassName>
<javahClassName>org.apache.hadoop.security.JniBasedUnixGroupsMapping</javahClassName>
<javahClassName>org.apache.hadoop.io.nativeio.NativeIO</javahClassName>
<javahClassName>org.apache.hadoop.io.nativeio.SharedFileDescriptorFactory</javahClassName>
<javahClassName>org.apache.hadoop.security.JniBasedUnixGroupsNetgroupMapping</javahClassName>
<javahClassName>org.apache.hadoop.io.compress.snappy.SnappyCompressor</javahClassName>
<javahClassName>org.apache.hadoop.io.compress.snappy.SnappyDecompressor</javahClassName>
@ -550,6 +551,7 @@
<javahClassName>org.apache.hadoop.io.compress.lz4.Lz4Decompressor</javahClassName>
<javahClassName>org.apache.hadoop.util.NativeCrc32</javahClassName>
<javahClassName>org.apache.hadoop.net.unix.DomainSocket</javahClassName>
<javahClassName>org.apache.hadoop.net.unix.DomainSocketWatcher</javahClassName>
</javahClassNames>
<javahOutputDirectory>${project.build.directory}/native/javah</javahOutputDirectory>
</configuration>

View File

@ -178,7 +178,9 @@ add_dual_library(hadoop
${D}/io/nativeio/NativeIO.c
${D}/io/nativeio/errno_enum.c
${D}/io/nativeio/file_descriptor.c
${D}/io/nativeio/SharedFileDescriptorFactory.c
${D}/net/unix/DomainSocket.c
${D}/net/unix/DomainSocketWatcher.c
${D}/security/JniBasedUnixGroupsMapping.c
${D}/security/JniBasedUnixGroupsNetgroupMapping.c
${D}/security/hadoop_group_info.c

View File

@ -487,6 +487,16 @@ public CachedName(String name, long timestamp) {
new ConcurrentHashMap<Integer, CachedName>();
private enum IdCache { USER, GROUP }
public final static int MMAP_PROT_READ = 0x1;
public final static int MMAP_PROT_WRITE = 0x2;
public final static int MMAP_PROT_EXEC = 0x4;
public static native long mmap(FileDescriptor fd, int prot,
boolean shared, long length) throws IOException;
public static native void munmap(long addr, long length)
throws IOException;
}
private static boolean workaroundNonThreadSafePasswdCalls = false;

View File

@ -0,0 +1,90 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.nativeio;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.FileDescriptor;
import org.apache.commons.lang.SystemUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import com.google.common.base.Preconditions;
/**
* A factory for creating shared file descriptors inside a given directory.
* Typically, the directory will be /dev/shm or /tmp.
*
* We will hand out file descriptors that correspond to unlinked files residing
* in that directory. These file descriptors are suitable for sharing across
* multiple processes and are both readable and writable.
*
* Because we unlink the temporary files right after creating them, a JVM crash
* usually does not leave behind any temporary files in the directory. However,
* it may happen that we crash right after creating the file and before
* unlinking it. In the constructor, we attempt to clean up after any such
* remnants by trying to unlink any temporary files created by previous
* SharedFileDescriptorFactory instances that also used our prefix.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class SharedFileDescriptorFactory {
private final String prefix;
private final String path;
/**
* Create a SharedFileDescriptorFactory.
*
* @param prefix Prefix to add to all file names we use.
* @param path Path to use.
*/
public SharedFileDescriptorFactory(String prefix, String path)
throws IOException {
Preconditions.checkArgument(NativeIO.isAvailable());
Preconditions.checkArgument(SystemUtils.IS_OS_UNIX);
this.prefix = prefix;
this.path = path;
deleteStaleTemporaryFiles0(prefix, path);
}
/**
* Create a shared file descriptor which will be both readable and writable.
*
* @param length The starting file length.
*
* @return The file descriptor, wrapped in a FileInputStream.
* @throws IOException If there was an I/O or configuration error creating
* the descriptor.
*/
public FileInputStream createDescriptor(int length) throws IOException {
return new FileInputStream(createDescriptor0(prefix, path, length));
}
/**
* Delete temporary files in the directory, NOT following symlinks.
*/
private static native void deleteStaleTemporaryFiles0(String prefix,
String path) throws IOException;
/**
* Create a file with O_EXCL, and then resize it to the desired size.
*/
private static native FileDescriptor createDescriptor0(String prefix,
String path, int length) throws IOException;
}

View File

@ -24,17 +24,15 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.SocketException;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ReadableByteChannel;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang.SystemUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.hadoop.util.CloseableReferenceCount;
import com.google.common.annotations.VisibleForTesting;
@ -132,104 +130,14 @@ public static String getEffectivePath(String path, int port) {
}
/**
* Tracks the reference count of the file descriptor, and also whether it is
* open or closed.
* The socket reference count and closed bit.
*/
private static class Status {
/**
* Bit mask representing a closed domain socket.
*/
private static final int STATUS_CLOSED_MASK = 1 << 30;
/**
* Status bits
*
* Bit 30: 0 = DomainSocket open, 1 = DomainSocket closed
* Bits 29 to 0: the reference count.
*/
private final AtomicInteger bits = new AtomicInteger(0);
Status() { }
/**
* Increment the reference count of the underlying file descriptor.
*
* @throws ClosedChannelException If the file descriptor is closed.
*/
void reference() throws ClosedChannelException {
int curBits = bits.incrementAndGet();
if ((curBits & STATUS_CLOSED_MASK) != 0) {
bits.decrementAndGet();
throw new ClosedChannelException();
}
}
/**
* Decrement the reference count of the underlying file descriptor.
*
* @param checkClosed Whether to throw an exception if the file
* descriptor is closed.
*
* @throws AsynchronousCloseException If the file descriptor is closed and
* checkClosed is set.
*/
void unreference(boolean checkClosed) throws AsynchronousCloseException {
int newCount = bits.decrementAndGet();
assert (newCount & ~STATUS_CLOSED_MASK) >= 0;
if (checkClosed && ((newCount & STATUS_CLOSED_MASK) != 0)) {
throw new AsynchronousCloseException();
}
}
/**
* Return true if the file descriptor is currently open.
*
* @return True if the file descriptor is currently open.
*/
boolean isOpen() {
return ((bits.get() & STATUS_CLOSED_MASK) == 0);
}
/**
* Mark the file descriptor as closed.
*
* Once the file descriptor is closed, it cannot be reopened.
*
* @return The current reference count.
* @throws ClosedChannelException If someone else closes the file
* descriptor before we do.
*/
int setClosed() throws ClosedChannelException {
while (true) {
int curBits = bits.get();
if ((curBits & STATUS_CLOSED_MASK) != 0) {
throw new ClosedChannelException();
}
if (bits.compareAndSet(curBits, curBits | STATUS_CLOSED_MASK)) {
return curBits & (~STATUS_CLOSED_MASK);
}
}
}
/**
* Get the current reference count.
*
* @return The current reference count.
*/
int getReferenceCount() {
return bits.get() & (~STATUS_CLOSED_MASK);
}
}
/**
* The socket status.
*/
private final Status status;
final CloseableReferenceCount refCount;
/**
* The file descriptor associated with this UNIX domain socket.
*/
private final int fd;
final int fd;
/**
* The path associated with this UNIX domain socket.
@ -252,13 +160,21 @@ int getReferenceCount() {
private final DomainChannel channel = new DomainChannel();
private DomainSocket(String path, int fd) {
this.status = new Status();
this.refCount = new CloseableReferenceCount();
this.fd = fd;
this.path = path;
}
private static native int bind0(String path) throws IOException;
private void unreference(boolean checkClosed) throws ClosedChannelException {
if (checkClosed) {
refCount.unreferenceCheckClosed();
} else {
refCount.unreference();
}
}
/**
* Create a new DomainSocket listening on the given path.
*
@ -308,14 +224,14 @@ public static DomainSocket[] socketpair() throws IOException {
* @throws SocketTimeoutException If the accept timed out.
*/
public DomainSocket accept() throws IOException {
status.reference();
refCount.reference();
boolean exc = true;
try {
DomainSocket ret = new DomainSocket(path, accept0(fd));
exc = false;
return ret;
} finally {
status.unreference(exc);
unreference(exc);
}
}
@ -335,14 +251,14 @@ public static DomainSocket connect(String path) throws IOException {
return new DomainSocket(path, fd);
}
/**
* Return true if the file descriptor is currently open.
*
* @return True if the file descriptor is currently open.
*/
public boolean isOpen() {
return status.isOpen();
}
/**
* Return true if the file descriptor is currently open.
*
* @return True if the file descriptor is currently open.
*/
public boolean isOpen() {
return refCount.isOpen();
}
/**
* @return The socket path.
@ -381,20 +297,20 @@ private static native void setAttribute0(int fd, int type, int val)
throws IOException;
public void setAttribute(int type, int size) throws IOException {
status.reference();
refCount.reference();
boolean exc = true;
try {
setAttribute0(fd, type, size);
exc = false;
} finally {
status.unreference(exc);
unreference(exc);
}
}
private native int getAttribute0(int fd, int type) throws IOException;
public int getAttribute(int type) throws IOException {
status.reference();
refCount.reference();
int attribute;
boolean exc = true;
try {
@ -402,7 +318,7 @@ public int getAttribute(int type) throws IOException {
exc = false;
return attribute;
} finally {
status.unreference(exc);
unreference(exc);
}
}
@ -419,9 +335,9 @@ private static native void closeFileDescriptor0(FileDescriptor fd)
@Override
public void close() throws IOException {
// Set the closed bit on this DomainSocket
int refCount;
int count;
try {
refCount = status.setClosed();
count = refCount.setClosed();
} catch (ClosedChannelException e) {
// Someone else already closed the DomainSocket.
return;
@ -429,7 +345,7 @@ public void close() throws IOException {
// Wait for all references to go away
boolean didShutdown = false;
boolean interrupted = false;
while (refCount > 0) {
while (count > 0) {
if (!didShutdown) {
try {
// Calling shutdown on the socket will interrupt blocking system
@ -446,7 +362,7 @@ public void close() throws IOException {
} catch (InterruptedException e) {
interrupted = true;
}
refCount = status.getReferenceCount();
count = refCount.getReferenceCount();
}
// At this point, nobody has a reference to the file descriptor,
@ -478,13 +394,13 @@ private native static void sendFileDescriptors0(int fd,
*/
public void sendFileDescriptors(FileDescriptor descriptors[],
byte jbuf[], int offset, int length) throws IOException {
status.reference();
refCount.reference();
boolean exc = true;
try {
sendFileDescriptors0(fd, descriptors, jbuf, offset, length);
exc = false;
} finally {
status.unreference(exc);
unreference(exc);
}
}
@ -515,14 +431,14 @@ private static native int receiveFileDescriptors0(int fd,
*/
public int receiveFileDescriptors(FileDescriptor[] descriptors,
byte jbuf[], int offset, int length) throws IOException {
status.reference();
refCount.reference();
boolean exc = true;
try {
int nBytes = receiveFileDescriptors0(fd, descriptors, jbuf, offset, length);
exc = false;
return nBytes;
} finally {
status.unreference(exc);
unreference(exc);
}
}
@ -539,7 +455,7 @@ public int recvFileInputStreams(FileInputStream[] streams, byte buf[],
for (int i = 0; i < streams.length; i++) {
streams[i] = null;
}
status.reference();
refCount.reference();
try {
int ret = receiveFileDescriptors0(fd, descriptors, buf, offset, length);
for (int i = 0, j = 0; i < descriptors.length; i++) {
@ -569,7 +485,7 @@ public int recvFileInputStreams(FileInputStream[] streams, byte buf[],
}
}
}
status.unreference(!success);
unreference(!success);
}
}
@ -593,7 +509,7 @@ private native static int readByteBufferDirect0(int fd, ByteBuffer dst,
public class DomainInputStream extends InputStream {
@Override
public int read() throws IOException {
status.reference();
refCount.reference();
boolean exc = true;
try {
byte b[] = new byte[1];
@ -601,33 +517,33 @@ public int read() throws IOException {
exc = false;
return (ret >= 0) ? b[0] : -1;
} finally {
status.unreference(exc);
unreference(exc);
}
}
@Override
public int read(byte b[], int off, int len) throws IOException {
status.reference();
refCount.reference();
boolean exc = true;
try {
int nRead = DomainSocket.readArray0(DomainSocket.this.fd, b, off, len);
exc = false;
return nRead;
} finally {
status.unreference(exc);
unreference(exc);
}
}
@Override
public int available() throws IOException {
status.reference();
refCount.reference();
boolean exc = true;
try {
int nAvailable = DomainSocket.available0(DomainSocket.this.fd);
exc = false;
return nAvailable;
} finally {
status.unreference(exc);
unreference(exc);
}
}
@ -649,7 +565,7 @@ public void close() throws IOException {
@Override
public void write(int val) throws IOException {
status.reference();
refCount.reference();
boolean exc = true;
try {
byte b[] = new byte[1];
@ -657,19 +573,19 @@ public void write(int val) throws IOException {
DomainSocket.writeArray0(DomainSocket.this.fd, b, 0, 1);
exc = false;
} finally {
status.unreference(exc);
unreference(exc);
}
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
status.reference();
boolean exc = true;
refCount.reference();
boolean exc = true;
try {
DomainSocket.writeArray0(DomainSocket.this.fd, b, off, len);
exc = false;
} finally {
status.unreference(exc);
unreference(exc);
}
}
}
@ -688,7 +604,7 @@ public void close() throws IOException {
@Override
public int read(ByteBuffer dst) throws IOException {
status.reference();
refCount.reference();
boolean exc = true;
try {
int nread = 0;
@ -710,7 +626,7 @@ public int read(ByteBuffer dst) throws IOException {
exc = false;
return nread;
} finally {
status.unreference(exc);
unreference(exc);
}
}
}

View File

@ -0,0 +1,478 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.net.unix;
import java.io.Closeable;
import java.io.EOFException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.IOUtils;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.TreeMap;
import java.util.Map;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang.SystemUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.NativeCodeLoader;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Uninterruptibles;
/**
* The DomainSocketWatcher watches a set of domain sockets to see when they
* become readable, or closed. When one of those events happens, it makes a
* callback.
*
* See {@link DomainSocket} for more information about UNIX domain sockets.
*/
@InterfaceAudience.LimitedPrivate("HDFS")
public final class DomainSocketWatcher extends Thread implements Closeable {
static {
if (SystemUtils.IS_OS_WINDOWS) {
loadingFailureReason = "UNIX Domain sockets are not available on Windows.";
} else if (!NativeCodeLoader.isNativeCodeLoaded()) {
loadingFailureReason = "libhadoop cannot be loaded.";
} else {
String problem;
try {
anchorNative();
problem = null;
} catch (Throwable t) {
problem = "DomainSocketWatcher#anchorNative got error: " +
t.getMessage();
}
loadingFailureReason = problem;
}
}
static Log LOG = LogFactory.getLog(DomainSocketWatcher.class);
/**
* The reason why DomainSocketWatcher is not available, or null if it is
* available.
*/
private final static String loadingFailureReason;
/**
* Initializes the native library code.
*/
private static native void anchorNative();
interface Handler {
/**
* Handles an event on a socket. An event may be the socket becoming
* readable, or the remote end being closed.
*
* @param sock The socket that the event occurred on.
* @return Whether we should close the socket.
*/
boolean handle(DomainSocket sock);
}
/**
* Handler for {DomainSocketWatcher#notificationSockets[1]}
*/
private class NotificationHandler implements Handler {
public boolean handle(DomainSocket sock) {
try {
if (LOG.isTraceEnabled()) {
LOG.trace(this + ": NotificationHandler: doing a read on " +
sock.fd);
}
if (sock.getInputStream().read() == -1) {
if (LOG.isTraceEnabled()) {
LOG.trace(this + ": NotificationHandler: got EOF on " + sock.fd);
}
throw new EOFException();
}
if (LOG.isTraceEnabled()) {
LOG.trace(this + ": NotificationHandler: read succeeded on " +
sock.fd);
}
return false;
} catch (IOException e) {
if (LOG.isTraceEnabled()) {
LOG.trace(this + ": NotificationHandler: setting closed to " +
"true for " + sock.fd);
}
closed = true;
return true;
}
}
}
private static class Entry {
final DomainSocket socket;
final Handler handler;
Entry(DomainSocket socket, Handler handler) {
this.socket = socket;
this.handler = handler;
}
DomainSocket getDomainSocket() {
return socket;
}
Handler getHandler() {
return handler;
}
}
/**
* The FdSet is a set of file descriptors that gets passed to poll(2).
* It contains a native memory segment, so that we don't have to copy
* in the poll0 function.
*/
private static class FdSet {
private long data;
private native static long alloc0();
FdSet() {
data = alloc0();
}
/**
* Add a file descriptor to the set.
*
* @param fd The file descriptor to add.
*/
native void add(int fd);
/**
* Remove a file descriptor from the set.
*
* @param fd The file descriptor to remove.
*/
native void remove(int fd);
/**
* Get an array containing all the FDs marked as readable.
* Also clear the state of all FDs.
*
* @return An array containing all of the currently readable file
* descriptors.
*/
native int[] getAndClearReadableFds();
/**
* Close the object and de-allocate the memory used.
*/
native void close();
}
/**
* Lock which protects toAdd, toRemove, and closed.
*/
private final ReentrantLock lock = new ReentrantLock();
/**
* Condition variable which indicates that toAdd and toRemove have been
* processed.
*/
private final Condition processedCond = lock.newCondition();
/**
* Entries to add.
*/
private final LinkedList<Entry> toAdd =
new LinkedList<Entry>();
/**
* Entries to remove.
*/
private final TreeMap<Integer, DomainSocket> toRemove =
new TreeMap<Integer, DomainSocket>();
/**
* Maximum length of time to go between checking whether the interrupted
* bit has been set for this thread.
*/
private final int interruptCheckPeriodMs;
/**
* A pair of sockets used to wake up the thread after it has called poll(2).
*/
private final DomainSocket notificationSockets[];
/**
* Whether or not this DomainSocketWatcher is closed.
*/
private boolean closed = false;
public DomainSocketWatcher(int interruptCheckPeriodMs) throws IOException {
if (loadingFailureReason != null) {
throw new UnsupportedOperationException(loadingFailureReason);
}
notificationSockets = DomainSocket.socketpair();
this.interruptCheckPeriodMs = interruptCheckPeriodMs;
Preconditions.checkArgument(interruptCheckPeriodMs > 0);
watcherThread.start();
}
/**
* Close the DomainSocketWatcher and wait for its thread to terminate.
*
* If there is more than one close, all but the first will be ignored.
*/
@Override
public void close() throws IOException {
try {
lock.lock();
if (closed) return;
LOG.info(this + ": closing");
closed = true;
} finally {
lock.unlock();
}
// Close notificationSockets[0], so that notificationSockets[1] gets an EOF
// event. This will wake up the thread immediately if it is blocked inside
// the select() system call.
notificationSockets[0].close();
// Wait for the select thread to terminate.
Uninterruptibles.joinUninterruptibly(watcherThread);
}
/**
* Add a socket.
*
* @param sock The socket to add. It is an error to re-add a socket that
* we are already watching.
* @param handler The handler to associate with this socket. This may be
* called any time after this function is called.
*/
public void add(DomainSocket sock, Handler handler) {
try {
lock.lock();
checkNotClosed();
Entry entry = new Entry(sock, handler);
try {
sock.refCount.reference();
} catch (ClosedChannelException e) {
Preconditions.checkArgument(false,
"tried to add a closed DomainSocket to " + this);
}
toAdd.add(entry);
kick();
while (true) {
try {
processedCond.await();
} catch (InterruptedException e) {
this.interrupt();
}
if (!toAdd.contains(entry)) {
break;
}
checkNotClosed();
}
} finally {
lock.unlock();
}
}
/**
* Remove a socket. Its handler will be called.
*
* @param sock The socket to remove.
*/
public void remove(DomainSocket sock) {
try {
lock.lock();
checkNotClosed();
toRemove.put(sock.fd, sock);
kick();
while (true) {
try {
processedCond.await();
} catch (InterruptedException e) {
this.interrupt();
}
if (!toRemove.containsKey(sock.fd)) {
break;
}
checkNotClosed();
}
} finally {
lock.unlock();
}
}
/**
* Wake up the DomainSocketWatcher thread.
*/
private void kick() {
try {
notificationSockets[0].getOutputStream().write(0);
} catch (IOException e) {
LOG.error(this + ": error writing to notificationSockets[0]", e);
}
}
/**
* Check that the DomainSocketWatcher is not closed.
* Must be called while holding the lock.
*/
private void checkNotClosed() {
Preconditions.checkState(lock.isHeldByCurrentThread());
if (closed) {
throw new RuntimeException("DomainSocketWatcher is closed.");
}
}
private void sendCallback(String caller, TreeMap<Integer, Entry> entries,
FdSet fdSet, int fd) {
if (LOG.isTraceEnabled()) {
LOG.trace(this + ": " + caller + " starting sendCallback for fd " + fd);
}
Entry entry = entries.get(fd);
Preconditions.checkNotNull(entry,
this + ": fdSet contained " + fd + ", which we were " +
"not tracking.");
DomainSocket sock = entry.getDomainSocket();
if (entry.getHandler().handle(sock)) {
if (LOG.isTraceEnabled()) {
LOG.trace(this + ": " + caller + ": closing fd " + fd +
" at the request of the handler.");
}
if (toRemove.remove(fd) != null) {
if (LOG.isTraceEnabled()) {
LOG.trace(this + ": " + caller + " : sendCallback processed fd " +
fd + " in toRemove.");
}
}
try {
sock.refCount.unreferenceCheckClosed();
} catch (IOException e) {
Preconditions.checkArgument(false,
this + ": file descriptor " + sock.fd + " was closed while " +
"still in the poll(2) loop.");
}
IOUtils.cleanup(LOG, sock);
entries.remove(fd);
fdSet.remove(fd);
} else {
if (LOG.isTraceEnabled()) {
LOG.trace(this + ": " + caller + ": sendCallback not " +
"closing fd " + fd);
}
}
}
private final Thread watcherThread = new Thread(new Runnable() {
@Override
public void run() {
LOG.info(this + ": starting with interruptCheckPeriodMs = " +
interruptCheckPeriodMs);
final TreeMap<Integer, Entry> entries = new TreeMap<Integer, Entry>();
FdSet fdSet = new FdSet();
addNotificationSocket(entries, fdSet);
try {
while (true) {
lock.lock();
try {
for (int fd : fdSet.getAndClearReadableFds()) {
sendCallback("getAndClearReadableFds", entries, fdSet, fd);
}
if (!(toAdd.isEmpty() && toRemove.isEmpty())) {
// Handle pending additions (before pending removes).
for (Iterator<Entry> iter = toAdd.iterator(); iter.hasNext(); ) {
Entry entry = iter.next();
DomainSocket sock = entry.getDomainSocket();
Entry prevEntry = entries.put(sock.fd, entry);
Preconditions.checkState(prevEntry == null,
this + ": tried to watch a file descriptor that we " +
"were already watching: " + sock);
if (LOG.isTraceEnabled()) {
LOG.trace(this + ": adding fd " + sock.fd);
}
fdSet.add(sock.fd);
iter.remove();
}
// Handle pending removals
while (true) {
Map.Entry<Integer, DomainSocket> entry = toRemove.firstEntry();
if (entry == null) break;
sendCallback("handlePendingRemovals",
entries, fdSet, entry.getValue().fd);
}
processedCond.signalAll();
}
// Check if the thread should terminate. Doing this check now is
// easier than at the beginning of the loop, since we know toAdd and
// toRemove are now empty and processedCond has been notified if it
// needed to be.
if (closed) {
LOG.info(toString() + " thread terminating.");
return;
}
// Check if someone sent our thread an InterruptedException while we
// were waiting in poll().
if (Thread.interrupted()) {
throw new InterruptedException();
}
} finally {
lock.unlock();
}
doPoll0(interruptCheckPeriodMs, fdSet);
}
} catch (InterruptedException e) {
LOG.info(toString() + " terminating on InterruptedException");
} catch (IOException e) {
LOG.error(toString() + " terminating on IOException", e);
} finally {
for (Entry entry : entries.values()) {
sendCallback("close", entries, fdSet, entry.getDomainSocket().fd);
}
entries.clear();
fdSet.close();
}
}
});
private void addNotificationSocket(final TreeMap<Integer, Entry> entries,
FdSet fdSet) {
entries.put(notificationSockets[1].fd,
new Entry(notificationSockets[1], new NotificationHandler()));
try {
notificationSockets[1].refCount.reference();
} catch (IOException e) {
throw new RuntimeException(e);
}
fdSet.add(notificationSockets[1].fd);
if (LOG.isTraceEnabled()) {
LOG.trace(this + ": adding notificationSocket " +
notificationSockets[1].fd + ", connected to " +
notificationSockets[0].fd);
}
}
public String toString() {
return "DomainSocketWatcher(" + System.identityHashCode(this) + ")";
}
private static native int doPoll0(int maxWaitMs, FdSet readFds)
throws IOException;
}

View File

@ -0,0 +1,125 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.base.Preconditions;
/**
* A closeable object that maintains a reference count.
*
* Once the object is closed, attempting to take a new reference will throw
* ClosedChannelException.
*/
public class CloseableReferenceCount {
/**
* Bit mask representing a closed domain socket.
*/
private static final int STATUS_CLOSED_MASK = 1 << 30;
/**
* The status bits.
*
* Bit 30: 0 = open, 1 = closed.
* Bits 29 to 0: the reference count.
*/
private final AtomicInteger status = new AtomicInteger(0);
public CloseableReferenceCount() { }
/**
* Increment the reference count.
*
* @throws ClosedChannelException If the status is closed.
*/
public void reference() throws ClosedChannelException {
int curBits = status.incrementAndGet();
if ((curBits & STATUS_CLOSED_MASK) != 0) {
status.decrementAndGet();
throw new ClosedChannelException();
}
}
/**
* Decrement the reference count.
*
* @return True if the object is closed and has no outstanding
* references.
*/
public boolean unreference() {
int newVal = status.decrementAndGet();
Preconditions.checkState(newVal != 0xffffffff,
"called unreference when the reference count was already at 0.");
return newVal == STATUS_CLOSED_MASK;
}
/**
* Decrement the reference count, checking to make sure that the
* CloseableReferenceCount is not closed.
*
* @throws AsynchronousCloseException If the status is closed.
*/
public void unreferenceCheckClosed() throws ClosedChannelException {
int newVal = status.decrementAndGet();
if ((newVal & STATUS_CLOSED_MASK) != 0) {
throw new AsynchronousCloseException();
}
}
/**
* Return true if the status is currently open.
*
* @return True if the status is currently open.
*/
public boolean isOpen() {
return ((status.get() & STATUS_CLOSED_MASK) == 0);
}
/**
* Mark the status as closed.
*
* Once the status is closed, it cannot be reopened.
*
* @return The current reference count.
* @throws ClosedChannelException If someone else closes the object
* before we do.
*/
public int setClosed() throws ClosedChannelException {
while (true) {
int curBits = status.get();
if ((curBits & STATUS_CLOSED_MASK) != 0) {
throw new ClosedChannelException();
}
if (status.compareAndSet(curBits, curBits | STATUS_CLOSED_MASK)) {
return curBits & (~STATUS_CLOSED_MASK);
}
}
}
/**
* Get the current reference count.
*
* @return The current reference count.
*/
public int getReferenceCount() {
return status.get() & (~STATUS_CLOSED_MASK);
}
}

View File

@ -18,6 +18,7 @@
#include "org_apache_hadoop.h"
#include "org_apache_hadoop_io_nativeio_NativeIO.h"
#include "org_apache_hadoop_io_nativeio_NativeIO_POSIX.h"
#ifdef UNIX
#include <assert.h>
@ -49,6 +50,10 @@
#include "file_descriptor.h"
#include "errno_enum.h"
#define MMAP_PROT_READ org_apache_hadoop_io_nativeio_NativeIO_POSIX_MMAP_PROT_READ
#define MMAP_PROT_WRITE org_apache_hadoop_io_nativeio_NativeIO_POSIX_MMAP_PROT_WRITE
#define MMAP_PROT_EXEC org_apache_hadoop_io_nativeio_NativeIO_POSIX_MMAP_PROT_EXEC
// the NativeIO$POSIX$Stat inner class and its constructor
static jclass stat_clazz;
static jmethodID stat_ctor;
@ -661,6 +666,39 @@ cleanup:
#endif
}
JNIEXPORT jlong JNICALL
Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_mmap(
JNIEnv *env, jclass clazz, jobject jfd, jint jprot,
jboolean jshared, jlong length)
{
void *addr = 0;
int prot, flags, fd;
prot = ((jprot & MMAP_PROT_READ) ? PROT_READ : 0) |
((jprot & MMAP_PROT_WRITE) ? PROT_WRITE : 0) |
((jprot & MMAP_PROT_EXEC) ? PROT_EXEC : 0);
flags = (jshared == JNI_TRUE) ? MAP_SHARED : MAP_PRIVATE;
fd = fd_get(env, jfd);
addr = mmap(NULL, length, prot, flags, fd, 0);
if (addr == MAP_FAILED) {
throw_ioe(env, errno);
}
return (jlong)(intptr_t)addr;
}
JNIEXPORT void JNICALL
Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_munmap(
JNIEnv *env, jclass clazz, jlong jaddr, jlong length)
{
void *addr;
addr = (void*)(intptr_t)jaddr;
if (munmap(addr, length) < 0) {
throw_ioe(env, errno);
}
}
/*
* static native String getGroupName(int gid);
*

View File

@ -0,0 +1,162 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "org_apache_hadoop.h"
#ifdef UNIX
#include "exception.h"
#include "file_descriptor.h"
#include "org_apache_hadoop.h"
#include "org_apache_hadoop_io_nativeio_SharedFileDescriptorFactory.h"
#include <dirent.h>
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
static pthread_mutex_t g_rand_lock = PTHREAD_MUTEX_INITIALIZER;
JNIEXPORT void JNICALL
Java_org_apache_hadoop_io_nativeio_SharedFileDescriptorFactory_deleteStaleTemporaryFiles0(
JNIEnv *env, jclass clazz, jstring jprefix, jstring jpath)
{
const char *prefix = NULL, *path = NULL;
char target[PATH_MAX];
jthrowable jthr;
DIR *dp = NULL;
struct dirent *de;
prefix = (*env)->GetStringUTFChars(env, jprefix, NULL);
if (!prefix) goto done; // exception raised
path = (*env)->GetStringUTFChars(env, jpath, NULL);
if (!path) goto done; // exception raised
dp = opendir(path);
if (!dp) {
int ret = errno;
jthr = newIOException(env, "opendir(%s) error %d: %s",
path, ret, terror(ret));
(*env)->Throw(env, jthr);
goto done;
}
while ((de = readdir(dp))) {
if (strncmp(prefix, de->d_name, strlen(prefix)) == 0) {
int ret = snprintf(target, PATH_MAX, "%s/%s", path, de->d_name);
if ((0 < ret) && (ret < PATH_MAX)) {
unlink(target);
}
}
}
done:
if (dp) {
closedir(dp);
}
if (prefix) {
(*env)->ReleaseStringUTFChars(env, jprefix, prefix);
}
if (path) {
(*env)->ReleaseStringUTFChars(env, jpath, path);
}
}
JNIEXPORT jobject JNICALL
Java_org_apache_hadoop_io_nativeio_SharedFileDescriptorFactory_createDescriptor0(
JNIEnv *env, jclass clazz, jstring jprefix, jstring jpath, jint length)
{
const char *prefix = NULL, *path = NULL;
char target[PATH_MAX];
int ret, fd = -1, rnd;
jthrowable jthr;
jobject jret = NULL;
prefix = (*env)->GetStringUTFChars(env, jprefix, NULL);
if (!prefix) goto done; // exception raised
path = (*env)->GetStringUTFChars(env, jpath, NULL);
if (!path) goto done; // exception raised
pthread_mutex_lock(&g_rand_lock);
rnd = rand();
pthread_mutex_unlock(&g_rand_lock);
while (1) {
ret = snprintf(target, PATH_MAX, "%s/%s_%d",
path, prefix, rnd);
if (ret < 0) {
jthr = newIOException(env, "snprintf error");
(*env)->Throw(env, jthr);
goto done;
} else if (ret >= PATH_MAX) {
jthr = newIOException(env, "computed path was too long.");
(*env)->Throw(env, jthr);
goto done;
}
fd = open(target, O_CREAT | O_EXCL | O_RDWR, 0700);
if (fd >= 0) break; // success
ret = errno;
if (ret == EEXIST) {
// Bad luck -- we got a very rare collision here between us and
// another DataNode (or process). Try again.
continue;
} else if (ret == EINTR) {
// Most of the time, this error is only possible when opening FIFOs.
// But let's be thorough.
continue;
}
jthr = newIOException(env, "open(%s, O_CREAT | O_EXCL | O_RDWR) "
"failed: error %d (%s)", target, ret, terror(ret));
(*env)->Throw(env, jthr);
goto done;
}
if (unlink(target) < 0) {
jthr = newIOException(env, "unlink(%s) failed: error %d (%s)",
path, ret, terror(ret));
(*env)->Throw(env, jthr);
goto done;
}
if (ftruncate(fd, length) < 0) {
jthr = newIOException(env, "ftruncate(%s, %d) failed: error %d (%s)",
path, length, ret, terror(ret));
(*env)->Throw(env, jthr);
goto done;
}
jret = fd_create(env, fd); // throws exception on error.
done:
if (prefix) {
(*env)->ReleaseStringUTFChars(env, jprefix, prefix);
}
if (path) {
(*env)->ReleaseStringUTFChars(env, jpath, path);
}
if (!jret) {
if (fd >= 0) {
close(fd);
}
}
return jret;
}
#endif

View File

@ -0,0 +1,247 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "config.h"
#include "exception.h"
#include "org_apache_hadoop.h"
#include "org_apache_hadoop_net_unix_DomainSocketWatcher.h"
#include <errno.h>
#include <fcntl.h>
#include <jni.h>
#include <poll.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
static jfieldID fd_set_data_fid;
#define FD_SET_DATA_MIN_SIZE 2
struct fd_set_data {
/**
* Number of fds we have allocated space for.
*/
int alloc_size;
/**
* Number of fds actually in use.
*/
int used_size;
/**
* Beginning of pollfd data.
*/
struct pollfd pollfd[0];
};
JNIEXPORT void JNICALL
Java_org_apache_hadoop_net_unix_DomainSocketWatcher_anchorNative(
JNIEnv *env, jclass clazz)
{
jclass fd_set_class;
fd_set_class = (*env)->FindClass(env,
"org/apache/hadoop/net/unix/DomainSocketWatcher$FdSet");
if (!fd_set_class) return; // exception raised
fd_set_data_fid = (*env)->GetFieldID(env, fd_set_class, "data", "J");
if (!fd_set_data_fid) return; // exception raised
}
JNIEXPORT jlong JNICALL
Java_org_apache_hadoop_net_unix_DomainSocketWatcher_00024FdSet_alloc0(
JNIEnv *env, jclass clazz)
{
struct fd_set_data *sd;
sd = calloc(1, sizeof(struct fd_set_data) +
(sizeof(struct pollfd) * FD_SET_DATA_MIN_SIZE));
if (!sd) {
(*env)->Throw(env, newRuntimeException(env, "out of memory allocating "
"DomainSocketWatcher#FdSet"));
return 0L;
}
sd->alloc_size = FD_SET_DATA_MIN_SIZE;
sd->used_size = 0;
return (jlong)(intptr_t)sd;
}
JNIEXPORT void JNICALL
Java_org_apache_hadoop_net_unix_DomainSocketWatcher_00024FdSet_add(
JNIEnv *env, jobject obj, jint fd)
{
struct fd_set_data *sd, *nd;
struct pollfd *pollfd;
sd = (struct fd_set_data*)(intptr_t)(*env)->
GetLongField(env, obj, fd_set_data_fid);
if (sd->used_size + 1 > sd->alloc_size) {
nd = realloc(sd, sizeof(struct fd_set_data) +
(sizeof(struct pollfd) * sd->alloc_size * 2));
if (!nd) {
(*env)->Throw(env, newRuntimeException(env, "out of memory adding "
"another fd to DomainSocketWatcher#FdSet. we have %d already",
sd->alloc_size));
return;
}
nd->alloc_size = nd->alloc_size * 2;
(*env)->SetLongField(env, obj, fd_set_data_fid, (jlong)(intptr_t)nd);
sd = nd;
}
pollfd = &sd->pollfd[sd->used_size];
sd->used_size++;
pollfd->fd = fd;
pollfd->events = POLLIN;
pollfd->revents = 0;
}
JNIEXPORT void JNICALL
Java_org_apache_hadoop_net_unix_DomainSocketWatcher_00024FdSet_remove(
JNIEnv *env, jobject obj, jint fd)
{
struct fd_set_data *sd;
struct pollfd *pollfd, *last_pollfd;
int used_size, i;
sd = (struct fd_set_data*)(intptr_t)(*env)->
GetLongField(env, obj, fd_set_data_fid);
used_size = sd->used_size;
for (i = 0; i < used_size; i++) {
pollfd = sd->pollfd + i;
if (pollfd->fd == fd) break;
}
if (i == used_size) {
(*env)->Throw(env, newRuntimeException(env, "failed to remove fd %d "
"from the FdSet because it was never present.", fd));
return;
}
last_pollfd = sd->pollfd + (used_size - 1);
if (used_size > 1) {
// Move last pollfd to the new empty slot if needed
pollfd->fd = last_pollfd->fd;
pollfd->events = last_pollfd->events;
pollfd->revents = last_pollfd->revents;
}
memset(last_pollfd, 0, sizeof(struct pollfd));
sd->used_size--;
}
JNIEXPORT jobject JNICALL
Java_org_apache_hadoop_net_unix_DomainSocketWatcher_00024FdSet_getAndClearReadableFds(
JNIEnv *env, jobject obj)
{
int *carr = NULL;
jobject jarr = NULL;
struct fd_set_data *sd;
int used_size, num_readable = 0, i, j;
jthrowable jthr = NULL;
sd = (struct fd_set_data*)(intptr_t)(*env)->
GetLongField(env, obj, fd_set_data_fid);
used_size = sd->used_size;
for (i = 0; i < used_size; i++) {
if (sd->pollfd[i].revents & POLLIN) {
num_readable++;
} else {
sd->pollfd[i].revents = 0;
}
}
if (num_readable > 0) {
carr = malloc(sizeof(int) * num_readable);
if (!carr) {
jthr = newRuntimeException(env, "failed to allocate a temporary array "
"of %d ints", num_readable);
goto done;
}
j = 0;
for (i = 0; ((i < used_size) && (j < num_readable)); i++) {
if (sd->pollfd[i].revents & POLLIN) {
carr[j] = sd->pollfd[i].fd;
j++;
sd->pollfd[i].revents = 0;
}
}
if (j != num_readable) {
jthr = newRuntimeException(env, "failed to fill entire carr "
"array of size %d: only filled %d elements", num_readable, j);
goto done;
}
}
jarr = (*env)->NewIntArray(env, num_readable);
if (!jarr) {
jthr = (*env)->ExceptionOccurred(env);
(*env)->ExceptionClear(env);
goto done;
}
if (num_readable > 0) {
(*env)->SetIntArrayRegion(env, jarr, 0, num_readable, carr);
jthr = (*env)->ExceptionOccurred(env);
if (jthr) {
(*env)->ExceptionClear(env);
goto done;
}
}
done:
free(carr);
if (jthr) {
(*env)->DeleteLocalRef(env, jarr);
jarr = NULL;
}
return jarr;
}
JNIEXPORT void JNICALL
Java_org_apache_hadoop_net_unix_DomainSocketWatcher_00024FdSet_close(
JNIEnv *env, jobject obj)
{
struct fd_set_data *sd;
sd = (struct fd_set_data*)(intptr_t)(*env)->
GetLongField(env, obj, fd_set_data_fid);
if (sd) {
free(sd);
(*env)->SetLongField(env, obj, fd_set_data_fid, 0L);
}
}
JNIEXPORT jint JNICALL
Java_org_apache_hadoop_net_unix_DomainSocketWatcher_doPoll0(
JNIEnv *env, jclass clazz, jint checkMs, jobject fdSet)
{
struct fd_set_data *sd;
int ret, err;
sd = (struct fd_set_data*)(intptr_t)(*env)->
GetLongField(env, fdSet, fd_set_data_fid);
ret = poll(sd->pollfd, sd->used_size, checkMs);
if (ret >= 0) {
return ret;
}
err = errno;
if (err != EINTR) { // treat EINTR as 0 fds ready
(*env)->Throw(env, newIOException(env,
"poll(2) failed with error code %d: %s", err, terror(err)));
}
return 0;
}

View File

@ -0,0 +1,82 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.nativeio;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import org.apache.commons.lang.SystemUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
public class TestSharedFileDescriptorFactory {
static final Log LOG = LogFactory.getLog(TestSharedFileDescriptorFactory.class);
private static final File TEST_BASE =
new File(System.getProperty("test.build.data", "/tmp"));
@Test(timeout=10000)
public void testReadAndWrite() throws Exception {
Assume.assumeTrue(NativeIO.isAvailable());
Assume.assumeTrue(SystemUtils.IS_OS_UNIX);
File path = new File(TEST_BASE, "testReadAndWrite");
path.mkdirs();
SharedFileDescriptorFactory factory =
new SharedFileDescriptorFactory("woot_", path.getAbsolutePath());
FileInputStream inStream = factory.createDescriptor(4096);
FileOutputStream outStream = new FileOutputStream(inStream.getFD());
outStream.write(101);
inStream.getChannel().position(0);
Assert.assertEquals(101, inStream.read());
inStream.close();
outStream.close();
FileUtil.fullyDelete(path);
}
static private void createTempFile(String path) throws Exception {
FileOutputStream fos = new FileOutputStream(path);
fos.write(101);
fos.close();
}
@Test(timeout=10000)
public void testCleanupRemainders() throws Exception {
Assume.assumeTrue(NativeIO.isAvailable());
Assume.assumeTrue(SystemUtils.IS_OS_UNIX);
File path = new File(TEST_BASE, "testCleanupRemainders");
path.mkdirs();
String remainder1 = path.getAbsolutePath() +
Path.SEPARATOR + "woot2_remainder1";
String remainder2 = path.getAbsolutePath() +
Path.SEPARATOR + "woot2_remainder2";
createTempFile(remainder1);
createTempFile(remainder2);
new SharedFileDescriptorFactory("woot2_", path.getAbsolutePath());
// creating the SharedFileDescriptorFactory should have removed
// the remainders
Assert.assertFalse(new File(remainder1).exists());
Assert.assertFalse(new File(remainder2).exists());
FileUtil.fullyDelete(path);
}
}

View File

@ -0,0 +1,150 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.net.unix;
import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import com.google.common.util.concurrent.Uninterruptibles;
public class TestDomainSocketWatcher {
static final Log LOG = LogFactory.getLog(TestDomainSocketWatcher.class);
@Before
public void before() {
Assume.assumeTrue(DomainSocket.getLoadingFailureReason() == null);
}
/**
* Test that we can create a DomainSocketWatcher and then shut it down.
*/
@Test(timeout=60000)
public void testCreateShutdown() throws Exception {
DomainSocketWatcher watcher = new DomainSocketWatcher(10000000);
watcher.close();
}
/**
* Test that we can get notifications out a DomainSocketWatcher.
*/
@Test(timeout=180000)
public void testDeliverNotifications() throws Exception {
DomainSocketWatcher watcher = new DomainSocketWatcher(10000000);
DomainSocket pair[] = DomainSocket.socketpair();
final CountDownLatch latch = new CountDownLatch(1);
watcher.add(pair[1], new DomainSocketWatcher.Handler() {
@Override
public boolean handle(DomainSocket sock) {
latch.countDown();
return true;
}
});
pair[0].close();
latch.await();
watcher.close();
}
/**
* Test that a java interruption can stop the watcher thread
*/
@Test(timeout=60000)
public void testInterruption() throws Exception {
DomainSocketWatcher watcher = new DomainSocketWatcher(10);
watcher.interrupt();
Uninterruptibles.joinUninterruptibly(watcher);
}
@Test(timeout=300000)
public void testStress() throws Exception {
final int SOCKET_NUM = 250;
final ReentrantLock lock = new ReentrantLock();
final DomainSocketWatcher watcher = new DomainSocketWatcher(10000000);
final ArrayList<DomainSocket[]> pairs = new ArrayList<DomainSocket[]>();
final AtomicInteger handled = new AtomicInteger(0);
final Thread adderThread = new Thread(new Runnable() {
@Override
public void run() {
try {
for (int i = 0; i < SOCKET_NUM; i++) {
DomainSocket pair[] = DomainSocket.socketpair();
watcher.add(pair[1], new DomainSocketWatcher.Handler() {
@Override
public boolean handle(DomainSocket sock) {
handled.incrementAndGet();
return true;
}
});
lock.lock();
try {
pairs.add(pair);
} finally {
lock.unlock();
}
}
} catch (Throwable e) {
LOG.error(e);
throw new RuntimeException(e);
}
}
});
final Thread removerThread = new Thread(new Runnable() {
@Override
public void run() {
final Random random = new Random();
try {
while (handled.get() != SOCKET_NUM) {
lock.lock();
try {
if (!pairs.isEmpty()) {
int idx = random.nextInt(pairs.size());
DomainSocket pair[] = pairs.remove(idx);
if (random.nextBoolean()) {
pair[0].close();
} else {
watcher.remove(pair[1]);
}
}
} finally {
lock.unlock();
}
}
} catch (Throwable e) {
LOG.error(e);
throw new RuntimeException(e);
}
}
});
adderThread.start();
removerThread.start();
Uninterruptibles.joinUninterruptibly(adderThread);
Uninterruptibles.joinUninterruptibly(removerThread);
watcher.close();
}
}

View File

@ -306,6 +306,8 @@ Release 2.4.0 - UNRELEASED
HDFS-5859. DataNode#checkBlockToken should check block tokens even if
security is not enabled. (cmccabe)
HDFS-5746. Add ShortCircuitSharedMemorySegment (cmccabe)
OPTIMIZATIONS
HDFS-5790. LeaseManager.findPath is very slow when many leases need recovery

View File

@ -0,0 +1,302 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.client;
import java.io.Closeable;
import java.io.FileInputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.NativeIO.POSIX;
import org.apache.hadoop.util.CloseableReferenceCount;
import org.apache.hadoop.util.Shell;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
import sun.misc.Unsafe;
public class ShortCircuitSharedMemorySegment implements Closeable {
private static final Log LOG =
LogFactory.getLog(ShortCircuitSharedMemorySegment.class);
private static final int BYTES_PER_SLOT = 64;
private static final Unsafe unsafe;
static {
Unsafe theUnsafe = null;
try {
Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
theUnsafe = (Unsafe)f.get(null);
} catch (Throwable e) {
LOG.error("failed to load misc.Unsafe", e);
}
unsafe = theUnsafe;
}
/**
* A slot containing information about a replica.
*
* The format is:
* word 0
* bit 0:32 Slot flags (see below).
* bit 33:63 Anchor count.
* word 1:7
* Reserved for future use, such as statistics.
* Padding is also useful for avoiding false sharing.
*
* Little-endian versus big-endian is not relevant here since both the client
* and the server reside on the same computer and use the same orientation.
*/
public class Slot implements Closeable {
/**
* Flag indicating that the slot is in use.
*/
private static final long SLOT_IN_USE_FLAG = 1L<<63;
/**
* Flag indicating that the slot can be anchored.
*/
private static final long ANCHORABLE_FLAG = 1L<<62;
private long slotAddress;
Slot(long slotAddress) {
this.slotAddress = slotAddress;
}
/**
* Make a given slot anchorable.
*/
public void makeAnchorable() {
Preconditions.checkState(slotAddress != 0,
"Called makeAnchorable on a slot that was closed.");
long prev;
do {
prev = unsafe.getLongVolatile(null, this.slotAddress);
if ((prev & ANCHORABLE_FLAG) != 0) {
return;
}
} while (!unsafe.compareAndSwapLong(null, this.slotAddress,
prev, prev | ANCHORABLE_FLAG));
}
/**
* Make a given slot unanchorable.
*/
public void makeUnanchorable() {
Preconditions.checkState(slotAddress != 0,
"Called makeUnanchorable on a slot that was closed.");
long prev;
do {
prev = unsafe.getLongVolatile(null, this.slotAddress);
if ((prev & ANCHORABLE_FLAG) == 0) {
return;
}
} while (!unsafe.compareAndSwapLong(null, this.slotAddress,
prev, prev & (~ANCHORABLE_FLAG)));
}
/**
* Try to add an anchor for a given slot.
*
* When a slot is anchored, we know that the block it refers to is resident
* in memory.
*
* @return True if the slot is anchored.
*/
public boolean addAnchor() {
long prev;
do {
prev = unsafe.getLongVolatile(null, this.slotAddress);
if ((prev & 0x7fffffff) == 0x7fffffff) {
// Too many other threads have anchored the slot (2 billion?)
return false;
}
if ((prev & ANCHORABLE_FLAG) == 0) {
// Slot can't be anchored right now.
return false;
}
} while (!unsafe.compareAndSwapLong(null, this.slotAddress,
prev, prev + 1));
return true;
}
/**
* Remove an anchor for a given slot.
*/
public void removeAnchor() {
long prev;
do {
prev = unsafe.getLongVolatile(null, this.slotAddress);
Preconditions.checkState((prev & 0x7fffffff) != 0,
"Tried to remove anchor for slot " + slotAddress +", which was " +
"not anchored.");
} while (!unsafe.compareAndSwapLong(null, this.slotAddress,
prev, prev - 1));
}
/**
* @return The index of this slot.
*/
public int getIndex() {
Preconditions.checkState(slotAddress != 0);
return Ints.checkedCast(
(slotAddress - baseAddress) / BYTES_PER_SLOT);
}
@Override
public void close() throws IOException {
if (slotAddress == 0) return;
long prev;
do {
prev = unsafe.getLongVolatile(null, this.slotAddress);
Preconditions.checkState((prev & SLOT_IN_USE_FLAG) != 0,
"tried to close slot that wasn't open");
} while (!unsafe.compareAndSwapLong(null, this.slotAddress,
prev, 0));
slotAddress = 0;
if (ShortCircuitSharedMemorySegment.this.refCount.unreference()) {
ShortCircuitSharedMemorySegment.this.free();
}
}
}
/**
* The stream that we're going to use to create this shared memory segment.
*
* Although this is a FileInputStream, we are going to assume that the
* underlying file descriptor is writable as well as readable.
* It would be more appropriate to use a RandomAccessFile here, but that class
* does not have any public accessor which returns a FileDescriptor, unlike
* FileInputStream.
*/
private final FileInputStream stream;
/**
* Length of the shared memory segment.
*/
private final int length;
/**
* The base address of the memory-mapped file.
*/
private final long baseAddress;
/**
* Reference count and 'closed' status.
*/
private final CloseableReferenceCount refCount = new CloseableReferenceCount();
public ShortCircuitSharedMemorySegment(FileInputStream stream)
throws IOException {
if (!NativeIO.isAvailable()) {
throw new UnsupportedOperationException("NativeIO is not available.");
}
if (Shell.WINDOWS) {
throw new UnsupportedOperationException(
"ShortCircuitSharedMemorySegment is not yet implemented " +
"for Windows.");
}
if (unsafe == null) {
throw new UnsupportedOperationException(
"can't use ShortCircuitSharedMemorySegment because we failed to " +
"load misc.Unsafe.");
}
this.refCount.reference();
this.stream = stream;
this.length = getEffectiveLength(stream);
this.baseAddress = POSIX.mmap(this.stream.getFD(),
POSIX.MMAP_PROT_READ | POSIX.MMAP_PROT_WRITE, true, this.length);
}
/**
* Calculate the effective usable size of the shared memory segment.
* We round down to a multiple of the slot size and do some validation.
*
* @param stream The stream we're using.
* @return The effective usable size of the shared memory segment.
*/
private static int getEffectiveLength(FileInputStream stream)
throws IOException {
int intSize = Ints.checkedCast(stream.getChannel().size());
int slots = intSize / BYTES_PER_SLOT;
Preconditions.checkState(slots > 0, "size of shared memory segment was " +
intSize + ", but that is not enough to hold even one slot.");
return slots * BYTES_PER_SLOT;
}
private boolean allocateSlot(long address) {
long prev;
do {
prev = unsafe.getLongVolatile(null, address);
if ((prev & Slot.SLOT_IN_USE_FLAG) != 0) {
return false;
}
} while (!unsafe.compareAndSwapLong(null, address,
prev, prev | Slot.SLOT_IN_USE_FLAG));
return true;
}
/**
* Allocate a new Slot in this shared memory segment.
*
* @return A newly allocated Slot, or null if there were no available
* slots.
*/
public Slot allocateNextSlot() throws IOException {
ShortCircuitSharedMemorySegment.this.refCount.reference();
Slot slot = null;
try {
final int numSlots = length / BYTES_PER_SLOT;
for (int i = 0; i < numSlots; i++) {
long address = this.baseAddress + (i * BYTES_PER_SLOT);
if (allocateSlot(address)) {
slot = new Slot(address);
break;
}
}
} finally {
if (slot == null) {
if (refCount.unreference()) {
free();
}
}
}
return slot;
}
@Override
public void close() throws IOException {
refCount.setClosed();
if (refCount.unreference()) {
free();
}
}
void free() throws IOException {
IOUtils.cleanup(LOG, stream);
POSIX.munmap(baseAddress, length);
}
}

View File

@ -0,0 +1,104 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.client;
import java.io.File;
import java.io.FileInputStream;
import java.util.ArrayList;
import org.apache.commons.lang.SystemUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.SharedFileDescriptorFactory;
import org.apache.hadoop.hdfs.client.ShortCircuitSharedMemorySegment.Slot;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.junit.Assert;
public class TestShortCircuitSharedMemorySegment {
public static final Log LOG =
LogFactory.getLog(TestShortCircuitSharedMemorySegment.class);
private static final File TEST_BASE =
new File(System.getProperty("test.build.data", "/tmp"));
@Before
public void before() {
Assume.assumeTrue(NativeIO.isAvailable());
Assume.assumeTrue(SystemUtils.IS_OS_UNIX);
}
@Test(timeout=60000)
public void testStartupShutdown() throws Exception {
File path = new File(TEST_BASE, "testStartupShutdown");
path.mkdirs();
SharedFileDescriptorFactory factory =
new SharedFileDescriptorFactory("shm_", path.getAbsolutePath());
FileInputStream stream = factory.createDescriptor(4096);
ShortCircuitSharedMemorySegment shm =
new ShortCircuitSharedMemorySegment(stream);
shm.close();
stream.close();
FileUtil.fullyDelete(path);
}
@Test(timeout=60000)
public void testAllocateSlots() throws Exception {
File path = new File(TEST_BASE, "testAllocateSlots");
path.mkdirs();
SharedFileDescriptorFactory factory =
new SharedFileDescriptorFactory("shm_", path.getAbsolutePath());
FileInputStream stream = factory.createDescriptor(4096);
ShortCircuitSharedMemorySegment shm =
new ShortCircuitSharedMemorySegment(stream);
int numSlots = 0;
ArrayList<Slot> slots = new ArrayList<Slot>();
while (true) {
Slot slot = shm.allocateNextSlot();
if (slot == null) {
LOG.info("allocated " + numSlots + " slots before running out.");
break;
}
slots.add(slot);
numSlots++;
}
int slotIdx = 0;
for (Slot slot : slots) {
Assert.assertFalse(slot.addAnchor());
Assert.assertEquals(slotIdx++, slot.getIndex());
}
for (Slot slot : slots) {
slot.makeAnchorable();
}
for (Slot slot : slots) {
Assert.assertTrue(slot.addAnchor());
}
for (Slot slot : slots) {
slot.removeAnchor();
}
shm.close();
for (Slot slot : slots) {
slot.close();
}
stream.close();
FileUtil.fullyDelete(path);
}
}