HADOOP-17657: implement StreamCapabilities in SequenceFile.Writer and fall back to flush, if hflush is not supported (#2949)
Co-authored-by: Kishen Das <kishen@cloudera.com>
Reviewed-by: Steve Loughran <stevel@apache.org>
(cherry picked from commit e571025f5b
)
This commit is contained in:
parent
658bb49e66
commit
98aa4fc32c
@ -27,6 +27,7 @@
|
|||||||
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
|
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.hadoop.util.Options;
|
import org.apache.hadoop.util.Options;
|
||||||
import org.apache.hadoop.fs.*;
|
import org.apache.hadoop.fs.*;
|
||||||
|
import org.apache.hadoop.fs.StreamCapabilities;
|
||||||
import org.apache.hadoop.fs.Options.CreateOpts;
|
import org.apache.hadoop.fs.Options.CreateOpts;
|
||||||
import org.apache.hadoop.io.compress.CodecPool;
|
import org.apache.hadoop.io.compress.CodecPool;
|
||||||
import org.apache.hadoop.io.compress.CompressionCodec;
|
import org.apache.hadoop.io.compress.CompressionCodec;
|
||||||
@ -834,7 +835,8 @@ public String toString() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/** Write key/value pairs to a sequence-format file. */
|
/** Write key/value pairs to a sequence-format file. */
|
||||||
public static class Writer implements java.io.Closeable, Syncable {
|
public static class Writer implements java.io.Closeable, Syncable,
|
||||||
|
Flushable, StreamCapabilities {
|
||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
FSDataOutputStream out;
|
FSDataOutputStream out;
|
||||||
boolean ownOutputStream = true;
|
boolean ownOutputStream = true;
|
||||||
@ -1367,6 +1369,21 @@ public void hflush() throws IOException {
|
|||||||
out.hflush();
|
out.hflush();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void flush() throws IOException {
|
||||||
|
if (out != null) {
|
||||||
|
out.flush();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean hasCapability(String capability) {
|
||||||
|
if (out !=null && capability != null) {
|
||||||
|
return out.hasCapability(capability);
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
/** Returns the configuration of this file. */
|
/** Returns the configuration of this file. */
|
||||||
Configuration getConf() { return conf; }
|
Configuration getConf() { return conf; }
|
||||||
|
@ -30,6 +30,7 @@
|
|||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.util.ReflectionUtils;
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
import org.apache.hadoop.conf.*;
|
import org.apache.hadoop.conf.*;
|
||||||
|
import org.assertj.core.api.Assertions;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
@ -724,6 +725,31 @@ public void testSerializationAvailability() throws IOException {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSequenceFileWriter() throws Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
// This test only works with Raw File System and not Local File System
|
||||||
|
FileSystem fs = FileSystem.getLocal(conf).getRaw();
|
||||||
|
Path p = new Path(GenericTestUtils
|
||||||
|
.getTempPath("testSequenceFileWriter.seq"));
|
||||||
|
try(SequenceFile.Writer writer = SequenceFile.createWriter(
|
||||||
|
fs, conf, p, LongWritable.class, Text.class)) {
|
||||||
|
Assertions.assertThat(writer.hasCapability
|
||||||
|
(StreamCapabilities.HSYNC)).isEqualTo(true);
|
||||||
|
Assertions.assertThat(writer.hasCapability(
|
||||||
|
StreamCapabilities.HFLUSH)).isEqualTo(true);
|
||||||
|
LongWritable key = new LongWritable();
|
||||||
|
key.set(1);
|
||||||
|
Text value = new Text();
|
||||||
|
value.set("somevalue");
|
||||||
|
writer.append(key, value);
|
||||||
|
writer.flush();
|
||||||
|
writer.hflush();
|
||||||
|
writer.hsync();
|
||||||
|
Assertions.assertThat(fs.getFileStatus(p).getLen()).isGreaterThan(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/** For debugging and testing. */
|
/** For debugging and testing. */
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
int count = 1024 * 1024;
|
int count = 1024 * 1024;
|
||||||
|
Loading…
Reference in New Issue
Block a user