HDFS-347: style cleanups. Contributed by Colin Patrick McCabe.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-347@1446830 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Aaron Myers 2013-02-16 00:59:01 +00:00
parent 27e158362c
commit 3a417cbf1d
7 changed files with 195 additions and 141 deletions

View File

@ -51,7 +51,7 @@ public class DomainSocket implements Closeable {
} else if (!NativeCodeLoader.isNativeCodeLoaded()) {
loadingFailureReason = "libhadoop cannot be loaded.";
} else {
String problem = "DomainSocket#anchorNative got error: unknown";
String problem;
try {
anchorNative();
problem = null;
@ -132,17 +132,99 @@ public static String getEffectivePath(String path, int port) {
}
/**
* Status bits
*
* Bit 30: 0 = DomainSocket open, 1 = DomainSocket closed
* Bits 29 to 0: the reference count.
* Tracks the reference count of the file descriptor, and also whether it is
* open or closed.
*/
private final AtomicInteger status;
private static class Status {
/**
* Bit mask representing a closed domain socket.
*/
private static final int STATUS_CLOSED_MASK = 1 << 30;
/**
* Status bits
*
* Bit 30: 0 = DomainSocket open, 1 = DomainSocket closed
* Bits 29 to 0: the reference count.
*/
private final AtomicInteger bits = new AtomicInteger(0);
Status() { }
/**
* Increment the reference count of the underlying file descriptor.
*
* @throws ClosedChannelException If the file descriptor is closed.
*/
void reference() throws ClosedChannelException {
int curBits = bits.incrementAndGet();
if ((curBits & STATUS_CLOSED_MASK) != 0) {
bits.decrementAndGet();
throw new ClosedChannelException();
}
}
/**
* Decrement the reference count of the underlying file descriptor.
*
* @param checkClosed Whether to throw an exception if the file
* descriptor is closed.
*
* @throws AsynchronousCloseException If the file descriptor is closed and
* checkClosed is set.
*/
void unreference(boolean checkClosed) throws AsynchronousCloseException {
int newCount = bits.decrementAndGet();
assert (newCount & ~STATUS_CLOSED_MASK) >= 0;
if (checkClosed && ((newCount & STATUS_CLOSED_MASK) != 0)) {
throw new AsynchronousCloseException();
}
}
/**
* Return true if the file descriptor is currently open.
*
* @return True if the file descriptor is currently open.
*/
boolean isOpen() {
return ((bits.get() & STATUS_CLOSED_MASK) == 0);
}
/**
* Mark the file descriptor as closed.
*
* Once the file descriptor is closed, it cannot be reopened.
*
* @return The current reference count.
* @throws ClosedChannelException If someone else closes the file
* descriptor before we do.
*/
int setClosed() throws ClosedChannelException {
while (true) {
int curBits = bits.get();
if ((curBits & STATUS_CLOSED_MASK) != 0) {
throw new ClosedChannelException();
}
if (bits.compareAndSet(curBits, curBits | STATUS_CLOSED_MASK)) {
return curBits & (~STATUS_CLOSED_MASK);
}
}
}
/**
* Get the current reference count.
*
* @return The current reference count.
*/
int getReferenceCount() {
return bits.get() & (~STATUS_CLOSED_MASK);
}
}
/**
* Bit mask representing a closed domain socket.
* The socket status.
*/
private static final int STATUS_CLOSED_MASK = 1 << 30;
private final Status status;
/**
* The file descriptor associated with this UNIX domain socket.
@ -170,7 +252,7 @@ public static String getEffectivePath(String path, int port) {
private final DomainChannel channel = new DomainChannel();
private DomainSocket(String path, int fd) {
this.status = new AtomicInteger(0);
this.status = new Status();
this.fd = fd;
this.path = path;
}
@ -208,14 +290,14 @@ public static DomainSocket bindAndListen(String path) throws IOException {
* @throws SocketTimeoutException If the accept timed out.
*/
public DomainSocket accept() throws IOException {
fdRef();
status.reference();
boolean exc = true;
try {
DomainSocket ret = new DomainSocket(path, accept0(fd));
exc = false;
return ret;
} finally {
fdUnref(exc);
status.unreference(exc);
}
}
@ -235,38 +317,14 @@ public static DomainSocket connect(String path) throws IOException {
return new DomainSocket(path, fd);
}
/**
* Increment the reference count of the underlying file descriptor.
*
* @throws SocketException If the file descriptor is closed.
*/
private void fdRef() throws ClosedChannelException {
int bits = status.incrementAndGet();
if ((bits & STATUS_CLOSED_MASK) != 0) {
status.decrementAndGet();
throw new ClosedChannelException();
}
}
/**
* Decrement the reference count of the underlying file descriptor.
*/
private void fdUnref(boolean checkClosed) throws AsynchronousCloseException {
int newCount = status.decrementAndGet();
assert (newCount & ~STATUS_CLOSED_MASK) >= 0;
if (checkClosed && ((newCount & STATUS_CLOSED_MASK) != 0)) {
throw new AsynchronousCloseException();
}
}
/**
* Return true if the file descriptor is currently open.
*
* @return True if the file descriptor is currently open.
*/
public boolean isOpen() {
return ((status.get() & STATUS_CLOSED_MASK) == 0);
}
/**
* Return true if the file descriptor is currently open.
*
* @return True if the file descriptor is currently open.
*/
public boolean isOpen() {
return status.isOpen();
}
/**
* @return The socket path.
@ -296,29 +354,29 @@ public DomainChannel getChannel() {
return channel;
}
public static final int SND_BUF_SIZE = 1;
public static final int RCV_BUF_SIZE = 2;
public static final int SND_TIMEO = 3;
public static final int RCV_TIMEO = 4;
public static final int SEND_BUFFER_SIZE = 1;
public static final int RECEIVE_BUFFER_SIZE = 2;
public static final int SEND_TIMEOUT = 3;
public static final int RECEIVE_TIMEOUT = 4;
private static native void setAttribute0(int fd, int type, int val)
throws IOException;
public void setAttribute(int type, int size) throws IOException {
fdRef();
status.reference();
boolean exc = true;
try {
setAttribute0(fd, type, size);
exc = false;
} finally {
fdUnref(exc);
status.unreference(exc);
}
}
private native int getAttribute0(int fd, int type) throws IOException;
public int getAttribute(int type) throws IOException {
fdRef();
status.reference();
int attribute;
boolean exc = true;
try {
@ -326,7 +384,7 @@ public int getAttribute(int type) throws IOException {
exc = false;
return attribute;
} finally {
fdUnref(exc);
status.unreference(exc);
}
}
@ -343,20 +401,17 @@ private static native void closeFileDescriptor0(FileDescriptor fd)
@Override
public void close() throws IOException {
// Set the closed bit on this DomainSocket
int bits;
while (true) {
bits = status.get();
if ((bits & STATUS_CLOSED_MASK) != 0) {
return; // already closed
}
if (status.compareAndSet(bits, bits | STATUS_CLOSED_MASK)) {
break;
}
int refCount;
try {
refCount = status.setClosed();
} catch (ClosedChannelException e) {
// Someone else already closed the DomainSocket.
return;
}
// Wait for all references to go away
boolean didShutdown = false;
boolean interrupted = false;
while ((bits & (~STATUS_CLOSED_MASK)) > 0) {
while (refCount > 0) {
if (!didShutdown) {
try {
// Calling shutdown on the socket will interrupt blocking system
@ -373,60 +428,57 @@ public void close() throws IOException {
} catch (InterruptedException e) {
interrupted = true;
}
bits = status.get();
refCount = status.getReferenceCount();
}
// Close the file descriptor. After this point, the file descriptor
// number will be reused by something else. Although this DomainSocket
// object continues to hold the old file descriptor number (it's a final
// field), we never use it again because we look at the closed bit and
// realize that this DomainSocket is not usable.
// At this point, nobody has a reference to the file descriptor,
// and nobody will be able to get one in the future either.
// We now call close(2) on the file descriptor.
// After this point, the file descriptor number will be reused by
// something else. Although this DomainSocket object continues to hold
// the old file descriptor number (it's a final field), we never use it
// again because this DomainSocket is closed.
close0(fd);
if (interrupted) {
Thread.currentThread().interrupt();
}
}
/*
* Clean up if the user forgets to close the socket.
*/
protected void finalize() throws IOException {
close();
}
private native static void sendFileDescriptors0(int fd, FileDescriptor jfds[],
private native static void sendFileDescriptors0(int fd,
FileDescriptor descriptors[],
byte jbuf[], int offset, int length) throws IOException;
/**
* Send some FileDescriptor objects to the process on the other side of this
* socket.
*
* @param jfds The file descriptors to send.
* @param descriptors The file descriptors to send.
* @param jbuf Some bytes to send. You must send at least
* one byte.
* @param offset The offset in the jbuf array to start at.
* @param length Length of the jbuf array to use.
*/
public void sendFileDescriptors(FileDescriptor jfds[],
public void sendFileDescriptors(FileDescriptor descriptors[],
byte jbuf[], int offset, int length) throws IOException {
fdRef();
status.reference();
boolean exc = true;
try {
sendFileDescriptors0(fd, jfds, jbuf, offset, length);
sendFileDescriptors0(fd, descriptors, jbuf, offset, length);
exc = false;
} finally {
fdUnref(exc);
status.unreference(exc);
}
}
private static native int receiveFileDescriptors0(int fd, FileDescriptor[] jfds,
private static native int receiveFileDescriptors0(int fd,
FileDescriptor[] descriptors,
byte jbuf[], int offset, int length) throws IOException;
/**
* Receive some FileDescriptor objects from the process on the other side of
* this socket.
*
* @param jfds (output parameter) Array of FileDescriptors.
* @param descriptors (output parameter) Array of FileDescriptors.
* We will fill as many slots as possible with file
* descriptors passed from the remote process. The
* other slots will contain NULL.
@ -443,16 +495,16 @@ private static native int receiveFileDescriptors0(int fd, FileDescriptor[] jfds,
* otherwise, it will be positive.
* @throws IOException if there was an I/O error.
*/
public int receiveFileDescriptors(FileDescriptor[] jfds,
public int receiveFileDescriptors(FileDescriptor[] descriptors,
byte jbuf[], int offset, int length) throws IOException {
fdRef();
status.reference();
boolean exc = true;
try {
int nBytes = receiveFileDescriptors0(fd, jfds, jbuf, offset, length);
int nBytes = receiveFileDescriptors0(fd, descriptors, jbuf, offset, length);
exc = false;
return nBytes;
} finally {
fdUnref(exc);
status.unreference(exc);
}
}
@ -462,44 +514,44 @@ public int receiveFileDescriptors(FileDescriptor[] jfds,
*
* See {@link DomainSocket#recvFileInputStreams(ByteBuffer)}
*/
public int recvFileInputStreams(FileInputStream[] fis, byte buf[],
public int recvFileInputStreams(FileInputStream[] streams, byte buf[],
int offset, int length) throws IOException {
FileDescriptor fds[] = new FileDescriptor[fis.length];
FileDescriptor descriptors[] = new FileDescriptor[streams.length];
boolean success = false;
for (int i = 0; i < fis.length; i++) {
fis[i] = null;
for (int i = 0; i < streams.length; i++) {
streams[i] = null;
}
fdRef();
status.reference();
try {
int ret = receiveFileDescriptors0(fd, fds, buf, offset, length);
for (int i = 0, j = 0; i < fds.length; i++) {
if (fds[i] != null) {
fis[j++] = new FileInputStream(fds[i]);
fds[i] = null;
int ret = receiveFileDescriptors0(fd, descriptors, buf, offset, length);
for (int i = 0, j = 0; i < descriptors.length; i++) {
if (descriptors[i] != null) {
streams[j++] = new FileInputStream(descriptors[i]);
descriptors[i] = null;
}
}
success = true;
return ret;
} finally {
if (!success) {
for (int i = 0; i < fds.length; i++) {
if (fds[i] != null) {
for (int i = 0; i < descriptors.length; i++) {
if (descriptors[i] != null) {
try {
closeFileDescriptor0(fds[i]);
closeFileDescriptor0(descriptors[i]);
} catch (Throwable t) {
LOG.warn(t);
}
} else if (fis[i] != null) {
} else if (streams[i] != null) {
try {
fis[i].close();
streams[i].close();
} catch (Throwable t) {
LOG.warn(t);
} finally {
fis[i] = null; }
streams[i] = null; }
}
}
}
fdUnref(!success);
status.unreference(!success);
}
}
@ -523,7 +575,7 @@ private native static int readByteBufferDirect0(int fd, ByteBuffer dst,
public class DomainInputStream extends InputStream {
@Override
public int read() throws IOException {
fdRef();
status.reference();
boolean exc = true;
try {
byte b[] = new byte[1];
@ -531,33 +583,33 @@ public int read() throws IOException {
exc = false;
return (ret >= 0) ? b[0] : -1;
} finally {
fdUnref(exc);
status.unreference(exc);
}
}
@Override
public int read(byte b[], int off, int len) throws IOException {
fdRef();
status.reference();
boolean exc = true;
try {
int nRead = DomainSocket.readArray0(DomainSocket.this.fd, b, off, len);
exc = false;
return nRead;
} finally {
fdUnref(exc);
status.unreference(exc);
}
}
@Override
public int available() throws IOException {
fdRef();
status.reference();
boolean exc = true;
try {
int nAvailable = DomainSocket.available0(DomainSocket.this.fd);
exc = false;
return nAvailable;
} finally {
fdUnref(exc);
status.unreference(exc);
}
}
@ -579,7 +631,7 @@ public void close() throws IOException {
@Override
public void write(int val) throws IOException {
fdRef();
status.reference();
boolean exc = true;
try {
byte b[] = new byte[1];
@ -587,19 +639,19 @@ public void write(int val) throws IOException {
DomainSocket.writeArray0(DomainSocket.this.fd, b, 0, 1);
exc = false;
} finally {
fdUnref(exc);
status.unreference(exc);
}
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
fdRef();
status.reference();
boolean exc = true;
try {
DomainSocket.writeArray0(DomainSocket.this.fd, b, off, len);
exc = false;
} finally {
fdUnref(exc);
status.unreference(exc);
}
}
}
@ -618,7 +670,7 @@ public void close() throws IOException {
@Override
public int read(ByteBuffer dst) throws IOException {
fdRef();
status.reference();
boolean exc = true;
try {
int nread = 0;
@ -640,7 +692,7 @@ public int read(ByteBuffer dst) throws IOException {
exc = false;
return nread;
} finally {
fdUnref(exc);
status.unreference(exc);
}
}
}

View File

@ -38,13 +38,13 @@
#include <sys/un.h>
#include <unistd.h>
#define SND_BUF_SIZE org_apache_hadoop_net_unix_DomainSocket_SND_BUF_SIZE
#define RCV_BUF_SIZE org_apache_hadoop_net_unix_DomainSocket_RCV_BUF_SIZE
#define SND_TIMEO org_apache_hadoop_net_unix_DomainSocket_SND_TIMEO
#define RCV_TIMEO org_apache_hadoop_net_unix_DomainSocket_RCV_TIMEO
#define SEND_BUFFER_SIZE org_apache_hadoop_net_unix_DomainSocket_SEND_BUFFER_SIZE
#define RECEIVE_BUFFER_SIZE org_apache_hadoop_net_unix_DomainSocket_RECEIVE_BUFFER_SIZE
#define SEND_TIMEOUT org_apache_hadoop_net_unix_DomainSocket_SEND_TIMEOUT
#define RECEIVE_TIMEOUT org_apache_hadoop_net_unix_DomainSocket_RECEIVE_TIMEOUT
#define DEFAULT_RCV_TIMEO 120000
#define DEFAULT_SND_TIMEO 120000
#define DEFAULT_RECEIVE_TIMEOUT 120000
#define DEFAULT_SEND_TIMEOUT 120000
#define LISTEN_BACKLOG 128
/**
@ -391,8 +391,8 @@ JNIEnv *env, jclass clazz, jstring path)
(*env)->Throw(env, jthr);
return -1;
}
if (((jthr = setAttribute0(env, fd, SND_TIMEO, DEFAULT_SND_TIMEO))) ||
((jthr = setAttribute0(env, fd, RCV_TIMEO, DEFAULT_RCV_TIMEO)))) {
if (((jthr = setAttribute0(env, fd, SEND_TIMEOUT, DEFAULT_SEND_TIMEOUT))) ||
((jthr = setAttribute0(env, fd, RECEIVE_TIMEOUT, DEFAULT_RECEIVE_TIMEOUT)))) {
RETRY_ON_EINTR(ret, close(fd));
(*env)->Throw(env, jthr);
return -1;
@ -412,7 +412,7 @@ static jthrowable setAttribute0(JNIEnv *env, jint fd, jint type, jint val)
int ret, buf;
switch (type) {
case SND_BUF_SIZE:
case SEND_BUFFER_SIZE:
buf = val;
if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &buf, sizeof(buf))) {
ret = errno;
@ -420,7 +420,7 @@ static jthrowable setAttribute0(JNIEnv *env, jint fd, jint type, jint val)
"setsockopt(SO_SNDBUF) error: %s", terror(ret));
}
return NULL;
case RCV_BUF_SIZE:
case RECEIVE_BUFFER_SIZE:
buf = val;
if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &buf, sizeof(buf))) {
ret = errno;
@ -428,7 +428,7 @@ static jthrowable setAttribute0(JNIEnv *env, jint fd, jint type, jint val)
"setsockopt(SO_RCVBUF) error: %s", terror(ret));
}
return NULL;
case SND_TIMEO:
case SEND_TIMEOUT:
javaMillisToTimeVal(val, &tv);
if (setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, (struct timeval *)&tv,
sizeof(tv))) {
@ -437,7 +437,7 @@ static jthrowable setAttribute0(JNIEnv *env, jint fd, jint type, jint val)
"setsockopt(SO_SNDTIMEO) error: %s", terror(ret));
}
return NULL;
case RCV_TIMEO:
case RECEIVE_TIMEOUT:
javaMillisToTimeVal(val, &tv);
if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, (struct timeval *)&tv,
sizeof(tv))) {
@ -487,7 +487,7 @@ JNIEnv *env, jclass clazz, jint fd, jint type)
int ret, rval = 0;
switch (type) {
case SND_BUF_SIZE:
case SEND_BUFFER_SIZE:
len = sizeof(rval);
if (getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &rval, &len)) {
ret = errno;
@ -496,7 +496,7 @@ JNIEnv *env, jclass clazz, jint fd, jint type)
return -1;
}
return getSockOptBufSizeToJavaBufSize(rval);
case RCV_BUF_SIZE:
case RECEIVE_BUFFER_SIZE:
len = sizeof(rval);
if (getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rval, &len)) {
ret = errno;
@ -505,7 +505,7 @@ JNIEnv *env, jclass clazz, jint fd, jint type)
return -1;
}
return getSockOptBufSizeToJavaBufSize(rval);
case SND_TIMEO:
case SEND_TIMEOUT:
memset(&tv, 0, sizeof(tv));
len = sizeof(struct timeval);
if (getsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, &len)) {
@ -515,7 +515,7 @@ JNIEnv *env, jclass clazz, jint fd, jint type)
return -1;
}
return timeValToJavaMillis(&tv);
case RCV_TIMEO:
case RECEIVE_TIMEOUT:
memset(&tv, 0, sizeof(tv));
len = sizeof(struct timeval);
if (getsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, &len)) {

View File

@ -282,15 +282,15 @@ public void testServerOptions() throws Exception {
DomainSocket serv = DomainSocket.bindAndListen(TEST_PATH);
try {
// Let's set a new receive buffer size
int bufSize = serv.getAttribute(DomainSocket.RCV_BUF_SIZE);
int bufSize = serv.getAttribute(DomainSocket.RECEIVE_BUFFER_SIZE);
int newBufSize = bufSize / 2;
serv.setAttribute(DomainSocket.RCV_BUF_SIZE, newBufSize);
int nextBufSize = serv.getAttribute(DomainSocket.RCV_BUF_SIZE);
serv.setAttribute(DomainSocket.RECEIVE_BUFFER_SIZE, newBufSize);
int nextBufSize = serv.getAttribute(DomainSocket.RECEIVE_BUFFER_SIZE);
Assert.assertEquals(newBufSize, nextBufSize);
// Let's set a server timeout
int newTimeout = 1000;
serv.setAttribute(DomainSocket.RCV_TIMEO, newTimeout);
int nextTimeout = serv.getAttribute(DomainSocket.RCV_TIMEO);
serv.setAttribute(DomainSocket.RECEIVE_TIMEOUT, newTimeout);
int nextTimeout = serv.getAttribute(DomainSocket.RECEIVE_TIMEOUT);
Assert.assertEquals(newTimeout, nextTimeout);
try {
serv.accept();

View File

@ -47,3 +47,5 @@ HDFS-4485. DN should chmod socket path a+w. (Colin Patrick McCabe via atm)
HDFS-4453. Make a simple doc to describe the usage and design of the shortcircuit read feature. (Colin Patrick McCabe via atm)
HDFS-4496. DFSClient: don't create a domain socket unless we need it (Colin Patrick McCabe via todd)
HDFS-347: style cleanups (Colin Patrick McCabe via atm)

View File

@ -114,7 +114,7 @@ DomainSocket create(InetSocketAddress addr, DFSInputStream stream) {
DomainSocket sock = null;
try {
sock = DomainSocket.connect(escapedPath);
sock.setAttribute(DomainSocket.RCV_TIMEO, conf.socketTimeout);
sock.setAttribute(DomainSocket.RECEIVE_TIMEOUT, conf.socketTimeout);
success = true;
} catch (IOException e) {
LOG.warn("error creating DomainSocket", e);

View File

@ -50,12 +50,12 @@ public ReadableByteChannel getInputStreamChannel() {
@Override
public void setReadTimeout(int timeoutMs) throws IOException {
socket.setAttribute(DomainSocket.RCV_TIMEO, timeoutMs);
socket.setAttribute(DomainSocket.RECEIVE_TIMEOUT, timeoutMs);
}
@Override
public int getReceiveBufferSize() throws IOException {
return socket.getAttribute(DomainSocket.RCV_BUF_SIZE);
return socket.getAttribute(DomainSocket.RECEIVE_BUFFER_SIZE);
}
@Override
@ -66,7 +66,7 @@ public boolean getTcpNoDelay() throws IOException {
@Override
public void setWriteTimeout(int timeoutMs) throws IOException {
socket.setAttribute(DomainSocket.SND_TIMEO, timeoutMs);
socket.setAttribute(DomainSocket.SEND_TIMEOUT, timeoutMs);
}
@Override

View File

@ -48,7 +48,7 @@ public String getBindPath() {
@Override
public void setReceiveBufferSize(int size) throws IOException {
sock.setAttribute(DomainSocket.RCV_BUF_SIZE, size);
sock.setAttribute(DomainSocket.RECEIVE_BUFFER_SIZE, size);
}
@Override