diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java index a5bb370172..534919e13f 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java @@ -19,7 +19,6 @@ package org.apache.hadoop.fs.azure; import static org.apache.hadoop.fs.azure.NativeAzureFileSystem.PATH_DELIMITER; -import java.io.BufferedInputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; @@ -2043,11 +2042,9 @@ public DataInputStream retrieve(String key) throws AzureException, IOException { // Get blob reference and open the input buffer stream. CloudBlobWrapper blob = getBlobReference(key); - BufferedInputStream inBufStream = new BufferedInputStream( - openInputStream(blob)); // Return a data input stream. - DataInputStream inDataStream = new DataInputStream(inBufStream); + DataInputStream inDataStream = new DataInputStream(openInputStream(blob)); return inDataStream; } catch (Exception e) { // Re-throw as an Azure storage exception. diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java index b61baabb4a..b2cc4eaf26 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java @@ -816,6 +816,27 @@ public synchronized int read() throws FileNotFoundException, IOException { } } + @Override + public synchronized void readFully(long position, byte[] buffer, int offset, int length) + throws IOException { + validatePositionedReadArgs(position, buffer, offset, length); + + int nread = 0; + while (nread < length) { + // In case BlobInputStream is used, mark() can act as a hint to read ahead only this + // length instead of 4 MB boundary. + in.mark(length - nread); + int nbytes = read(position + nread, + buffer, + offset + nread, + length - nread); + if (nbytes < 0) { + throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY); + } + nread += nbytes; + } + } + /* * Reads up to len bytes of data from the input stream into an array of * bytes. An attempt is made to read as many as len bytes, but a smaller @@ -886,9 +907,13 @@ public synchronized void seek(long pos) throws FileNotFoundException, EOFExcepti if (pos < 0) { throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK); } - IOUtils.closeStream(in); - in = store.retrieve(key); - this.pos = in.skip(pos); + if (this.pos > pos) { + IOUtils.closeStream(in); + in = store.retrieve(key); + this.pos = in.skip(pos); + } else { + this.pos += in.skip(pos - this.pos); + } LOG.debug("Seek to position {}. Bytes skipped {}", pos, this.pos); } catch(IOException e) {