From 1e7010cf38115604d6fa3aa5728362c86644e66a Mon Sep 17 00:00:00 2001 From: Tsz-wo Sze Date: Tue, 6 Nov 2012 22:34:29 +0000 Subject: [PATCH] HDFS-3979. For hsync, datanode should wait for the local sync to complete before sending ack. Contributed by Lars Hofhansl git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1406382 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 ++ .../hdfs/server/datanode/BlockReceiver.java | 37 ++++++++++++------- 2 files changed, 26 insertions(+), 14 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 948f7defe0..b2766bd3b0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -558,6 +558,9 @@ Release 2.0.3-alpha - Unreleased HDFS-1331. dfs -test should work like /bin/test (Andy Isaacson via daryn) + HDFS-3979. For hsync, datanode should wait for the local sync to complete + before sending ack. (Lars Hofhansl via szetszwo) + Release 2.0.2-alpha - 2012-09-07 INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index 0f1ccb9435..6995fd2d4e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -319,9 +319,6 @@ public void close() throws IOException { * @throws IOException */ void flushOrSync(boolean isSync) throws IOException { - if (isSync && (out != null || checksumOut != null)) { - datanode.metrics.incrFsyncCount(); - } long flushTotalNanos = 0; if (checksumOut != null) { long flushStartNanos = System.nanoTime(); @@ -347,6 +344,9 @@ void flushOrSync(boolean isSync) throws IOException { } if (checksumOut != null || out != null) { datanode.metrics.addFlushNanos(flushTotalNanos); + if (isSync) { + datanode.metrics.incrFsyncCount(); + } } } @@ -438,8 +438,10 @@ private int receivePacket() throws IOException { int len = header.getDataLen(); boolean syncBlock = header.getSyncBlock(); - // make sure the block gets sync'ed upon close - this.syncOnClose |= syncBlock && lastPacketInBlock; + // avoid double sync'ing on close + if (syncBlock && lastPacketInBlock) { + this.syncOnClose = false; + } // update received bytes long firstByteInBlock = offsetInBlock; @@ -448,11 +450,11 @@ private int receivePacket() throws IOException { replicaInfo.setNumBytes(offsetInBlock); } - // put in queue for pending acks - if (responder != null) { - ((PacketResponder)responder.getRunnable()).enqueue(seqno, - lastPacketInBlock, offsetInBlock); - } + // put in queue for pending acks, unless sync was requested + if (responder != null && !syncBlock) { + ((PacketResponder) responder.getRunnable()).enqueue(seqno, + lastPacketInBlock, offsetInBlock); + } //First write the packet to the mirror: if (mirrorOut != null && !mirrorError) { @@ -471,8 +473,8 @@ private int receivePacket() throws IOException { if(LOG.isDebugEnabled()) { LOG.debug("Receiving an empty packet or the end of the block " + block); } - // flush unless close() would flush anyway - if (syncBlock && !lastPacketInBlock) { + // sync block if requested + if (syncBlock) { flushOrSync(true); } } else { @@ -563,8 +565,8 @@ private int receivePacket() throws IOException { checksumBuf.arrayOffset() + checksumBuf.position(), checksumLen); } - /// flush entire packet, sync unless close() will sync - flushOrSync(syncBlock && !lastPacketInBlock); + /// flush entire packet, sync if requested + flushOrSync(syncBlock); replicaInfo.setLastChecksumAndDataLen( offsetInBlock, lastChunkChecksum @@ -580,6 +582,13 @@ private int receivePacket() throws IOException { } } + // if sync was requested, put in queue for pending acks here + // (after the fsync finished) + if (responder != null && syncBlock) { + ((PacketResponder) responder.getRunnable()).enqueue(seqno, + lastPacketInBlock, offsetInBlock); + } + if (throttler != null) { // throttle I/O throttler.throttle(len); }