HADOOP-14478. Optimize NativeAzureFsInputStream for positional reads. Contributed by Rajesh Balamohan

This commit is contained in:
Mingliang Liu 2017-06-05 15:56:43 -07:00
parent 835560983e
commit 5fd9742c83
2 changed files with 29 additions and 7 deletions

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.fs.azure;
import static org.apache.hadoop.fs.azure.NativeAzureFileSystem.PATH_DELIMITER;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
@ -2043,11 +2042,9 @@ public DataInputStream retrieve(String key) throws AzureException, IOException {
// Get blob reference and open the input buffer stream.
CloudBlobWrapper blob = getBlobReference(key);
BufferedInputStream inBufStream = new BufferedInputStream(
openInputStream(blob));
// Return a data input stream.
DataInputStream inDataStream = new DataInputStream(inBufStream);
DataInputStream inDataStream = new DataInputStream(openInputStream(blob));
return inDataStream;
} catch (Exception e) {
// Re-throw as an Azure storage exception.

View File

@ -816,6 +816,27 @@ public synchronized int read() throws FileNotFoundException, IOException {
}
}
@Override
public synchronized void readFully(long position, byte[] buffer, int offset, int length)
throws IOException {
validatePositionedReadArgs(position, buffer, offset, length);
int nread = 0;
while (nread < length) {
// In case BlobInputStream is used, mark() can act as a hint to read ahead only this
// length instead of 4 MB boundary.
in.mark(length - nread);
int nbytes = read(position + nread,
buffer,
offset + nread,
length - nread);
if (nbytes < 0) {
throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY);
}
nread += nbytes;
}
}
/*
* Reads up to len bytes of data from the input stream into an array of
* bytes. An attempt is made to read as many as len bytes, but a smaller
@ -886,9 +907,13 @@ public synchronized void seek(long pos) throws FileNotFoundException, EOFExcepti
if (pos < 0) {
throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
}
IOUtils.closeStream(in);
in = store.retrieve(key);
this.pos = in.skip(pos);
if (this.pos > pos) {
IOUtils.closeStream(in);
in = store.retrieve(key);
this.pos = in.skip(pos);
} else {
this.pos += in.skip(pos - this.pos);
}
LOG.debug("Seek to position {}. Bytes skipped {}", pos,
this.pos);
} catch(IOException e) {