From 98aa4fc32c0dfede20f57a7a05297a9e22d2f7d7 Mon Sep 17 00:00:00 2001 From: kishendas Date: Tue, 4 May 2021 01:20:56 -0700 Subject: [PATCH] HADOOP-17657: implement StreamCapabilities in SequenceFile.Writer and fall back to flush, if hflush is not supported (#2949) Co-authored-by: Kishen Das Reviewed-by: Steve Loughran (cherry picked from commit e571025f5b371ade25d1457f0186ba656bb71c5f) --- .../org/apache/hadoop/io/SequenceFile.java | 19 +++++++++++++- .../apache/hadoop/io/TestSequenceFile.java | 26 +++++++++++++++++++ 2 files changed, 44 insertions(+), 1 deletion(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java index 3f4649f04d..0581fb3f57 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java @@ -27,6 +27,7 @@ import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.util.Options; import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.Options.CreateOpts; import org.apache.hadoop.io.compress.CodecPool; import org.apache.hadoop.io.compress.CompressionCodec; @@ -834,7 +835,8 @@ public String toString() { } /** Write key/value pairs to a sequence-format file. */ - public static class Writer implements java.io.Closeable, Syncable { + public static class Writer implements java.io.Closeable, Syncable, + Flushable, StreamCapabilities { private Configuration conf; FSDataOutputStream out; boolean ownOutputStream = true; @@ -1367,6 +1369,21 @@ public void hflush() throws IOException { out.hflush(); } } + + @Override + public void flush() throws IOException { + if (out != null) { + out.flush(); + } + } + + @Override + public boolean hasCapability(String capability) { + if (out !=null && capability != null) { + return out.hasCapability(capability); + } + return false; + } /** Returns the configuration of this file. */ Configuration getConf() { return conf; } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFile.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFile.java index 044824356e..5e4d578cae 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFile.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFile.java @@ -30,6 +30,7 @@ import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.conf.*; +import org.assertj.core.api.Assertions; import org.junit.Test; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -724,6 +725,31 @@ public void testSerializationAvailability() throws IOException { } } + @Test + public void testSequenceFileWriter() throws Exception { + Configuration conf = new Configuration(); + // This test only works with Raw File System and not Local File System + FileSystem fs = FileSystem.getLocal(conf).getRaw(); + Path p = new Path(GenericTestUtils + .getTempPath("testSequenceFileWriter.seq")); + try(SequenceFile.Writer writer = SequenceFile.createWriter( + fs, conf, p, LongWritable.class, Text.class)) { + Assertions.assertThat(writer.hasCapability + (StreamCapabilities.HSYNC)).isEqualTo(true); + Assertions.assertThat(writer.hasCapability( + StreamCapabilities.HFLUSH)).isEqualTo(true); + LongWritable key = new LongWritable(); + key.set(1); + Text value = new Text(); + value.set("somevalue"); + writer.append(key, value); + writer.flush(); + writer.hflush(); + writer.hsync(); + Assertions.assertThat(fs.getFileStatus(p).getLen()).isGreaterThan(0); + } + } + /** For debugging and testing. */ public static void main(String[] args) throws Exception { int count = 1024 * 1024;