Merge r1406327 through r1406414 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2802@1406415 13f79535-47bb-0310-9956-ffa450edef68
commit 7160c01f61
Author: Tsz-wo Sze
Date:   2012-11-07 01:03:16 +00:00

6 changed files with 67 additions and 15 deletions

View File

@@ -368,6 +368,8 @@ Release 2.0.3-alpha - Unreleased
     HDFS-4059. Add number of stale DataNodes to metrics. (Jing Zhao via suresh)

+    HDFS-4155. libhdfs implementation of hsync API (Liang Xie via todd)
+
   IMPROVEMENTS

     HDFS-3925. Prettify PipelineAck#toString() for printing to a log
@@ -556,6 +558,9 @@ Release 2.0.3-alpha - Unreleased
     HDFS-1331. dfs -test should work like /bin/test (Andy Isaacson via daryn)

+    HDFS-3979. For hsync, datanode should wait for the local sync to complete
+    before sending ack. (Lars Hofhansl via szetszwo)
+
 Release 2.0.2-alpha - 2012-09-07

   INCOMPATIBLE CHANGES

View File

@@ -319,9 +319,6 @@ public void close() throws IOException {
    * @throws IOException
    */
   void flushOrSync(boolean isSync) throws IOException {
-    if (isSync && (out != null || checksumOut != null)) {
-      datanode.metrics.incrFsyncCount();
-    }
     long flushTotalNanos = 0;
     if (checksumOut != null) {
       long flushStartNanos = System.nanoTime();
@@ -347,6 +344,9 @@ void flushOrSync(boolean isSync) throws IOException {
     }
     if (checksumOut != null || out != null) {
       datanode.metrics.addFlushNanos(flushTotalNanos);
+      if (isSync) {
+        datanode.metrics.incrFsyncCount();
+      }
     }
   }
@@ -438,8 +438,10 @@ private int receivePacket() throws IOException {
     int len = header.getDataLen();
     boolean syncBlock = header.getSyncBlock();

-    // make sure the block gets sync'ed upon close
-    this.syncOnClose |= syncBlock && lastPacketInBlock;
+    // avoid double sync'ing on close
+    if (syncBlock && lastPacketInBlock) {
+      this.syncOnClose = false;
+    }

     // update received bytes
     long firstByteInBlock = offsetInBlock;
@@ -448,9 +450,9 @@ private int receivePacket() throws IOException {
       replicaInfo.setNumBytes(offsetInBlock);
     }

-    // put in queue for pending acks
-    if (responder != null) {
-      ((PacketResponder)responder.getRunnable()).enqueue(seqno,
+    // put in queue for pending acks, unless sync was requested
+    if (responder != null && !syncBlock) {
+      ((PacketResponder) responder.getRunnable()).enqueue(seqno,
           lastPacketInBlock, offsetInBlock);
     }
@@ -471,8 +473,8 @@ private int receivePacket() throws IOException {
       if(LOG.isDebugEnabled()) {
         LOG.debug("Receiving an empty packet or the end of the block " + block);
       }
-      // flush unless close() would flush anyway
-      if (syncBlock && !lastPacketInBlock) {
+      // sync block if requested
+      if (syncBlock) {
         flushOrSync(true);
       }
     } else {
@@ -563,8 +565,8 @@ private int receivePacket() throws IOException {
             checksumBuf.arrayOffset() + checksumBuf.position(),
             checksumLen);
       }
-      /// flush entire packet, sync unless close() will sync
-      flushOrSync(syncBlock && !lastPacketInBlock);
+      /// flush entire packet, sync if requested
+      flushOrSync(syncBlock);

       replicaInfo.setLastChecksumAndDataLen(
         offsetInBlock, lastChunkChecksum
@@ -580,6 +582,13 @@ private int receivePacket() throws IOException {
       }
     }

+    // if sync was requested, put in queue for pending acks here
+    // (after the fsync finished)
+    if (responder != null && syncBlock) {
+      ((PacketResponder) responder.getRunnable()).enqueue(seqno,
+          lastPacketInBlock, offsetInBlock);
+    }
+
     if (throttler != null) { // throttle I/O
       throttler.throttle(len);
     }
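
Taken together, the receivePacket() hunks above change an ordering guarantee: when the client requests a sync, the packet is no longer enqueued for its ack before the data is written out; flushOrSync(true) must finish first, so a downstream ack now implies the bytes have reached the local disk (HDFS-3979). Below is a minimal sketch of this sync-before-ack ordering in plain C, using POSIX write/fsync rather than Hadoop's pipeline classes; receive_packet and ack_packet are hypothetical stand-ins, not HDFS code.

#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

/* Hypothetical stand-in for PacketResponder#enqueue: called only once
 * the data is durable, mirroring the reordering in this commit. */
static void ack_packet(long seqno)
{
    printf("ack %ld\n", seqno);
}

/* Receive one packet: write it, sync if requested, then ack. Before
 * this change, the ack could be queued before the sync completed. */
static int receive_packet(int fd, const char *buf, size_t len,
                          long seqno, int sync_requested)
{
    if (write(fd, buf, len) != (ssize_t)len)
        return -1;
    if (sync_requested && fsync(fd) != 0)   /* wait for the local sync */
        return -1;
    ack_packet(seqno);                      /* ack only after durability */
    return 0;
}

int main(void)
{
    int fd = open("/tmp/sync-before-ack-demo", O_WRONLY | O_CREAT, 0644);
    if (fd < 0) { perror("open"); return 1; }
    const char *data = "packet payload\n";
    if (receive_packet(fd, data, strlen(data), 1L, 1) != 0)
        perror("receive_packet");
    close(fd);
    return 0;
}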

View File

@@ -1388,6 +1388,32 @@ int hdfsHFlush(hdfsFS fs, hdfsFile f)
     return 0;
 }

+int hdfsHSync(hdfsFS fs, hdfsFile f)
+{
+    //Get the JNIEnv* corresponding to current thread
+    JNIEnv* env = getJNIEnv();
+    if (env == NULL) {
+        errno = EINTERNAL;
+        return -1;
+    }
+
+    //Sanity check
+    if (!f || f->type != OUTPUT) {
+        errno = EBADF;
+        return -1;
+    }
+
+    jobject jOutputStream = f->file;
+    jthrowable jthr = invokeMethod(env, NULL, INSTANCE, jOutputStream,
+                     HADOOP_OSTRM, "hsync", "()V");
+    if (jthr) {
+        errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+            "hdfsHSync: FSDataOutputStream#hsync");
+        return -1;
+    }
+    return 0;
+}
+
 int hdfsAvailable(hdfsFS fs, hdfsFile f)
 {
     // JAVA EQUIVALENT
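
For reference, a client of the new call might look like the sketch below. This is illustrative only: the file path is a placeholder, and hdfsConnect("default", 0) simply picks up whatever fs.defaultFS is set to in the loaded configuration.

#include "hdfs.h"     /* libhdfs public API */
#include <fcntl.h>    /* O_WRONLY, O_CREAT */
#include <stdio.h>
#include <string.h>

int main(void)
{
    hdfsFS fs = hdfsConnect("default", 0);
    if (!fs) {
        fprintf(stderr, "hdfsConnect failed\n");
        return 1;
    }

    /* Placeholder path for the demo. */
    hdfsFile f = hdfsOpenFile(fs, "/tmp/hsync-demo", O_WRONLY | O_CREAT,
                              0, 0, 0);
    if (!f) {
        fprintf(stderr, "hdfsOpenFile failed\n");
        return 1;
    }

    const char *msg = "must survive a power failure\n";
    if (hdfsWrite(fs, f, msg, (tSize)strlen(msg)) < 0) {
        perror("hdfsWrite");
        return 1;
    }

    /* With the BlockReceiver change above, this returns only after the
       datanodes have acked, i.e. after their local fsync completed. */
    if (hdfsHSync(fs, f) != 0) {
        perror("hdfsHSync");
        return 1;
    }

    hdfsCloseFile(fs, f);
    hdfsDisconnect(fs);
    return 0;
}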

View File

@@ -393,6 +393,17 @@ extern "C" {
     int hdfsHFlush(hdfsFS fs, hdfsFile file);

+    /**
+     * hdfsHSync - Similar to posix fsync, flush out the data in the
+     * client's user buffer all the way to the disk device (but the disk
+     * may have it in its cache).
+     * @param fs configured filesystem handle
+     * @param file file handle
+     * @return 0 on success, -1 on error and sets errno
+     */
+    int hdfsHSync(hdfsFS fs, hdfsFile file);
+
     /**
      * hdfsAvailable - Number of bytes that can be read from this
      * input stream without blocking.

View File

@@ -150,6 +150,7 @@ static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs)
         return EIO;
     }
     EXPECT_ZERO(hdfsFlush(fs, file));
+    EXPECT_ZERO(hdfsHSync(fs, file));
     EXPECT_ZERO(hdfsCloseFile(fs, file));

     /* Let's re-open the file for reading */

View File

@@ -15,7 +15,7 @@
  * the License.
  */

-package org.apache.hadoop.hdfs.server.namenode;
+package org.apache.hadoop.hdfs.server.datanode;

 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;