HADOOP-12444 Support lazy seek in S3AInputStream. Rajesh Balamohan via stevel

This commit is contained in:
Steve Loughran 2016-04-09 11:24:39 +01:00
parent fcb3fcd4c6
commit b9e3eff62a

View File

@@ -20,7 +20,6 @@
import com.amazonaws.services.s3.AmazonS3Client; import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.GetObjectRequest; import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream; import com.amazonaws.services.s3.model.S3ObjectInputStream;
import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FSInputStream;
@@ -37,82 +36,128 @@ public class S3AInputStream extends FSInputStream {
private long pos; private long pos;
private boolean closed; private boolean closed;
private S3ObjectInputStream wrappedStream; private S3ObjectInputStream wrappedStream;
private FileSystem.Statistics stats; private final FileSystem.Statistics stats;
private AmazonS3Client client; private final AmazonS3Client client;
private String bucket; private final String bucket;
private String key; private final String key;
private long contentLength; private final long contentLength;
private final String uri;
public static final Logger LOG = S3AFileSystem.LOG; public static final Logger LOG = S3AFileSystem.LOG;
public static final long CLOSE_THRESHOLD = 4096; public static final long CLOSE_THRESHOLD = 4096;
public S3AInputStream(String bucket, String key, long contentLength, AmazonS3Client client, // Used by lazy seek
FileSystem.Statistics stats) { private long nextReadPos;
//Amount of data requested from the request
private long requestedStreamLen;
public S3AInputStream(String bucket, String key, long contentLength,
AmazonS3Client client, FileSystem.Statistics stats) {
this.bucket = bucket; this.bucket = bucket;
this.key = key; this.key = key;
this.contentLength = contentLength; this.contentLength = contentLength;
this.client = client; this.client = client;
this.stats = stats; this.stats = stats;
this.pos = 0; this.pos = 0;
this.nextReadPos = 0;
this.closed = false; this.closed = false;
this.wrappedStream = null; this.wrappedStream = null;
this.uri = "s3a://" + this.bucket + "/" + this.key;
} }
private void openIfNeeded() throws IOException { /**
if (wrappedStream == null) { * Opens up the stream at specified target position and for given length.
reopen(0); *
} * @param targetPos target position
} * @param length length requested
* @throws IOException
private synchronized void reopen(long pos) throws IOException { */
private synchronized void reopen(long targetPos, long length)
throws IOException {
requestedStreamLen = (length < 0) ? this.contentLength :
Math.max(this.contentLength, (CLOSE_THRESHOLD + (targetPos + length)));
if (wrappedStream != null) { if (wrappedStream != null) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Aborting old stream to open at pos " + pos); LOG.debug("Closing the previous stream");
} }
wrappedStream.abort(); closeStream(requestedStreamLen);
} }
if (pos < 0) { if (LOG.isDebugEnabled()) {
throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK LOG.debug("Requesting for "
+" " + pos); + "targetPos=" + targetPos
+ ", length=" + length
+ ", requestedStreamLen=" + requestedStreamLen
+ ", streamPosition=" + pos
+ ", nextReadPosition=" + nextReadPos
);
} }
if (contentLength > 0 && pos > contentLength-1) { GetObjectRequest request = new GetObjectRequest(bucket, key)
throw new EOFException( .withRange(targetPos, requestedStreamLen);
FSExceptionMessages.CANNOT_SEEK_PAST_EOF
+ " " + pos);
}
LOG.debug("Actually opening file " + key + " at pos " + pos);
GetObjectRequest request = new GetObjectRequest(bucket, key);
request.setRange(pos, contentLength-1);
wrappedStream = client.getObject(request).getObjectContent(); wrappedStream = client.getObject(request).getObjectContent();
if (wrappedStream == null) { if (wrappedStream == null) {
throw new IOException("Null IO stream"); throw new IOException("Null IO stream");
} }
this.pos = pos; this.pos = targetPos;
} }
@Override @Override
public synchronized long getPos() throws IOException { public synchronized long getPos() throws IOException {
return pos; return (nextReadPos < 0) ? 0 : nextReadPos;
} }
@Override @Override
public synchronized void seek(long pos) throws IOException { public synchronized void seek(long targetPos) throws IOException {
checkNotClosed(); checkNotClosed();
if (this.pos == pos) { // Do not allow negative seek
if (targetPos < 0) {
throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK
+ " " + targetPos);
}
if (this.contentLength <= 0) {
return; return;
} }
LOG.debug( // Lazy seek
"Reopening " + this.key + " to seek to new offset " + (pos - this.pos)); nextReadPos = targetPos;
reopen(pos); }
/**
* Adjust the stream to a specific position.
*
* Moves {@code pos} to {@code targetPos} in one of two ways. For a forward
* move where the skip distance plus {@code length} is no more than what the
* wrapped stream reports via {@code available()}, the bytes are skipped
* inside the currently open stream. In every other case (backward move, or
* a forward move beyond what is available) the wrapped stream is closed via
* {@code closeStream()} and only {@code pos} is updated; this method does
* not open a new stream itself.
*
* If no wrapped stream is open, the method returns immediately and leaves
* {@code pos} unchanged.
*
* @param targetPos target seek position
* @param length length of content that needs to be read from targetPos
* @throws IOException if {@code checkNotClosed()} or {@code closeStream()}
* fails, or if {@code skip()} leaves the stream at a position other than
* {@code targetPos}
*/
private void seekInStream(long targetPos, long length) throws IOException {
checkNotClosed();
// nothing to reposition when no stream is open; pos is left as it is
if (wrappedStream == null) {
return;
}
// compute how much more to skip
long diff = targetPos - pos;
// only forward moves are candidates for skipping within the open stream
if (targetPos > pos) {
// skip in place only if both the gap and the requested length fit in
// what the wrapped stream reports as available
if ((diff + length) <= wrappedStream.available()) {
// already available in buffer
// skip() returns the number of bytes actually skipped, so pos advances
// by that amount rather than by diff
pos += wrappedStream.skip(diff);
// a short skip leaves pos before targetPos; report it as a failure
if (pos != targetPos) {
throw new IOException("Failed to seek to " + targetPos
+ ". Current position " + pos);
}
return;
}
}
// close the stream; if read the object will be opened at the new pos
// closeStream() is passed the length recorded for the current request
closeStream(this.requestedStreamLen);
pos = targetPos;
} }
@Override @Override
@@ -120,22 +165,40 @@ public boolean seekToNewSource(long targetPos) throws IOException {
return false; return false;
} }
/**
 * Apply a deferred seek so that the wrapped stream is positioned for the
 * next read.
 *
 * When the requested position differs from the current stream position,
 * the stream is first adjusted through {@code seekInStream()}. If no
 * wrapped stream is open after that step, one is opened at the requested
 * position through {@code reopen()}.
 *
 * @param targetPos position from where data should be read
 * @param len length of the content that needs to be read
 * @throws IOException if adjusting or reopening the stream fails
 */
