HADOOP-6313. Implement Syncable interface in FSDataOutputStream to expose flush APIs to application users. Contributed by Hairong Kuang.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@831416 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a5427fc7eb
commit
b5c31f4ee6
@ -246,6 +246,9 @@ Release 0.21.0 - Unreleased
|
|||||||
HADOOP-6240. Add new FileContext rename operation that posix compliant
|
HADOOP-6240. Add new FileContext rename operation that posix compliant
|
||||||
that allows overwriting existing destination. (suresh)
|
that allows overwriting existing destination. (suresh)
|
||||||
|
|
||||||
|
HADOOP-6313. Implement Syncable interface in FSDataOutputStream to expose
|
||||||
|
flush APIs to application users. (Hairong Kuang via suresh)
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
HADOOP-4565. Added CombineFileInputFormat to use data locality information
|
HADOOP-4565. Added CombineFileInputFormat to use data locality information
|
||||||
|
@ -91,10 +91,29 @@ public OutputStream getWrappedStream() {
|
|||||||
return wrappedStream;
|
return wrappedStream;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** {@inheritDoc} */
|
@Override // Syncable
|
||||||
|
@Deprecated
|
||||||
public void sync() throws IOException {
|
public void sync() throws IOException {
|
||||||
if (wrappedStream instanceof Syncable) {
|
if (wrappedStream instanceof Syncable) {
|
||||||
((Syncable)wrappedStream).sync();
|
((Syncable)wrappedStream).sync();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override // Syncable
|
||||||
|
public void hflush() throws IOException {
|
||||||
|
if (wrappedStream instanceof Syncable) {
|
||||||
|
((Syncable)wrappedStream).hflush();
|
||||||
|
} else {
|
||||||
|
wrappedStream.flush();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override // Syncable
|
||||||
|
public void hsync() throws IOException {
|
||||||
|
if (wrappedStream instanceof Syncable) {
|
||||||
|
((Syncable)wrappedStream).hsync();
|
||||||
|
} else {
|
||||||
|
wrappedStream.flush();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -180,7 +180,7 @@ public FSDataInputStream open(Path f, int bufferSize) throws IOException {
|
|||||||
/*********************************************************
|
/*********************************************************
|
||||||
* For create()'s FSOutputStream.
|
* For create()'s FSOutputStream.
|
||||||
*********************************************************/
|
*********************************************************/
|
||||||
class LocalFSFileOutputStream extends OutputStream implements Syncable {
|
class LocalFSFileOutputStream extends OutputStream {
|
||||||
private FileOutputStream fos;
|
private FileOutputStream fos;
|
||||||
|
|
||||||
private LocalFSFileOutputStream(Path f, boolean append) throws IOException {
|
private LocalFSFileOutputStream(Path f, boolean append) throws IOException {
|
||||||
@ -207,13 +207,8 @@ public void write(int b) throws IOException {
|
|||||||
throw new FSError(e); // assume native fs error
|
throw new FSError(e); // assume native fs error
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/** {@inheritDoc} */
|
|
||||||
public void sync() throws IOException {
|
|
||||||
fos.getFD().sync();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/** {@inheritDoc} */
|
/** {@inheritDoc} */
|
||||||
public FSDataOutputStream append(Path f, int bufferSize,
|
public FSDataOutputStream append(Path f, int bufferSize,
|
||||||
Progressable progress) throws IOException {
|
Progressable progress) throws IOException {
|
||||||
|
@ -20,11 +20,23 @@
|
|||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
/** This interface declare the sync() operation. */
|
/** This interface for flush/sync operation. */
|
||||||
public interface Syncable {
|
public interface Syncable {
|
||||||
/**
|
/**
|
||||||
* Synchronize all buffer with the underlying devices.
|
* @deprecated As of HADOOP 0.21.0, replaced by hflush
|
||||||
* @throws IOException
|
* @see #hflush()
|
||||||
*/
|
*/
|
||||||
public void sync() throws IOException;
|
@Deprecated public void sync() throws IOException;
|
||||||
|
|
||||||
|
/** Flush out the data in client's user buffer. After the return of
|
||||||
|
* this call, new readers will see the data.
|
||||||
|
* @throws IOException if any error occurs
|
||||||
|
*/
|
||||||
|
public void hflush() throws IOException;
|
||||||
|
|
||||||
|
/** Similar to posix fsync, flush out the data in client's user buffer
|
||||||
|
* all the way to the disk device (but the disk may have it in its cache).
|
||||||
|
* @throws IOException if error occurs
|
||||||
|
*/
|
||||||
|
public void hsync() throws IOException;
|
||||||
}
|
}
|
||||||
|
@ -111,6 +111,43 @@ public void testWorkingDirectory() throws IOException {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* test Syncable interface on raw local file system
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public void testSyncable() throws IOException {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
FileSystem fs = FileSystem.getLocal(conf).getRawFileSystem();
|
||||||
|
Path file = new Path(TEST_ROOT_DIR, "syncable");
|
||||||
|
FSDataOutputStream out = fs.create(file);;
|
||||||
|
final int bytesWritten = 1;
|
||||||
|
byte[] expectedBuf = new byte[] {'0', '1', '2', '3'};
|
||||||
|
try {
|
||||||
|
out.write(expectedBuf, 0, 1);
|
||||||
|
out.hflush();
|
||||||
|
verifyFile(fs, file, bytesWritten, expectedBuf);
|
||||||
|
out.write(expectedBuf, bytesWritten, expectedBuf.length-bytesWritten);
|
||||||
|
out.hsync();
|
||||||
|
verifyFile(fs, file, expectedBuf.length, expectedBuf);
|
||||||
|
} finally {
|
||||||
|
out.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void verifyFile(FileSystem fs, Path file, int bytesToVerify,
|
||||||
|
byte[] expectedBytes) throws IOException {
|
||||||
|
FSDataInputStream in = fs.open(file);
|
||||||
|
try {
|
||||||
|
byte[] readBuf = new byte[bytesToVerify];
|
||||||
|
in.readFully(readBuf, 0, bytesToVerify);
|
||||||
|
for (int i=0; i<bytesToVerify; i++) {
|
||||||
|
assertEquals(expectedBytes[i], readBuf[i]);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
in.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void testCopy() throws IOException {
|
public void testCopy() throws IOException {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
LocalFileSystem fs = FileSystem.getLocal(conf);
|
LocalFileSystem fs = FileSystem.getLocal(conf);
|
||||||
|
Loading…
Reference in New Issue
Block a user