HADOOP-14965. S3a input stream "normal" fadvise mode to be adaptive
This commit is contained in:
parent
13ad7479b0
commit
1ba491ff90
@ -83,7 +83,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
|
|||||||
private final S3AInstrumentation.InputStreamStatistics streamStatistics;
|
private final S3AInstrumentation.InputStreamStatistics streamStatistics;
|
||||||
private S3AEncryptionMethods serverSideEncryptionAlgorithm;
|
private S3AEncryptionMethods serverSideEncryptionAlgorithm;
|
||||||
private String serverSideEncryptionKey;
|
private String serverSideEncryptionKey;
|
||||||
private final S3AInputPolicy inputPolicy;
|
private S3AInputPolicy inputPolicy;
|
||||||
private long readahead = Constants.DEFAULT_READAHEAD_RANGE;
|
private long readahead = Constants.DEFAULT_READAHEAD_RANGE;
|
||||||
private final Invoker invoker;
|
private final Invoker invoker;
|
||||||
|
|
||||||
@ -139,11 +139,21 @@ public S3AInputStream(S3ObjectAttributes s3Attributes,
|
|||||||
this.serverSideEncryptionAlgorithm =
|
this.serverSideEncryptionAlgorithm =
|
||||||
s3Attributes.getServerSideEncryptionAlgorithm();
|
s3Attributes.getServerSideEncryptionAlgorithm();
|
||||||
this.serverSideEncryptionKey = s3Attributes.getServerSideEncryptionKey();
|
this.serverSideEncryptionKey = s3Attributes.getServerSideEncryptionKey();
|
||||||
this.inputPolicy = inputPolicy;
|
setInputPolicy(inputPolicy);
|
||||||
setReadahead(readahead);
|
setReadahead(readahead);
|
||||||
this.invoker = invoker;
|
this.invoker = invoker;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set/update the input policy of the stream.
|
||||||
|
* This updates the stream statistics.
|
||||||
|
* @param inputPolicy new input policy.
|
||||||
|
*/
|
||||||
|
private void setInputPolicy(S3AInputPolicy inputPolicy) {
|
||||||
|
this.inputPolicy = inputPolicy;
|
||||||
|
streamStatistics.inputPolicySet(inputPolicy.ordinal());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Opens up the stream at specified target position and for given length.
|
* Opens up the stream at specified target position and for given length.
|
||||||
*
|
*
|
||||||
@ -162,8 +172,9 @@ private synchronized void reopen(String reason, long targetPos, long length)
|
|||||||
contentRangeFinish = calculateRequestLimit(inputPolicy, targetPos,
|
contentRangeFinish = calculateRequestLimit(inputPolicy, targetPos,
|
||||||
length, contentLength, readahead);
|
length, contentLength, readahead);
|
||||||
LOG.debug("reopen({}) for {} range[{}-{}], length={}," +
|
LOG.debug("reopen({}) for {} range[{}-{}], length={}," +
|
||||||
" streamPosition={}, nextReadPosition={}",
|
" streamPosition={}, nextReadPosition={}, policy={}",
|
||||||
uri, reason, targetPos, contentRangeFinish, length, pos, nextReadPos);
|
uri, reason, targetPos, contentRangeFinish, length, pos, nextReadPos,
|
||||||
|
inputPolicy);
|
||||||
|
|
||||||
long opencount = streamStatistics.streamOpened();
|
long opencount = streamStatistics.streamOpened();
|
||||||
GetObjectRequest request = new GetObjectRequest(bucket, key)
|
GetObjectRequest request = new GetObjectRequest(bucket, key)
|
||||||
@ -274,6 +285,12 @@ private void seekInStream(long targetPos, long length) throws IOException {
|
|||||||
} else if (diff < 0) {
|
} else if (diff < 0) {
|
||||||
// backwards seek
|
// backwards seek
|
||||||
streamStatistics.seekBackwards(diff);
|
streamStatistics.seekBackwards(diff);
|
||||||
|
// if the stream is in "Normal" mode, switch to random IO at this
|
||||||
|
// point, as it is indicative of columnar format IO
|
||||||
|
if (inputPolicy.equals(S3AInputPolicy.Normal)) {
|
||||||
|
LOG.info("Switching to Random IO seek policy");
|
||||||
|
setInputPolicy(S3AInputPolicy.Random);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
// targetPos == pos
|
// targetPos == pos
|
||||||
if (remainingInCurrentRequest() > 0) {
|
if (remainingInCurrentRequest() > 0) {
|
||||||
@ -443,6 +460,7 @@ public synchronized void close() throws IOException {
|
|||||||
try {
|
try {
|
||||||
// close or abort the stream
|
// close or abort the stream
|
||||||
closeStream("close() operation", this.contentRangeFinish, false);
|
closeStream("close() operation", this.contentRangeFinish, false);
|
||||||
|
LOG.debug("Statistics of stream {}\n{}", key, streamStatistics);
|
||||||
// this is actually a no-op
|
// this is actually a no-op
|
||||||
super.close();
|
super.close();
|
||||||
} finally {
|
} finally {
|
||||||
@ -713,6 +731,8 @@ static long calculateRequestLimit(
|
|||||||
break;
|
break;
|
||||||
|
|
||||||
case Normal:
|
case Normal:
|
||||||
|
// normal is considered sequential until a backwards seek switches
|
||||||
|
// it to 'Random'
|
||||||
default:
|
default:
|
||||||
rangeLimit = contentLength;
|
rangeLimit = contentLength;
|
||||||
|
|
||||||
|
@ -667,6 +667,8 @@ public final class InputStreamStatistics implements AutoCloseable {
|
|||||||
public long readsIncomplete;
|
public long readsIncomplete;
|
||||||
public long bytesReadInClose;
|
public long bytesReadInClose;
|
||||||
public long bytesDiscardedInAbort;
|
public long bytesDiscardedInAbort;
|
||||||
|
public long policySetCount;
|
||||||
|
public long inputPolicy;
|
||||||
|
|
||||||
private InputStreamStatistics() {
|
private InputStreamStatistics() {
|
||||||
}
|
}
|
||||||
@ -782,6 +784,15 @@ public void close() {
|
|||||||
mergeInputStreamStatistics(this);
|
mergeInputStreamStatistics(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The input policy has been switched.
|
||||||
|
* @param updatedPolicy enum value of new policy.
|
||||||
|
*/
|
||||||
|
public void inputPolicySet(int updatedPolicy) {
|
||||||
|
policySetCount++;
|
||||||
|
inputPolicy = updatedPolicy;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* String operator describes all the current statistics.
|
* String operator describes all the current statistics.
|
||||||
* <b>Important: there are no guarantees as to the stability
|
* <b>Important: there are no guarantees as to the stability
|
||||||
@ -813,6 +824,8 @@ public String toString() {
|
|||||||
sb.append(", ReadsIncomplete=").append(readsIncomplete);
|
sb.append(", ReadsIncomplete=").append(readsIncomplete);
|
||||||
sb.append(", BytesReadInClose=").append(bytesReadInClose);
|
sb.append(", BytesReadInClose=").append(bytesReadInClose);
|
||||||
sb.append(", BytesDiscardedInAbort=").append(bytesDiscardedInAbort);
|
sb.append(", BytesDiscardedInAbort=").append(bytesDiscardedInAbort);
|
||||||
|
sb.append(", InputPolicy=").append(inputPolicy);
|
||||||
|
sb.append(", InputPolicySetCount=").append(policySetCount);
|
||||||
sb.append('}');
|
sb.append('}');
|
||||||
return sb.toString();
|
return sb.toString();
|
||||||
}
|
}
|
||||||
|
@ -1553,7 +1553,18 @@ backward seeks.
|
|||||||
|
|
||||||
*"normal" (default)*
|
*"normal" (default)*
|
||||||
|
|
||||||
This is currently the same as "sequential", though it may evolve in future.
|
The "Normal" policy starts off reading a file in "sequential" mode,
|
||||||
|
but if the caller seeks backwards in the stream, it switches from
|
||||||
|
sequential to "random".
|
||||||
|
|
||||||
|
This policy effectively recognizes the initial read pattern of columnar
|
||||||
|
storage formats (e.g. Apache ORC and Apache Parquet), which seek to the end
|
||||||
|
of a file, read in index data and then seek backwards to selectively read
|
||||||
|
columns. The first seeks may be be expensive compared to the random policy,
|
||||||
|
however the overall process is much less expensive than either sequentially
|
||||||
|
reading through a file with the "random" policy, or reading columnar data
|
||||||
|
with the "sequential" policy. When the exact format/recommended
|
||||||
|
seek policy of data are known in advance, this policy
|
||||||
|
|
||||||
*"random"*
|
*"random"*
|
||||||
|
|
||||||
|
@ -427,7 +427,11 @@ public void testRandomIONormalPolicy() throws Throwable {
|
|||||||
long expectedOpenCount = RANDOM_IO_SEQUENCE.length;
|
long expectedOpenCount = RANDOM_IO_SEQUENCE.length;
|
||||||
executeRandomIO(S3AInputPolicy.Normal, expectedOpenCount);
|
executeRandomIO(S3AInputPolicy.Normal, expectedOpenCount);
|
||||||
assertEquals("streams aborted in " + streamStatistics,
|
assertEquals("streams aborted in " + streamStatistics,
|
||||||
4, streamStatistics.aborted);
|
1, streamStatistics.aborted);
|
||||||
|
assertEquals("policy changes in " + streamStatistics,
|
||||||
|
2, streamStatistics.policySetCount);
|
||||||
|
assertEquals("input policy in " + streamStatistics,
|
||||||
|
S3AInputPolicy.Random.ordinal(), streamStatistics.inputPolicy);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
Loading…
Reference in New Issue
Block a user