From 1ba491ff907fc5d2618add980734a3534e2be098 Mon Sep 17 00:00:00 2001
From: Steve Loughran
Date: Wed, 20 Dec 2017 18:25:33 +0000
Subject: [PATCH] HADOOP-14965. S3a input stream "normal" fadvise mode to be adaptive

---
 .../apache/hadoop/fs/s3a/S3AInputStream.java  | 28 ++++++++++++++++---
 .../hadoop/fs/s3a/S3AInstrumentation.java     | 13 +++++++++
 .../site/markdown/tools/hadoop-aws/index.md   | 13 ++++++++-
 .../scale/ITestS3AInputStreamPerformance.java |  6 +++-
 4 files changed, 54 insertions(+), 6 deletions(-)

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index 7e6d640f7b..00741437a7 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -83,7 +83,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
   private final S3AInstrumentation.InputStreamStatistics streamStatistics;
   private S3AEncryptionMethods serverSideEncryptionAlgorithm;
   private String serverSideEncryptionKey;
-  private final S3AInputPolicy inputPolicy;
+  private S3AInputPolicy inputPolicy;
   private long readahead = Constants.DEFAULT_READAHEAD_RANGE;
   private final Invoker invoker;
 
@@ -139,11 +139,21 @@ public S3AInputStream(S3ObjectAttributes s3Attributes,
     this.serverSideEncryptionAlgorithm =
         s3Attributes.getServerSideEncryptionAlgorithm();
     this.serverSideEncryptionKey = s3Attributes.getServerSideEncryptionKey();
-    this.inputPolicy = inputPolicy;
+    setInputPolicy(inputPolicy);
     setReadahead(readahead);
     this.invoker = invoker;
   }
 
+  /**
+   * Set/update the input policy of the stream.
+   * This updates the stream statistics.
+   * @param inputPolicy new input policy.
+   */
+  private void setInputPolicy(S3AInputPolicy inputPolicy) {
+    this.inputPolicy = inputPolicy;
+    streamStatistics.inputPolicySet(inputPolicy.ordinal());
+  }
+
   /**
    * Opens up the stream at specified target position and for given length.
    *
@@ -162,8 +172,9 @@ private synchronized void reopen(String reason, long targetPos, long length)
     contentRangeFinish = calculateRequestLimit(inputPolicy, targetPos,
         length, contentLength, readahead);
     LOG.debug("reopen({}) for {} range[{}-{}], length={}," +
-        " streamPosition={}, nextReadPosition={}",
-        uri, reason, targetPos, contentRangeFinish, length, pos, nextReadPos);
+        " streamPosition={}, nextReadPosition={}, policy={}",
+        uri, reason, targetPos, contentRangeFinish, length, pos, nextReadPos,
+        inputPolicy);
 
     long opencount = streamStatistics.streamOpened();
     GetObjectRequest request = new GetObjectRequest(bucket, key)
@@ -274,6 +285,12 @@ private void seekInStream(long targetPos, long length) throws IOException {
     } else if (diff < 0) {
       // backwards seek
       streamStatistics.seekBackwards(diff);
+      // if the stream is in "Normal" mode, switch to random IO at this
+      // point, as it is indicative of columnar format IO
+      if (inputPolicy.equals(S3AInputPolicy.Normal)) {
+        LOG.info("Switching to Random IO seek policy");
+        setInputPolicy(S3AInputPolicy.Random);
+      }
     } else {
       // targetPos == pos
       if (remainingInCurrentRequest() > 0) {
@@ -443,6 +460,7 @@ public synchronized void close() throws IOException {
       try {
         // close or abort the stream
         closeStream("close() operation", this.contentRangeFinish, false);
+        LOG.debug("Statistics of stream {}\n{}", key, streamStatistics);
         // this is actually a no-op
         super.close();
       } finally {
@@ -713,6 +731,8 @@ static long calculateRequestLimit(
       break;
 
     case Normal:
+      // normal is considered sequential until a backwards seek switches
+      // it to 'Random'
     default:
      rangeLimit = contentLength;
 
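The policy switch above is driven purely by the caller's seek pattern, so it helps to see the access pattern that trips it. The following sketch is not part of the patch (the bucket, object name and buffer sizes are made up); it shows the footer-first read that columnar readers such as ORC and Parquet perform, using only the public Hadoop FileSystem API:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class NormalPolicySwitchDemo {
  public static void main(String[] args) throws Exception {
    // hypothetical object; any ORC/Parquet-style reader follows this pattern
    Path path = new Path("s3a://example-bucket/data/part-0000.parquet");
    Configuration conf = new Configuration();
    try (FileSystem fs = FileSystem.newInstance(path.toUri(), conf);
         FSDataInputStream in = fs.open(path)) {
      long len = fs.getFileStatus(path).getLen();
      byte[] footer = new byte[8];
      byte[] column = new byte[1024];
      // 1. first read is at the tail of the file: the stream opens lazily
      //    in "Normal" (sequential) mode, with a GET range running to EOF
      in.readFully(len - footer.length, footer);
      // 2. this read implies a backwards seek, which is what flips the
      //    policy from Normal to Random in seekInStream() above
      in.readFully(len / 2, column);
    }
  }
}
```

After the second positioned read, the statistics added in S3AInstrumentation below should record a policySetCount of 2: one set from the S3AInputStream constructor, one from the adaptive switch.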
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
index 0fbcc00210..d843347475 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
@@ -667,6 +667,8 @@ public final class InputStreamStatistics implements AutoCloseable {
     public long readsIncomplete;
     public long bytesReadInClose;
     public long bytesDiscardedInAbort;
+    public long policySetCount;
+    public long inputPolicy;
 
     private InputStreamStatistics() {
     }
@@ -782,6 +784,15 @@ public void close() {
       mergeInputStreamStatistics(this);
     }
 
+    /**
+     * The input policy has been switched.
+     * @param updatedPolicy enum value of new policy.
+     */
+    public void inputPolicySet(int updatedPolicy) {
+      policySetCount++;
+      inputPolicy = updatedPolicy;
+    }
+
     /**
      * String operator describes all the current statistics.
      * Important: there are no guarantees as to the stability
@@ -813,6 +824,8 @@ public String toString() {
       sb.append(", ReadsIncomplete=").append(readsIncomplete);
       sb.append(", BytesReadInClose=").append(bytesReadInClose);
       sb.append(", BytesDiscardedInAbort=").append(bytesDiscardedInAbort);
+      sb.append(", InputPolicy=").append(inputPolicy);
+      sb.append(", InputPolicySetCount=").append(policySetCount);
       sb.append('}');
       return sb.toString();
     }
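A note on the representation: the policy is recorded via ordinal() into a plain long, presumably so that the field rides along with the existing counter merge machinery, so consumers of InputStreamStatistics have to map the value back to the enum themselves. A hypothetical helper (not part of the patch) makes that explicit:

```java
import org.apache.hadoop.fs.s3a.S3AInputPolicy;
import org.apache.hadoop.fs.s3a.S3AInstrumentation;

final class PolicyStatsSupport {
  /**
   * Decode the policy ordinal recorded in the stream statistics.
   * Note: policySetCount includes the initial assignment made in the
   * S3AInputStream constructor as well as adaptive switches, which is
   * why the test below expects a count of 2 after a single switch.
   */
  static S3AInputPolicy recordedPolicy(
      S3AInstrumentation.InputStreamStatistics stats) {
    return S3AInputPolicy.values()[(int) stats.inputPolicy];
  }
}
```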
*"normal" (default)* -This is currently the same as "sequential", though it may evolve in future. +The "Normal" policy starts off reading a file in "sequential" mode, +but if the caller seeks backwards in the stream, it switches from +sequential to "random". + +This policy effectively recognizes the initial read pattern of columnar +storage formats (e.g. Apache ORC and Apache Parquet), which seek to the end +of a file, read in index data and then seek backwards to selectively read +columns. The first seeks may be be expensive compared to the random policy, +however the overall process is much less expensive than either sequentially +reading through a file with the "random" policy, or reading columnar data +with the "sequential" policy. When the exact format/recommended +seek policy of data are known in advance, this policy *"random"* diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java index 83ab2102bf..efd96c4e73 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java @@ -427,7 +427,11 @@ public void testRandomIONormalPolicy() throws Throwable { long expectedOpenCount = RANDOM_IO_SEQUENCE.length; executeRandomIO(S3AInputPolicy.Normal, expectedOpenCount); assertEquals("streams aborted in " + streamStatistics, - 4, streamStatistics.aborted); + 1, streamStatistics.aborted); + assertEquals("policy changes in " + streamStatistics, + 2, streamStatistics.policySetCount); + assertEquals("input policy in " + streamStatistics, + S3AInputPolicy.Random.ordinal(), streamStatistics.inputPolicy); } /**