diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java
index 4373124caa..252f37b3dc 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java
@@ -44,6 +44,9 @@
* else append to an existing file.
*
* CREATE|OVERWRITE - to create a file if it does not exist,
* else overwrite an existing file.
+ * SYNC_BLOCK - to force closed blocks to the disk device.
+ * In addition, {@link Syncable#hsync()} should be called after each write
+ * if truly synchronous behavior is required.
*
*
* Following combination is not valid and will result in
@@ -71,7 +74,12 @@ public enum CreateFlag {
/**
* Append to a file. See javadoc for more description.
*/
- APPEND((short) 0x04);
+ APPEND((short) 0x04),
+
+ /**
+ * Force closed blocks to disk. Similar to POSIX O_SYNC. See javadoc for description.
+ */
+ SYNC_BLOCK((short) 0x08);
private final short mode;
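
Note for context: with the new flag, fully synchronous writes combine SYNC_BLOCK at create time with hsync() per write, as the CreateFlag javadoc above describes. A minimal usage sketch (not part of the patch; the path, block size and replication are illustrative, mirroring the TestHSync cases added at the end of this change):

```java
import java.io.IOException;
import java.util.EnumSet;

import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

public class SyncBlockUsageSketch {
  // Write a record durably: hsync() covers data in the current block,
  // SYNC_BLOCK covers each block as it is closed.
  static void writeDurably(FileSystem fs, byte[] record) throws IOException {
    FSDataOutputStream out = fs.create(new Path("/demo/synced-file"),
        FsPermission.getDefault(),
        EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK),
        4096, (short) 3, 128 * 1024 * 1024, null);
    out.write(record);
    out.hsync();   // force the current block's data to disk on every replica
    out.close();   // SYNC_BLOCK forces the final block to disk on close
  }
}
```
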
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
index fe908beddd..1411130533 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
@@ -830,6 +830,30 @@ public abstract FSDataOutputStream create(Path f,
long blockSize,
Progressable progress) throws IOException;
+ /**
+ * Create an FSDataOutputStream at the indicated Path with write-progress
+ * reporting.
+ * @param f the file name to open
+ * @param permission file permission
+ * @param flags {@link CreateFlag}s to use for this stream.
+ * @param bufferSize the size of the buffer to be used.
+ * @param replication required block replication for the file.
+ * @param blockSize block size of the file
+ * @param progress the progress reporter
+ * @throws IOException
+ * @see #setPermission(Path, FsPermission)
+ */
+ public FSDataOutputStream create(Path f,
+ FsPermission permission,
+ EnumSet<CreateFlag> flags,
+ int bufferSize,
+ short replication,
+ long blockSize,
+ Progressable progress) throws IOException {
+ // only DFS supports the full set of flags; by default fall back to the overwrite-based create
+ return create(f, permission, flags.contains(CreateFlag.OVERWRITE), bufferSize, replication, blockSize, progress);
+ }
+
/*.
* This create has been added to support the FileContext that processes
@@ -954,10 +978,35 @@ public FSDataOutputStream createNonRecursive(Path f,
public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
boolean overwrite, int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
- throw new IOException("createNonRecursive unsupported for this filesystem "
- + this.getClass());
+ return createNonRecursive(f, permission,
+ overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
+ : EnumSet.of(CreateFlag.CREATE), bufferSize,
+ replication, blockSize, progress);
}
+ /**
+ * Opens an FSDataOutputStream at the indicated Path with write-progress
+ * reporting. Same as create(), except fails if parent directory doesn't
+ * already exist.
+ * @param f the file name to open
+ * @param permission file permission
+ * @param flags {@link CreateFlag}s to use for this stream.
+ * @param bufferSize the size of the buffer to be used.
+ * @param replication required block replication for the file.
+ * @param blockSize block size of the file
+ * @param progress the progress reporter
+ * @throws IOException
+ * @see #setPermission(Path, FsPermission)
+ * @deprecated API only for 0.20-append
+ */
+ @Deprecated
+ public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
+ EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize,
+ Progressable progress) throws IOException {
+ throw new IOException("createNonRecursive unsupported for this filesystem "
+ + this.getClass());
+ }
+
/**
* Creates the given Path as a brand-new zero-length file. If
* create fails, or if it already existed, return false.
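
For filesystems that do not override the new EnumSet-based create(), the default above silently reduces the flag set to the boolean overwrite form. A sketch of that fallback (parameter names mirror the method above; the wrapper class is hypothetical):

```java
import java.io.IOException;
import java.util.EnumSet;

import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;

public class DefaultCreateFallbackSketch {
  // Equivalent of the default create(Path, FsPermission, EnumSet, ...) above:
  // only the OVERWRITE flag affects behavior; APPEND and SYNC_BLOCK are dropped.
  static FSDataOutputStream create(FileSystem fs, Path f, FsPermission permission,
      EnumSet<CreateFlag> flags, int bufferSize, short replication,
      long blockSize, Progressable progress) throws IOException {
    boolean overwrite = flags.contains(CreateFlag.OVERWRITE);
    return fs.create(f, permission, overwrite, bufferSize, replication,
        blockSize, progress);
  }
}
```
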
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java
index 0919563bd7..f32d399f79 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java
@@ -807,7 +807,7 @@ public String toString() {
}
/** Write key/value pairs to a sequence-format file. */
- public static class Writer implements java.io.Closeable {
+ public static class Writer implements java.io.Closeable, Syncable {
private Configuration conf;
FSDataOutputStream out;
boolean ownOutputStream = true;
@@ -1193,13 +1193,31 @@ public void sync() throws IOException {
}
}
- /** flush all currently written data to the file system */
+ /**
+ * flush all currently written data to the file system
+ * @deprecated Use {@link #hsync()} or {@link #hflush()} instead
+ */
+ @Deprecated
public void syncFs() throws IOException {
if (out != null) {
out.hflush(); // flush contents to file system
}
}
+ @Override
+ public void hsync() throws IOException {
+ if (out != null) {
+ out.hsync();
+ }
+ }
+
+ @Override
+ public void hflush() throws IOException {
+ if (out != null) {
+ out.hflush();
+ }
+ }
+
/** Returns the configuration of this file. */
Configuration getConf() { return conf; }
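
Since SequenceFile.Writer now implements Syncable, callers can request a durable sync on the writer itself instead of the deprecated syncFs(). A short sketch (mirroring testSequenceFileSync added below; the Text key/value types and helper class are illustrative):

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileHsyncSketch {
  // "out" is assumed to be an HDFS stream created with CreateFlag.SYNC_BLOCK.
  static void appendDurably(Configuration conf, FSDataOutputStream out,
      Text key, Text value) throws IOException {
    SequenceFile.Writer w = SequenceFile.createWriter(conf,
        SequenceFile.Writer.stream(out),
        SequenceFile.Writer.keyClass(Text.class),
        SequenceFile.Writer.valueClass(Text.class));
    w.append(key, value);
    w.hsync();   // forwarded to the underlying stream; replaces syncFs()
    w.close();
  }
}
```
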
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java
index aeb926004b..647a583d10 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java
@@ -74,6 +74,11 @@ public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
Progressable progress) throws IOException {
return null;
}
+ public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
+ EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize,
+ Progressable progress) throws IOException {
+ return null;
+ }
public boolean mkdirs(Path f) { return false; }
public FSDataInputStream open(Path f) { return null; }
public FSDataOutputStream create(Path f) { return null; }
@@ -123,6 +128,15 @@ public FSDataOutputStream create(Path f,
Progressable progress) {
return null;
}
+ public FSDataOutputStream create(Path f,
+ FsPermission permission,
+ EnumSet<CreateFlag> flags,
+ int bufferSize,
+ short replication,
+ long blockSize,
+ Progressable progress) throws IOException {
+ return null;
+ }
public String getName() { return null; }
public boolean delete(Path f) { return false; }
public short getReplication(Path src) { return 0 ; }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 20e7ea553a..ea46efcee5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -13,6 +13,8 @@ Trunk (unreleased changes)
HDFS-3125. Add JournalService to enable Journal Daemon. (suresh)
+ HDFS-744. Support hsync in HDFS. (Lars Hofhansl via szetszwo)
+
IMPROVEMENTS
HDFS-1620. Rename HdfsConstants -> HdfsServerConstants, FSConstants ->
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 88a8714b62..f7d6fdc37b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -129,11 +129,13 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
private long initialFileSize = 0; // at time of file open
private Progressable progress;
private final short blockReplication; // replication factor of file
+ private boolean shouldSyncBlock = false; // force blocks to disk upon close
private class Packet {
long seqno; // sequencenumber of buffer in block
long offsetInBlock; // offset in block
- boolean lastPacketInBlock; // is this the last packet in block?
+ private boolean lastPacketInBlock; // is this the last packet in block?
+ boolean syncBlock; // this packet forces the current block to disk
int numChunks; // number of chunks currently in packet
int maxChunks; // max chunks in packet
@@ -245,7 +247,7 @@ ByteBuffer getBuffer() {
buffer.mark();
PacketHeader header = new PacketHeader(
- pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen);
+ pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock);
header.putInBuffer(buffer);
buffer.reset();
@@ -1249,6 +1251,7 @@ private DFSOutputStream(DFSClient dfsClient, String src, FsPermission masked,
long blockSize, Progressable progress, int buffersize,
DataChecksum checksum) throws IOException {
this(dfsClient, src, blockSize, progress, checksum, replication);
+ this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);
computePacketChunkSize(dfsClient.getConf().writePacketSize,
checksum.getBytesPerChecksum());
@@ -1431,6 +1434,7 @@ protected synchronized void writeChunk(byte[] b, int offset, int len, byte[] che
currentPacket = new Packet(PacketHeader.PKT_HEADER_LEN, 0,
bytesCurBlock);
currentPacket.lastPacketInBlock = true;
+ currentPacket.syncBlock = shouldSyncBlock;
waitAndQueueCurrentPacket();
bytesCurBlock = 0;
lastFlushOffset = 0;
@@ -1450,6 +1454,24 @@ protected synchronized void writeChunk(byte[] b, int offset, int len, byte[] che
*/
@Override
public void hflush() throws IOException {
+ flushOrSync(false);
+ }
+
+ /**
+ * The expected semantics are that all data has been flushed out to all
+ * replicas and each replica has done the POSIX fsync equivalent, i.e. the
+ * OS has flushed it to the disk device (but the disk may still have it in
+ * its cache).
+ *
+ * Note that only the current block is flushed to the disk device.
+ * To guarantee durable sync across block boundaries the stream should
+ * be created with {@link CreateFlag#SYNC_BLOCK}.
+ */
+ @Override
+ public void hsync() throws IOException {
+ flushOrSync(true);
+ }
+
+ private void flushOrSync(boolean isSync) throws IOException {
dfsClient.checkOpen();
isClosed();
try {
@@ -1477,7 +1499,13 @@ public void hflush() throws IOException {
assert bytesCurBlock > lastFlushOffset;
// record the valid offset of this flush
lastFlushOffset = bytesCurBlock;
- waitAndQueueCurrentPacket();
+ if (isSync && currentPacket == null) {
+ // Nothing to send right now,
+ // but sync was requested.
+ // Send an empty packet
+ currentPacket = new Packet(packetSize, chunksPerPacket,
+ bytesCurBlock);
+ }
} else {
// We already flushed up to this offset.
// This means that we haven't written anything since the last flush
@@ -1487,8 +1515,21 @@ public void hflush() throws IOException {
assert oldCurrentPacket == null :
"Empty flush should not occur with a currentPacket";
- // just discard the current packet since it is already been sent.
- currentPacket = null;
+ if (isSync && bytesCurBlock > 0) {
+ // Nothing to send right now,
+ // and the block was partially written,
+ // and sync was requested.
+ // So send an empty sync packet.
+ currentPacket = new Packet(packetSize, chunksPerPacket,
+ bytesCurBlock);
+ } else {
+ // just discard the current packet since it has already been sent.
+ currentPacket = null;
+ }
+ }
+ if (currentPacket != null) {
+ currentPacket.syncBlock = isSync;
+ waitAndQueueCurrentPacket();
}
// Restore state of stream. Record the last flush offset
// of the last full chunk that was flushed.
@@ -1539,18 +1580,6 @@ public void hflush() throws IOException {
}
}
- /**
- * The expected semantics is all data have flushed out to all replicas
- * and all replicas have done posix fsync equivalent - ie the OS has
- * flushed it to the disk device (but the disk may have it in its cache).
- *
- * Right now by default it is implemented as hflush
- */
- @Override
- public synchronized void hsync() throws IOException {
- hflush();
- }
-
/**
* @deprecated use {@link HdfsDataOutputStream#getCurrentBlockReplication()}.
*/
@@ -1675,6 +1704,7 @@ public synchronized void close() throws IOException {
currentPacket = new Packet(PacketHeader.PKT_HEADER_LEN, 0,
bytesCurBlock);
currentPacket.lastPacketInBlock = true;
+ currentPacket.syncBlock = shouldSyncBlock;
}
flushInternal(); // flush all data to Datanodes
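
One consequence of the flushOrSync() logic above is that hsync() on a partially written block always reaches the disk, even when there is nothing new to flush: an empty packet with syncBlock=true is queued. A sketch of that behavior (the stream is assumed to be a DFS output stream mid-block; TestHSync's "repeated hsyncs" check below observes it through the datanode fsync metric):

```java
import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;

public class RepeatedHsyncSketch {
  static void syncTwice(FSDataOutputStream out) throws IOException {
    out.write(1);
    out.hsync();   // syncs the newly written byte
    out.hsync();   // no new data, but an empty sync packet is still sent
  }
}
```
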
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index 0cb95c0552..1de353bdb8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -223,12 +223,19 @@ public HdfsDataOutputStream append(Path f, int bufferSize,
@Override
public HdfsDataOutputStream create(Path f, FsPermission permission,
- boolean overwrite, int bufferSize, short replication, long blockSize,
+ boolean overwrite, int bufferSize, short replication, long blockSize,
+ Progressable progress) throws IOException {
+ return create(f, permission,
+ overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
+ : EnumSet.of(CreateFlag.CREATE), bufferSize, replication,
+ blockSize, progress);
+ }
+
+ @Override
+ public HdfsDataOutputStream create(Path f, FsPermission permission,
+ EnumSet<CreateFlag> cflags, int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
statistics.incrementWriteOps(1);
- final EnumSet<CreateFlag> cflags = overwrite?
- EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
- : EnumSet.of(CreateFlag.CREATE);
final DFSOutputStream out = dfs.create(getPathName(f), permission, cflags,
replication, blockSize, progress, bufferSize);
return new HdfsDataOutputStream(out, statistics);
@@ -249,6 +256,7 @@ protected HdfsDataOutputStream primitiveCreate(Path f,
/**
* Same as create(), except fails if parent directory doesn't already exist.
*/
+ @Override
public HdfsDataOutputStream createNonRecursive(Path f, FsPermission permission,
EnumSet<CreateFlag> flag, int bufferSize, short replication,
long blockSize, Progressable progress) throws IOException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java
index d8b9f2b620..083e2b0b91 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java
@@ -40,6 +40,7 @@ public class PacketHeader {
.setSeqno(0)
.setLastPacketInBlock(false)
.setDataLen(0)
+ .setSyncBlock(false)
.build().getSerializedSize();
public static final int PKT_HEADER_LEN =
6 + PROTO_SIZE;
@@ -51,13 +52,14 @@ public PacketHeader() {
}
public PacketHeader(int packetLen, long offsetInBlock, long seqno,
- boolean lastPacketInBlock, int dataLen) {
+ boolean lastPacketInBlock, int dataLen, boolean syncBlock) {
this.packetLen = packetLen;
proto = PacketHeaderProto.newBuilder()
.setOffsetInBlock(offsetInBlock)
.setSeqno(seqno)
.setLastPacketInBlock(lastPacketInBlock)
.setDataLen(dataLen)
+ .setSyncBlock(syncBlock)
.build();
}
@@ -81,6 +83,10 @@ public int getPacketLen() {
return packetLen;
}
+ public boolean getSyncBlock() {
+ return proto.getSyncBlock();
+ }
+
@Override
public String toString() {
return "PacketHeader with packetLen=" + packetLen +
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 72591e018e..f0f7c780a7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -42,6 +42,7 @@
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -110,6 +111,8 @@ class BlockReceiver implements Closeable {
private final BlockConstructionStage stage;
private final boolean isTransfer;
+ private boolean syncOnClose;
+
BlockReceiver(final ExtendedBlock block, final DataInputStream in,
final String inAddr, final String myAddr,
final BlockConstructionStage stage,
@@ -245,14 +248,18 @@ class BlockReceiver implements Closeable {
* close files.
*/
public void close() throws IOException {
-
IOException ioe = null;
+ if (syncOnClose && (out != null || checksumOut != null)) {
+ datanode.metrics.incrFsyncCount();
+ }
// close checksum file
try {
if (checksumOut != null) {
checksumOut.flush();
- if (datanode.getDnConf().syncOnClose && (cout instanceof FileOutputStream)) {
+ if (syncOnClose && (cout instanceof FileOutputStream)) {
+ long start = Util.now();
((FileOutputStream)cout).getChannel().force(true);
+ datanode.metrics.addFsync(Util.now() - start);
}
checksumOut.close();
checksumOut = null;
@@ -267,8 +274,10 @@ public void close() throws IOException {
try {
if (out != null) {
out.flush();
- if (datanode.getDnConf().syncOnClose && (out instanceof FileOutputStream)) {
+ if (syncOnClose && (out instanceof FileOutputStream)) {
+ long start = Util.now();
((FileOutputStream)out).getChannel().force(true);
+ datanode.metrics.addFsync(Util.now() - start);
}
out.close();
out = null;
@@ -290,12 +299,25 @@ public void close() throws IOException {
* Flush block data and metadata files to disk.
* @throws IOException
*/
- void flush() throws IOException {
+ void flushOrSync(boolean isSync) throws IOException {
+ if (isSync && (out != null || checksumOut != null)) {
+ datanode.metrics.incrFsyncCount();
+ }
if (checksumOut != null) {
checksumOut.flush();
+ if (isSync && (cout instanceof FileOutputStream)) {
+ long start = Util.now();
+ ((FileOutputStream)cout).getChannel().force(true);
+ datanode.metrics.addFsync(Util.now() - start);
+ }
}
if (out != null) {
out.flush();
+ if (isSync && (out instanceof FileOutputStream)) {
+ long start = Util.now();
+ ((FileOutputStream)out).getChannel().force(true);
+ datanode.metrics.addFsync(Util.now() - start);
+ }
}
}
@@ -533,7 +555,9 @@ private int receivePacket() throws IOException {
header.getOffsetInBlock(),
header.getSeqno(),
header.isLastPacketInBlock(),
- header.getDataLen(), endOfHeader);
+ header.getDataLen(),
+ header.getSyncBlock(),
+ endOfHeader);
}
/**
@@ -549,15 +573,19 @@ private void writePacketToDisk(byte[] pktBuf, int startByteToDisk,
* returns the number of data bytes that the packet has.
*/
private int receivePacket(long offsetInBlock, long seqno,
- boolean lastPacketInBlock, int len, int endOfHeader) throws IOException {
+ boolean lastPacketInBlock, int len, boolean syncBlock,
+ int endOfHeader) throws IOException {
if (LOG.isDebugEnabled()){
LOG.debug("Receiving one packet for block " + block +
" of length " + len +
" seqno " + seqno +
" offsetInBlock " + offsetInBlock +
+ " syncBlock " + syncBlock +
" lastPacketInBlock " + lastPacketInBlock);
}
-
+ // make sure the block gets sync'ed upon close
+ this.syncOnClose |= syncBlock && lastPacketInBlock;
+
// update received bytes
long firstByteInBlock = offsetInBlock;
offsetInBlock += len;
@@ -587,6 +615,10 @@ private int receivePacket(long offsetInBlock, long seqno,
if(LOG.isDebugEnabled()) {
LOG.debug("Receiving an empty packet or the end of the block " + block);
}
+ // sync the block now, unless this is the last packet and close() will sync it anyway
+ if (syncBlock && !lastPacketInBlock) {
+ flushOrSync(true);
+ }
} else {
int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)*
checksumSize;
@@ -677,8 +709,8 @@ private int receivePacket(long offsetInBlock, long seqno,
);
checksumOut.write(pktBuf, checksumOff, checksumLen);
}
- /// flush entire packet
- flush();
+ /// flush entire packet; sync to disk if requested, unless close() will sync anyway
+ flushOrSync(syncBlock && !lastPacketInBlock);
replicaInfo.setLastChecksumAndDataLen(
offsetInBlock, lastChunkChecksum
@@ -730,6 +762,7 @@ void receiveBlock(
String mirrAddr, DataTransferThrottler throttlerArg,
DatanodeInfo[] downstreams) throws IOException {
+ syncOnClose = datanode.getDnConf().syncOnClose;
boolean responderClosed = false;
mirrorOut = mirrOut;
mirrorAddr = mirrAddr;
@@ -768,7 +801,7 @@ void receiveBlock(
datanode.data.convertTemporaryToRbw(block);
} else {
// for isDatnode or TRANSFER_FINALIZED
- // Finalize the block. Does this fsync()?
+ // Finalize the block.
datanode.data.finalizeBlock(block);
}
datanode.metrics.incrBlocksWritten();
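
The datanode-side sync itself is the usual FileChannel.force() call, now timed and fed into the new fsync metrics. A standalone sketch of that pattern (Util.now() and DataNodeMetrics.addFsync() from the patch are replaced by stand-ins here):

```java
import java.io.FileOutputStream;
import java.io.IOException;

public class FsyncTimingSketch {
  // Force file contents to the disk device and return the latency, as
  // flushOrSync()/close() above do before calling metrics.addFsync(latency).
  static long fsyncAndTime(FileOutputStream out) throws IOException {
    long start = System.currentTimeMillis();   // Util.now() in the patch
    out.getChannel().force(true);              // POSIX fsync equivalent
    return System.currentTimeMillis() - start;
  }
}
```
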
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index 6a830dbbf9..12ee56ece7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -701,8 +701,9 @@ private boolean isLongRead() {
*/
private void writePacketHeader(ByteBuffer pkt, int dataLen, int packetLen) {
pkt.clear();
+ // syncBlock is always false when sending a block to a reader
PacketHeader header = new PacketHeader(packetLen, offset, seqno,
- (dataLen == 0), dataLen);
+ (dataLen == 0), dataLen, false);
header.putInBuffer(pkt);
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
index 9e18007810..a849cdad2c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
@@ -61,6 +61,8 @@ public class DataNodeMetrics {
@Metric MutableCounterLong writesFromLocalClient;
@Metric MutableCounterLong writesFromRemoteClient;
@Metric MutableCounterLong blocksGetLocalPathInfo;
+
+ @Metric MutableCounterLong fsyncCount;
@Metric MutableCounterLong volumeFailures;
@@ -72,6 +74,8 @@ public class DataNodeMetrics {
@Metric MutableRate heartbeats;
@Metric MutableRate blockReports;
+ @Metric MutableRate fsync;
+
final MetricsRegistry registry = new MetricsRegistry("datanode");
final String name;
@@ -151,6 +155,14 @@ public void incrBlocksRead() {
blocksRead.incr();
}
+ public void incrFsyncCount() {
+ fsyncCount.incr();
+ }
+
+ public void addFsync(long latency) {
+ fsync.add(latency);
+ }
+
public void shutdown() {
DefaultMetricsSystem.shutdown();
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
index 316c05cea9..d64f78051e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
@@ -113,6 +113,7 @@ message PacketHeaderProto {
required sfixed64 seqno = 2;
required bool lastPacketInBlock = 3;
required sfixed32 dataLen = 4;
+ optional bool syncBlock = 5 [default = false];
}
enum Status {
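
Since syncBlock is optional with a default, the wire format stays compatible: a header serialized by a sender that predates this change simply decodes with syncBlock == false. A tiny sketch against the generated protobuf class (the helper class is hypothetical):

```java
import com.google.protobuf.InvalidProtocolBufferException;

import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PacketHeaderProto;

public class PacketHeaderCompatSketch {
  static boolean syncRequested(byte[] serializedHeader)
      throws InvalidProtocolBufferException {
    PacketHeaderProto proto = PacketHeaderProto.parseFrom(serializedHeader);
    return proto.getSyncBlock();   // false when the field is absent
  }
}
```
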
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java
index 07cb456491..a08c7b5799 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java
@@ -139,7 +139,7 @@ public static void check(FileSystem fs, Path p, long length) throws IOException
/**
* create a buffer that contains the entire test file data.
*/
- static byte[] initBuffer(int size) {
+ public static byte[] initBuffer(int size) {
if (seed == -1)
seed = nextLong();
return randomBytes(seed, size);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
index 3ef892b4f1..aed15d819d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
@@ -159,7 +159,8 @@ private void writeZeroLengthPacket(ExtendedBlock block, String description)
block.getNumBytes(), // OffsetInBlock
100, // sequencenumber
true, // lastPacketInBlock
- 0); // chunk length
+ 0, // chunk length
+ false); // sync block
hdr.write(sendOut);
sendOut.writeInt(0); // zero checksum
@@ -402,7 +403,8 @@ private void testWrite(ExtendedBlock block, BlockConstructionStage stage, long n
0, // offset in block,
100, // seqno
false, // last packet
- -1 - random.nextInt(oneMil)); // bad datalen
+ -1 - random.nextInt(oneMil), // bad datalen
+ false);
hdr.write(sendOut);
sendResponse(Status.SUCCESS, "", null, recvOut);
@@ -424,7 +426,8 @@ private void testWrite(ExtendedBlock block, BlockConstructionStage stage, long n
0, // OffsetInBlock
100, // sequencenumber
true, // lastPacketInBlock
- 0); // chunk length
+ 0, // chunk length
+ false);
hdr.write(sendOut);
sendOut.writeInt(0); // zero checksum
sendOut.flush();
@@ -508,8 +511,8 @@ public void testPacketHeader() throws IOException {
1024, // OffsetInBlock
100, // sequencenumber
false, // lastPacketInBlock
- 4096); // chunk length
-
+ 4096, // chunk length
+ false);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
hdr.write(new DataOutputStream(baos));
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestHSync.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestHSync.java
new file mode 100644
index 0000000000..cf2e4483de
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestHSync.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.apache.hadoop.test.MetricsAsserts.*;
+
+import java.util.EnumSet;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.AppendTestUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.RandomDatum;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.junit.Test;
+
+public class TestHSync {
+
+ private void checkSyncMetric(MiniDFSCluster cluster, int dn, long value) {
+ DataNode datanode = cluster.getDataNodes().get(dn);
+ assertCounter("FsyncCount", value, getMetrics(datanode.getMetrics().name()));
+ }
+ private void checkSyncMetric(MiniDFSCluster cluster, long value) {
+ checkSyncMetric(cluster, 0, value);
+ }
+ /** Test basic hsync cases */
+ @Test
+ public void testHSync() throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+ final FileSystem fs = cluster.getFileSystem();
+
+ final Path p = new Path("/testHSync/foo");
+ final int len = 1 << 16;
+ FSDataOutputStream out = fs.create(p, FsPermission.getDefault(),
+ EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK),
+ 4096, (short) 1, len, null);
+ out.hflush();
+ // hflush does not sync
+ checkSyncMetric(cluster, 0);
+ out.hsync();
+ // hsync on empty file does nothing
+ checkSyncMetric(cluster, 0);
+ out.write(1);
+ checkSyncMetric(cluster, 0);
+ out.hsync();
+ checkSyncMetric(cluster, 1);
+ // avoiding repeated hsyncs is a potential future optimization
+ out.hsync();
+ checkSyncMetric(cluster, 2);
+ out.hflush();
+ // hflush still does not sync
+ checkSyncMetric(cluster, 2);
+ out.close();
+ // close syncs because the stream was created with SYNC_BLOCK
+ checkSyncMetric(cluster, 3);
+
+ // now the same with a file created without SYNC_BLOCK
+ out = fs.create(p, FsPermission.getDefault(),
+ EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
+ 4096, (short) 1, len, null);
+ out.hsync();
+ checkSyncMetric(cluster, 3);
+ out.write(1);
+ checkSyncMetric(cluster, 3);
+ out.hsync();
+ checkSyncMetric(cluster, 4);
+ // repeated hsyncs
+ out.hsync();
+ checkSyncMetric(cluster, 5);
+ out.close();
+ // close does not sync (not opened with SYNC_BLOCK)
+ checkSyncMetric(cluster, 5);
+ cluster.shutdown();
+ }
+
+ /** Test hsync on an exact block boundary */
+ @Test
+ public void testHSyncBlockBoundary() throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+ final FileSystem fs = cluster.getFileSystem();
+
+ final Path p = new Path("/testHSyncBlockBoundary/foo");
+ final int len = 1 << 16;
+ final byte[] fileContents = AppendTestUtil.initBuffer(len);
+ FSDataOutputStream out = fs.create(p, FsPermission.getDefault(),
+ EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK),
+ 4096, (short) 1, len, null);
+ // fill exactly one block (tests the SYNC_BLOCK case) and flush
+ out.write(fileContents, 0, len);
+ out.hflush();
+ // the full block should have caused a sync
+ checkSyncMetric(cluster, 1);
+ out.hsync();
+ // nothing new to sync after the block was completed, so this hsync does nothing
+ checkSyncMetric(cluster, 1);
+ // write one more byte and sync again
+ out.write(1);
+ out.hsync();
+ checkSyncMetric(cluster, 2);
+ out.close();
+ checkSyncMetric(cluster, 3);
+ cluster.shutdown();
+ }
+
+ /** Test hsync via SequenceFiles */
+ @Test
+ public void testSequenceFileSync() throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+
+ final FileSystem fs = cluster.getFileSystem();
+ final Path p = new Path("/testSequenceFileSync/foo");
+ final int len = 1 << 16;
+ FSDataOutputStream out = fs.create(p, FsPermission.getDefault(),
+ EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK),
+ 4096, (short) 1, len, null);
+ Writer w = SequenceFile.createWriter(new Configuration(),
+ Writer.stream(out),
+ Writer.keyClass(RandomDatum.class),
+ Writer.valueClass(RandomDatum.class),
+ Writer.compression(CompressionType.NONE, new DefaultCodec()));
+ w.hflush();
+ checkSyncMetric(cluster, 0);
+ w.hsync();
+ checkSyncMetric(cluster, 1);
+ int seed = new Random().nextInt();
+ RandomDatum.Generator generator = new RandomDatum.Generator(seed);
+ generator.next();
+ w.append(generator.getKey(), generator.getValue());
+ w.hsync();
+ checkSyncMetric(cluster, 2);
+ w.close();
+ checkSyncMetric(cluster, 2);
+ out.close();
+ checkSyncMetric(cluster, 3);
+ cluster.shutdown();
+ }
+
+ /** Test that syncBlock is correctly performed at replicas */
+ @Test
+ public void testHSyncWithReplication() throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+ final FileSystem fs = cluster.getFileSystem();
+
+ final Path p = new Path("/testHSyncWithReplication/foo");
+ final int len = 1 << 16;
+ FSDataOutputStream out = fs.create(p, FsPermission.getDefault(),
+ EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK),
+ 4096, (short) 3, len, null);
+ out.write(1);
+ out.hflush();
+ checkSyncMetric(cluster, 0, 0);
+ checkSyncMetric(cluster, 1, 0);
+ checkSyncMetric(cluster, 2, 0);
+ out.hsync();
+ checkSyncMetric(cluster, 0, 1);
+ checkSyncMetric(cluster, 1, 1);
+ checkSyncMetric(cluster, 2, 1);
+ out.hsync();
+ checkSyncMetric(cluster, 0, 2);
+ checkSyncMetric(cluster, 1, 2);
+ checkSyncMetric(cluster, 2, 2);
+ cluster.shutdown();
+ }
+}