diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 1ea68298ca..d3a7edb1eb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -421,6 +421,9 @@ Release 2.4.0 - UNRELEASED HDFS-5318. Support read-only and read-write paths to shared replicas. (Eric Sirianni via Arpit Agarwal) + HDFS-5868. Make hsync implementation pluggable on the DataNode. + (Buddy Taylor via Arpit Agarwal) + OPTIMIZATIONS HDFS-5790. LeaseManager.findPath is very slow when many leases need recovery diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index 3106f7f688..c1ed03ceb3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -78,7 +78,6 @@ class BlockReceiver implements Closeable { private boolean needsChecksumTranslation; private OutputStream out = null; // to block file at local disk private FileDescriptor outFd; - private OutputStream cout = null; // output stream for cehcksum file private DataOutputStream checksumOut = null; // to crc file at local disk private int bytesPerChecksum; private int checksumSize; @@ -223,9 +222,8 @@ class BlockReceiver implements Closeable { LOG.warn("Could not get file descriptor for outputstream of class " + out.getClass()); } - this.cout = streams.getChecksumOut(); this.checksumOut = new DataOutputStream(new BufferedOutputStream( - cout, HdfsConstants.SMALL_BUFFER_SIZE)); + streams.getChecksumOut(), HdfsConstants.SMALL_BUFFER_SIZE)); // write data chunk header if creating a new replica if (isCreate) { BlockMetadataHeader.writeHeader(checksumOut, diskChecksum); @@ -280,9 +278,9 @@ class BlockReceiver implements Closeable { long flushStartNanos = System.nanoTime(); checksumOut.flush(); long flushEndNanos = System.nanoTime(); - if (syncOnClose && (cout instanceof FileOutputStream)) { + if (syncOnClose) { long fsyncStartNanos = flushEndNanos; - ((FileOutputStream)cout).getChannel().force(true); + streams.syncChecksumOut(); datanode.metrics.addFsyncNanos(System.nanoTime() - fsyncStartNanos); } flushTotalNanos += flushEndNanos - flushStartNanos; @@ -302,9 +300,9 @@ class BlockReceiver implements Closeable { long flushStartNanos = System.nanoTime(); out.flush(); long flushEndNanos = System.nanoTime(); - if (syncOnClose && (out instanceof FileOutputStream)) { + if (syncOnClose) { long fsyncStartNanos = flushEndNanos; - ((FileOutputStream)out).getChannel().force(true); + streams.syncDataOut(); datanode.metrics.addFsyncNanos(System.nanoTime() - fsyncStartNanos); } flushTotalNanos += flushEndNanos - flushStartNanos; @@ -338,9 +336,9 @@ class BlockReceiver implements Closeable { long flushStartNanos = System.nanoTime(); checksumOut.flush(); long flushEndNanos = System.nanoTime(); - if (isSync && (cout instanceof FileOutputStream)) { + if (isSync) { long fsyncStartNanos = flushEndNanos; - ((FileOutputStream)cout).getChannel().force(true); + streams.syncChecksumOut(); datanode.metrics.addFsyncNanos(System.nanoTime() - fsyncStartNanos); } flushTotalNanos += flushEndNanos - flushStartNanos; @@ -349,9 +347,9 @@ class BlockReceiver implements Closeable { long flushStartNanos = System.nanoTime(); out.flush(); long flushEndNanos = System.nanoTime(); - if (isSync && (out instanceof FileOutputStream)) { + if (isSync) { long fsyncStartNanos = flushEndNanos; - ((FileOutputStream)out).getChannel().force(true); + streams.syncDataOut(); datanode.metrics.addFsyncNanos(System.nanoTime() - fsyncStartNanos); } flushTotalNanos += flushEndNanos - flushStartNanos; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java index 3866392d93..95044c825d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java @@ -18,7 +18,9 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset; import java.io.Closeable; +import java.io.FileOutputStream; import java.io.OutputStream; +import java.io.IOException; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.DataChecksum; @@ -62,4 +64,23 @@ public class ReplicaOutputStreams implements Closeable { IOUtils.closeStream(dataOut); IOUtils.closeStream(checksumOut); } -} \ No newline at end of file + + /** + * Sync the data stream if it supports it. + */ + public void syncDataOut() throws IOException { + if (dataOut instanceof FileOutputStream) { + ((FileOutputStream)dataOut).getChannel().force(true); + } + } + + /** + * Sync the checksum stream if it supports it. + */ + public void syncChecksumOut() throws IOException { + if (checksumOut instanceof FileOutputStream) { + ((FileOutputStream)checksumOut).getChannel().force(true); + } + } + +}