private void lazySeek(long targetPos, long len) throws IOException {
  final boolean positionChanged = this.pos != targetPos;
  if (positionChanged) {
    // move (or discard) the current stream to honour the pending seek
    seekInStream(targetPos, len);
  }

  if (wrappedStream != null) {
    // a usable stream is already open; nothing more to do
    return;
  }
  // no stream is open: open one at the requested location
  reopen(targetPos, len);
}
@Override @Override
public synchronized int read() throws IOException { public synchronized int read() throws IOException {
checkNotClosed(); checkNotClosed();
if (this.contentLength == 0 || (nextReadPos >= contentLength)) {
return -1;
}
openIfNeeded(); lazySeek(nextReadPos, 1);
int byteRead; int byteRead;
try { try {
byteRead = wrappedStream.read(); byteRead = wrappedStream.read();
} catch (SocketTimeoutException e) { } catch (SocketTimeoutException | SocketException e) {
LOG.info("Got timeout while trying to read from stream, trying to recover " + e); LOG.info("Got exception while trying to read from stream,"
reopen(pos); + " trying to recover " + e);
byteRead = wrappedStream.read(); reopen(pos, 1);
} catch (SocketException e) {
LOG.info("Got socket exception while trying to read from stream, trying to recover " + e);
reopen(pos);
byteRead = wrappedStream.read(); byteRead = wrappedStream.read();
} catch (EOFException e) { } catch (EOFException e) {
return -1; return -1;
@@ -143,6 +206,7 @@ public synchronized int read() throws IOException {
if (byteRead >= 0) { if (byteRead >= 0) {
pos++; pos++;
nextReadPos++;
} }
if (stats != null && byteRead >= 0) { if (stats != null && byteRead >= 0) {
@@ -152,26 +216,34 @@ public synchronized int read() throws IOException {
} }
@Override @Override
public synchronized int read(byte[] buf, int off, int len) throws IOException { public synchronized int read(byte[] buf, int off, int len)
throws IOException {
checkNotClosed(); checkNotClosed();
openIfNeeded(); validatePositionedReadArgs(nextReadPos, buf, off, len);
if (len == 0) {
return 0;
}
if (this.contentLength == 0 || (nextReadPos >= contentLength)) {
return -1;
}
lazySeek(nextReadPos, len);
int byteRead; int byteRead;
try { try {
byteRead = wrappedStream.read(buf, off, len); byteRead = wrappedStream.read(buf, off, len);
} catch (SocketTimeoutException e) { } catch (SocketTimeoutException | SocketException e) {
LOG.info("Got timeout while trying to read from stream, trying to recover " + e); LOG.info("Got exception while trying to read from stream,"
reopen(pos); + " trying to recover " + e);
byteRead = wrappedStream.read(buf, off, len); reopen(pos, len);
} catch (SocketException e) {
LOG.info("Got socket exception while trying to read from stream, trying to recover " + e);
reopen(pos);
byteRead = wrappedStream.read(buf, off, len); byteRead = wrappedStream.read(buf, off, len);
} }
if (byteRead > 0) { if (byteRead > 0) {
pos += byteRead; pos += byteRead;
nextReadPos += byteRead;
} }
if (stats != null && byteRead > 0) { if (stats != null && byteRead > 0) {
@@ -191,15 +263,43 @@ private void checkNotClosed() throws IOException {
public synchronized void close() throws IOException { public synchronized void close() throws IOException {
super.close(); super.close();
closed = true; closed = true;
closeStream(this.contentLength);
}
/**
* Close a stream: decide whether to abort or close, based on
* the length of the stream and the current position.
*
* This does not set the {@link #closed} flag.
* @param length length of the stream.
* @throws IOException
*/
private void closeStream(long length) throws IOException {
if (wrappedStream != null) { if (wrappedStream != null) {
if (contentLength - pos <= CLOSE_THRESHOLD) { String reason = null;
// Close, rather than abort, so that the http connection can be reused. boolean shouldAbort = length - pos > CLOSE_THRESHOLD;
if (!shouldAbort) {
try {
reason = "Closed stream";
wrappedStream.close(); wrappedStream.close();
} else { } catch (IOException e) {
// exception escalates to an abort
LOG.debug("When closing stream", e);
shouldAbort = true;
}
}
if (shouldAbort) {
// Abort, rather than just close, the underlying stream. Otherwise, the // Abort, rather than just close, the underlying stream. Otherwise, the
// remaining object payload is read from S3 while closing the stream. // remaining object payload is read from S3 while closing the stream.
wrappedStream.abort(); wrappedStream.abort();
reason = "Closed stream with abort";
} }
if (LOG.isDebugEnabled()) {
LOG.debug(reason + "; streamPos=" + pos
+ ", nextReadPos=" + nextReadPos
+ ", contentLength=" + length);
}
wrappedStream = null;
} }
} }
@@ -219,6 +319,18 @@ public boolean markSupported() {
return false; return false;
} }
/**
 * Describe this stream for logs and diagnostics.
 *
 * @return a string holding the stream's URI, its current stream position,
 * the position of the next read, and the content length
 */
@Override
public String toString() {
  return "S3AInputStream{"
      + uri
      + " pos=" + pos
      + " nextReadPos=" + nextReadPos
      + " contentLength=" + contentLength
      + '}';
}
/** /**
* Subclass {@code readFully()} operation which only seeks at the start * Subclass {@code readFully()} operation which only seeks at the start
* of the series of operations; seeking back at the end. * of the series of operations; seeking back at the end.
@@ -234,6 +346,7 @@ public boolean markSupported() {
@Override @Override
public void readFully(long position, byte[] buffer, int offset, int length) public void readFully(long position, byte[] buffer, int offset, int length)
throws IOException { throws IOException {
checkNotClosed();
validatePositionedReadArgs(position, buffer, offset, length); validatePositionedReadArgs(position, buffer, offset, length);
if (length == 0) { if (length == 0) {
return; return;