From 147a466c6d200db171554058ed93656f03b40334 Mon Sep 17 00:00:00 2001 From: Mukund Thakur Date: Thu, 28 Jul 2022 21:57:37 +0530 Subject: [PATCH] HADOOP-18227. Add input stream IOStats for vectored IO api in S3A. (#4636) part of HADOOP-18103. Contributed By: Mukund Thakur --- .../fs/statistics/StreamStatisticNames.java | 30 ++- .../AbstractContractVectoredReadTest.java | 11 ++ .../apache/hadoop/fs/s3a/S3AInputStream.java | 6 +- .../hadoop/fs/s3a/S3AInstrumentation.java | 29 +++ .../statistics/S3AInputStreamStatistics.java | 14 ++ .../impl/EmptyS3AStatisticsContext.java | 11 ++ .../s3a/ITestS3AContractVectoredRead.java | 171 ++++++++++++++++++ .../apache/hadoop/fs/s3a/S3ATestUtils.java | 31 ++++ .../scale/ITestS3AInputStreamPerformance.java | 2 + .../hadoop/fs/s3a/scale/S3AScaleTestBase.java | 33 ---- 10 files changed, 303 insertions(+), 35 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java index ca755f0841..bb697ad8cc 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java @@ -47,7 +47,7 @@ public final class StreamStatisticNames { public static final String STREAM_READ_ABORTED = "stream_aborted"; /** - * Bytes read from an input stream in read() calls. + * Bytes read from an input stream in read()/readVectored() calls. * Does not include bytes read and then discarded in seek/close etc. * These are the bytes returned to the caller. * Value: {@value}. @@ -110,6 +110,34 @@ public final class StreamStatisticNames { public static final String STREAM_READ_OPERATIONS = "stream_read_operations"; + /** + * Count of readVectored() operations in an input stream. + * Value: {@value}. + */ + public static final String STREAM_READ_VECTORED_OPERATIONS = + "stream_read_vectored_operations"; + + /** + * Count of bytes discarded during readVectored() operation + * in an input stream. + * Value: {@value}. + */ + public static final String STREAM_READ_VECTORED_READ_BYTES_DISCARDED = + "stream_read_vectored_read_bytes_discarded"; + + /** + * Count of incoming file ranges during readVectored() operation. + * Value: {@value} + */ + public static final String STREAM_READ_VECTORED_INCOMING_RANGES = + "stream_read_vectored_incoming_ranges"; + /** + * Count of combined file ranges during readVectored() operation. + * Value: {@value} + */ + public static final String STREAM_READ_VECTORED_COMBINED_RANGES = + "stream_read_vectored_combined_ranges"; + /** * Count of incomplete read() operations in an input stream, * that is, when the bytes returned were less than that requested. diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java index 77bcc496ff..379b992fba 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java @@ -84,6 +84,10 @@ public IntFunction getAllocate() { return allocate; } + public WeakReferencedElasticByteBufferPool getPool() { + return pool; + } + @Override public void setup() throws Exception { super.setup(); @@ -382,6 +386,13 @@ protected List getSampleOverlappingRanges() { return fileRanges; } + protected List getConsecutiveRanges() { + List fileRanges = new ArrayList<>(); + fileRanges.add(FileRange.createFileRange(100, 500)); + fileRanges.add(FileRange.createFileRange(600, 500)); + return fileRanges; + } + /** * Validate that exceptions must be thrown during a vectored * read operation with specific input ranges. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index 178a807733..c20c3a0486 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -963,7 +963,6 @@ public int maxReadSizeForVectorReads() { @Override public void readVectored(List ranges, IntFunction allocate) throws IOException { - LOG.debug("Starting vectored read on path {} for ranges {} ", pathStr, ranges); checkNotClosed(); if (stopVectoredIOOperations.getAndSet(false)) { @@ -978,6 +977,7 @@ public void readVectored(List ranges, if (isOrderedDisjoint(sortedRanges, 1, minSeekForVectorReads())) { LOG.debug("Not merging the ranges as they are disjoint"); + streamStatistics.readVectoredOperationStarted(sortedRanges.size(), sortedRanges.size()); for (FileRange range: sortedRanges) { ByteBuffer buffer = allocate.apply(range.getLength()); unboundedThreadPool.submit(() -> readSingleRange(range, buffer)); @@ -987,6 +987,7 @@ public void readVectored(List ranges, List combinedFileRanges = mergeSortedRanges(sortedRanges, 1, minSeekForVectorReads(), maxReadSizeForVectorReads()); + streamStatistics.readVectoredOperationStarted(sortedRanges.size(), combinedFileRanges.size()); LOG.debug("Number of original ranges size {} , Number of combined ranges {} ", ranges.size(), combinedFileRanges.size()); for (CombinedFileRange combinedFileRange: combinedFileRanges) { @@ -1088,6 +1089,7 @@ private void drainUnnecessaryData(S3ObjectInputStream objectContent, long drainQ } drainBytes += readCount; } + streamStatistics.readVectoredBytesDiscarded(drainBytes); LOG.debug("{} bytes drained from stream ", drainBytes); } @@ -1168,6 +1170,8 @@ private void populateBuffer(int length, } else { readByteArray(objectContent, buffer.array(), 0, length); } + // update io stats. + incrementBytesRead(length); } /** diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java index 7c40d2d13c..b57e030679 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java @@ -803,6 +803,10 @@ private final class InputStreamStatistics private final AtomicLong readOperations; private final AtomicLong readFullyOperations; private final AtomicLong seekOperations; + private final AtomicLong readVectoredOperations; + private final AtomicLong bytesDiscardedInVectoredIO; + private final AtomicLong readVectoredIncomingRanges; + private final AtomicLong readVectoredCombinedRanges; /** Bytes read by the application and any when draining streams . */ private final AtomicLong totalBytesRead; @@ -836,6 +840,10 @@ private InputStreamStatistics( StreamStatisticNames.STREAM_READ_SEEK_BYTES_SKIPPED, StreamStatisticNames.STREAM_READ_TOTAL_BYTES, StreamStatisticNames.STREAM_READ_UNBUFFERED, + StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES, + StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES, + StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS, + StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED, StreamStatisticNames.STREAM_READ_VERSION_MISMATCHES) .withGauges(STREAM_READ_GAUGE_INPUT_POLICY) .withDurationTracking(ACTION_HTTP_GET_REQUEST, @@ -872,6 +880,14 @@ private InputStreamStatistics( StreamStatisticNames.STREAM_READ_OPERATIONS_INCOMPLETE); readOperations = st.getCounterReference( StreamStatisticNames.STREAM_READ_OPERATIONS); + readVectoredOperations = st.getCounterReference( + StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS); + bytesDiscardedInVectoredIO = st.getCounterReference( + StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED); + readVectoredIncomingRanges = st.getCounterReference( + StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES); + readVectoredCombinedRanges = st.getCounterReference( + StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES); readFullyOperations = st.getCounterReference( StreamStatisticNames.STREAM_READ_FULLY_OPERATIONS); seekOperations = st.getCounterReference( @@ -1017,6 +1033,19 @@ public void readOperationCompleted(int requested, int actual) { } } + @Override + public void readVectoredOperationStarted(int numIncomingRanges, + int numCombinedRanges) { + readVectoredIncomingRanges.addAndGet(numIncomingRanges); + readVectoredCombinedRanges.addAndGet(numCombinedRanges); + readVectoredOperations.incrementAndGet(); + } + + @Override + public void readVectoredBytesDiscarded(int discarded) { + bytesDiscardedInVectoredIO.addAndGet(discarded); + } + /** * {@code close()} merges the stream statistics into the filesystem's * instrumentation instance. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java index 539af2bde3..41a8f25315 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java @@ -96,6 +96,20 @@ void streamClose(boolean abortedConnection, */ void readOperationCompleted(int requested, int actual); + /** + * A vectored read operation has started.. + * @param numIncomingRanges number of input ranges. + * @param numCombinedRanges number of combined ranges. + */ + void readVectoredOperationStarted(int numIncomingRanges, + int numCombinedRanges); + + /** + * Number of bytes discarded during vectored read. + * @param discarded discarded bytes during vectored read. + */ + void readVectoredBytesDiscarded(int discarded); + @Override void close(); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java index 5c0995e41b..cea8be7f10 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java @@ -195,6 +195,17 @@ public void readOperationCompleted(final int requested, final int actual) { } + @Override + public void readVectoredOperationStarted(int numIncomingRanges, + int numCombinedRanges) { + + } + + @Override + public void readVectoredBytesDiscarded(int discarded) { + + } + @Override public void close() { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java index 18a727dcdc..84a90ba441 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java @@ -19,28 +19,41 @@ package org.apache.hadoop.fs.contract.s3a; import java.io.EOFException; +import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CompletableFuture; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileRange; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest; import org.apache.hadoop.fs.contract.AbstractFSContract; import org.apache.hadoop.fs.s3a.Constants; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.S3ATestUtils; +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.StoreStatisticNames; +import org.apache.hadoop.fs.statistics.StreamStatisticNames; import org.apache.hadoop.test.LambdaTestUtils; +import static org.apache.hadoop.fs.contract.ContractTestUtils.returnBuffersToPoolPostRead; import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult; +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue; +import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString; import static org.apache.hadoop.test.MoreAsserts.assertEqual; public class ITestS3AContractVectoredRead extends AbstractContractVectoredReadTest { + private static final Logger LOG = LoggerFactory.getLogger(ITestS3AContractVectoredRead.class); + public ITestS3AContractVectoredRead(String bufferType) { super(bufferType); } @@ -156,4 +169,162 @@ public void testSameRanges() throws Exception { List fileRanges = getSampleSameRanges(); verifyExceptionalVectoredRead(fs, fileRanges, UnsupportedOperationException.class); } + + /** + * As the minimum seek value is 4*1024, the first three ranges will be + * merged into and other two will remain as it is. + * */ + @Test + public void testNormalReadVsVectoredReadStatsCollection() throws Exception { + FileSystem fs = getTestFileSystemWithReadAheadDisabled(); + List fileRanges = new ArrayList<>(); + fileRanges.add(FileRange.createFileRange(10 * 1024, 100)); + fileRanges.add(FileRange.createFileRange(8 * 1024, 100)); + fileRanges.add(FileRange.createFileRange(14 * 1024, 100)); + fileRanges.add(FileRange.createFileRange(2 * 1024 - 101, 100)); + fileRanges.add(FileRange.createFileRange(40 * 1024, 1024)); + + FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME)); + CompletableFuture builder = + fs.openFile(path(VECTORED_READ_FILE_NAME)) + .withFileStatus(fileStatus) + .build(); + try (FSDataInputStream in = builder.get()) { + in.readVectored(fileRanges, getAllocate()); + validateVectoredReadResult(fileRanges, DATASET); + returnBuffersToPoolPostRead(fileRanges, getPool()); + + // audit the io statistics for this stream + IOStatistics st = in.getIOStatistics(); + LOG.info("IOStats after readVectored operation {}", ioStatisticsToPrettyString(st)); + + // the vectored io operation must be tracked + verifyStatisticCounterValue(st, + StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS, + 1); + + // the vectored io operation is being called with 5 input ranges. + verifyStatisticCounterValue(st, + StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES, + 5); + + // 5 input ranges got combined in 3 as some of them are close. + verifyStatisticCounterValue(st, + StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES, + 3); + + // number of bytes discarded will be based on the above input ranges. + verifyStatisticCounterValue(st, + StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED, + 5944); + + verifyStatisticCounterValue(st, + StoreStatisticNames.ACTION_HTTP_GET_REQUEST, + 3); + + // read bytes should match the sum of requested length for each input ranges. + verifyStatisticCounterValue(st, + StreamStatisticNames.STREAM_READ_BYTES, + 1424); + + } + + CompletableFuture builder1 = + fs.openFile(path(VECTORED_READ_FILE_NAME)) + .withFileStatus(fileStatus) + .build(); + + try (FSDataInputStream in = builder1.get()) { + for (FileRange range : fileRanges) { + byte[] temp = new byte[range.getLength()]; + in.readFully((int) range.getOffset(), temp, 0, range.getLength()); + } + + // audit the statistics for this stream + IOStatistics st = in.getIOStatistics(); + LOG.info("IOStats after read fully operation {}", ioStatisticsToPrettyString(st)); + + verifyStatisticCounterValue(st, + StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS, + 0); + + // all other counter values consistent. + verifyStatisticCounterValue(st, + StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED, + 0); + verifyStatisticCounterValue(st, + StoreStatisticNames.ACTION_HTTP_GET_REQUEST, + 5); + + // read bytes should match the sum of requested length for each input ranges. + verifyStatisticCounterValue(st, + StreamStatisticNames.STREAM_READ_BYTES, + 1424); + } + } + + @Test + public void testMultiVectoredReadStatsCollection() throws Exception { + FileSystem fs = getTestFileSystemWithReadAheadDisabled(); + List ranges1 = getConsecutiveRanges(); + List ranges2 = getConsecutiveRanges(); + FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME)); + CompletableFuture builder = + fs.openFile(path(VECTORED_READ_FILE_NAME)) + .withFileStatus(fileStatus) + .build(); + try (FSDataInputStream in = builder.get()) { + in.readVectored(ranges1, getAllocate()); + in.readVectored(ranges2, getAllocate()); + validateVectoredReadResult(ranges1, DATASET); + validateVectoredReadResult(ranges2, DATASET); + returnBuffersToPoolPostRead(ranges1, getPool()); + returnBuffersToPoolPostRead(ranges2, getPool()); + + // audit the io statistics for this stream + IOStatistics st = in.getIOStatistics(); + + // 2 vectored io calls are made above. + verifyStatisticCounterValue(st, + StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS, + 2); + + // 2 vectored io operation is being called with 2 input ranges. + verifyStatisticCounterValue(st, + StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES, + 4); + + // 2 ranges are getting merged in 1 during both vectored io operation. + verifyStatisticCounterValue(st, + StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES, + 2); + + // number of bytes discarded will be 0 as the ranges are consecutive. + verifyStatisticCounterValue(st, + StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED, + 0); + // only 2 http get request will be made because ranges in both range list will be merged + // to 1 because they are consecutive. + verifyStatisticCounterValue(st, + StoreStatisticNames.ACTION_HTTP_GET_REQUEST, + 2); + // read bytes should match the sum of requested length for each input ranges. + verifyStatisticCounterValue(st, + StreamStatisticNames.STREAM_READ_BYTES, + 2000); + } + } + + private FileSystem getTestFileSystemWithReadAheadDisabled() throws IOException { + Configuration conf = getFileSystem().getConf(); + // also resetting the min seek and max size values is important + // as this same test suite has test which overrides these params. + S3ATestUtils.removeBaseAndBucketOverrides(conf, + Constants.READAHEAD_RANGE, + Constants.AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE, + Constants.AWS_S3_VECTOR_READS_MIN_SEEK_SIZE); + S3ATestUtils.disableFilesystemCaching(conf); + conf.setInt(Constants.READAHEAD_RANGE, 0); + return S3ATestUtils.createTestFileSystem(conf); + } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java index 48cb52c5ac..6162ed1312 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java @@ -41,6 +41,7 @@ import org.apache.hadoop.fs.s3a.impl.StoreContext; import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder; import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics; +import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext; import org.apache.hadoop.fs.s3native.S3xLoginHelper; import org.apache.hadoop.io.DataInputBuffer; @@ -69,6 +70,7 @@ import java.io.Closeable; import java.io.File; import java.io.IOException; +import java.io.InputStream; import java.net.URI; import java.net.URISyntaxException; import java.text.DateFormat; @@ -1457,4 +1459,33 @@ public static void skipIfEncryptionNotSet(Configuration configuration, + " in " + secrets); } } + + + /** + * Get the input stream statistics of an input stream. + * Raises an exception if the inner stream is not an S3A input stream + * @param in wrapper + * @return the statistics for the inner stream + */ + public static S3AInputStreamStatistics getInputStreamStatistics( + FSDataInputStream in) { + return getS3AInputStream(in).getS3AStreamStatistics(); + } + + /** + * Get the inner stream of an input stream. + * Raises an exception if the inner stream is not an S3A input stream + * @param in wrapper + * @return the inner stream + * @throws AssertionError if the inner stream is of the wrong type + */ + public static S3AInputStream getS3AInputStream( + FSDataInputStream in) { + InputStream inner = in.getWrappedStream(); + if (inner instanceof S3AInputStream) { + return (S3AInputStream) inner; + } else { + throw new AssertionError("Not an S3AInputStream: " + inner); + } + } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java index d73a938bcc..b8195cb996 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java @@ -56,6 +56,8 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.*; import static org.apache.hadoop.fs.s3a.Constants.*; import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.getInputStreamStatistics; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.getS3AInputStream; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticMinimum; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupMaximumStatistic; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupMeanStatistic; diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java index d95b46b10d..514c6cf886 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java @@ -19,19 +19,14 @@ package org.apache.hadoop.fs.s3a.scale; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.AbstractS3ATestBase; -import org.apache.hadoop.fs.s3a.S3AInputStream; import org.apache.hadoop.fs.s3a.S3ATestConstants; import org.apache.hadoop.fs.s3a.Statistic; -import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.InputStream; - import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupGaugeStatistic; @@ -154,34 +149,6 @@ protected int getTestTimeoutMillis() { return getTestTimeoutSeconds() * 1000; } - /** - * Get the input stream statistics of an input stream. - * Raises an exception if the inner stream is not an S3A input stream - * @param in wrapper - * @return the statistics for the inner stream - */ - protected S3AInputStreamStatistics getInputStreamStatistics( - FSDataInputStream in) { - return getS3AInputStream(in).getS3AStreamStatistics(); - } - - /** - * Get the inner stream of an input stream. - * Raises an exception if the inner stream is not an S3A input stream - * @param in wrapper - * @return the inner stream - * @throws AssertionError if the inner stream is of the wrong type - */ - protected S3AInputStream getS3AInputStream( - FSDataInputStream in) { - InputStream inner = in.getWrappedStream(); - if (inner instanceof S3AInputStream) { - return (S3AInputStream) inner; - } else { - throw new AssertionError("Not an S3AInputStream: " + inner); - } - } - /** * Get the gauge value of a statistic from the * IOStatistics of the filesystem. Raises an assertion if