diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index 6e7a2511f6..60221263cf 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -823,9 +823,17 @@ public static long validateReadahead(@Nullable Long readahead) {
     }
   }
 
+  /**
+   * Closes the underlying S3 stream, and merges the {@link #streamStatistics}
+   * instance associated with the stream.
+   */
   @Override
   public synchronized void unbuffer() {
-    closeStream("unbuffer()", contentRangeFinish, false);
+    try {
+      closeStream("unbuffer()", contentRangeFinish, false);
+    } finally {
+      streamStatistics.merge(false);
+    }
   }
 
   @Override
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
index 2bb8f682d8..fd7893f1bc 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
@@ -647,6 +647,8 @@ public final class InputStreamStatistics implements AutoCloseable {
     public long inputPolicy;
     /** This is atomic so that it can be passed as a reference. */
     private final AtomicLong versionMismatches = new AtomicLong(0);
+    /** Snapshot taken by the last merge(false); null until then. */
+    private InputStreamStatistics mergedStats;
 
     private InputStreamStatistics() {
     }
@@ -759,7 +761,7 @@ public void readOperationCompleted(int requested, int actual) {
      */
     @Override
     public void close() {
-      mergeInputStreamStatistics(this);
+      merge(true);
     }
 
     /**
@@ -816,6 +818,94 @@ public String toString() {
       sb.append('}');
       return sb.toString();
     }
+
+    /**
+     * Merge the statistics into the filesystem's instrumentation instance.
+     * Takes a diff between the current version of the stats and the
+     * version of the stats when merge was last called, and merges the diff
+     * into the instrumentation instance. Used to periodically merge the
+     * stats into the fs-wide stats. Behavior is undefined if called on a
+     * closed instance.
+     *
+     * @param isClosed true if the statistics are being closed, in which case
+     *                 no snapshot is retained for a later diff
+     */
+    void merge(boolean isClosed) {
+      if (mergedStats != null) {
+        mergeInputStreamStatistics(diff(mergedStats));
+      } else {
+        mergeInputStreamStatistics(this);
+      }
+      // If stats are closed, no need to create another copy
+      if (!isClosed) {
+        mergedStats = copy();
+      }
+    }
+
+    /**
+     * Returns a diff between this {@link InputStreamStatistics} instance and
+     * the given {@link InputStreamStatistics} instance.
+     */
+    private InputStreamStatistics diff(InputStreamStatistics inputStats) {
+      InputStreamStatistics diff = new InputStreamStatistics();
+      diff.openOperations = openOperations - inputStats.openOperations;
+      diff.closeOperations = closeOperations - inputStats.closeOperations;
+      diff.closed = closed - inputStats.closed;
+      diff.aborted = aborted - inputStats.aborted;
+      diff.seekOperations = seekOperations - inputStats.seekOperations;
+      diff.readExceptions = readExceptions - inputStats.readExceptions;
+      diff.forwardSeekOperations =
+          forwardSeekOperations - inputStats.forwardSeekOperations;
+      diff.backwardSeekOperations =
+          backwardSeekOperations - inputStats.backwardSeekOperations;
+      diff.bytesRead = bytesRead - inputStats.bytesRead;
+      diff.bytesSkippedOnSeek =
+          bytesSkippedOnSeek - inputStats.bytesSkippedOnSeek;
+      diff.bytesBackwardsOnSeek =
+          bytesBackwardsOnSeek - inputStats.bytesBackwardsOnSeek;
+      diff.readOperations = readOperations - inputStats.readOperations;
+      diff.readFullyOperations =
+          readFullyOperations - inputStats.readFullyOperations;
+      diff.readsIncomplete = readsIncomplete - inputStats.readsIncomplete;
+      diff.bytesReadInClose = bytesReadInClose - inputStats.bytesReadInClose;
+      diff.bytesDiscardedInAbort =
+          bytesDiscardedInAbort - inputStats.bytesDiscardedInAbort;
+      diff.policySetCount = policySetCount - inputStats.policySetCount;
+      diff.inputPolicy = inputPolicy - inputStats.inputPolicy;
+      diff.versionMismatches.set(versionMismatches.longValue() -
+          inputStats.versionMismatches.longValue());
+      return diff;
+    }
+
+    /**
+     * Returns a new {@link InputStreamStatistics} instance with all the same
+     * values as this {@link InputStreamStatistics}.
+     */
+    private InputStreamStatistics copy() {
+      InputStreamStatistics copy = new InputStreamStatistics();
+      copy.openOperations = openOperations;
+      copy.closeOperations = closeOperations;
+      copy.closed = closed;
+      copy.aborted = aborted;
+      copy.seekOperations = seekOperations;
+      copy.readExceptions = readExceptions;
+      copy.forwardSeekOperations = forwardSeekOperations;
+      copy.backwardSeekOperations = backwardSeekOperations;
+      copy.bytesRead = bytesRead;
+      copy.bytesSkippedOnSeek = bytesSkippedOnSeek;
+      copy.bytesBackwardsOnSeek = bytesBackwardsOnSeek;
+      copy.readOperations = readOperations;
+      copy.readFullyOperations = readFullyOperations;
+      copy.readsIncomplete = readsIncomplete;
+      copy.bytesReadInClose = bytesReadInClose;
+      copy.bytesDiscardedInAbort = bytesDiscardedInAbort;
+      copy.policySetCount = policySetCount;
+      copy.inputPolicy = inputPolicy;
+      // versionMismatches is a final AtomicLong: copy its value so that the
+      // next diff() subtracts the count that has already been merged.
+      copy.versionMismatches.set(versionMismatches.longValue());
+      return copy;
+    }
   }
 
   /**
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java
index b04b9da486..2ba3fd7a65 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java
@@ -19,14 +19,16 @@
 package org.apache.hadoop.fs.s3a;
 
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.io.IOUtils;
 
 import org.junit.Test;
 
 import java.io.IOException;
 
+import static org.apache.hadoop.fs.s3a.Statistic.STREAM_SEEK_BYTES_READ;
+
 /**
  * Integration test for calling
  * {@link org.apache.hadoop.fs.CanUnbuffer#unbuffer} on {@link S3AInputStream}.
@@ -38,20 +40,27 @@
  */
 public class ITestS3AUnbuffer extends AbstractS3ATestBase {
 
+  private Path dest;
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    dest = path("ITestS3AUnbuffer");
+    describe("ITestS3AUnbuffer");
+
+    byte[] data = ContractTestUtils.dataset(16, 'a', 26);
+    ContractTestUtils.writeDataset(getFileSystem(), dest, data, data.length,
+        16, true);
+  }
+
   @Test
   public void testUnbuffer() throws IOException {
-    // Setup test file
-    Path dest = path("testUnbuffer");
     describe("testUnbuffer");
-    try (FSDataOutputStream outputStream = getFileSystem().create(dest, true)) {
-      byte[] data = ContractTestUtils.dataset(16, 'a', 26);
-      outputStream.write(data);
-    }
 
     // Open file, read half the data, and then call unbuffer
     try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
       assertTrue(inputStream.getWrappedStream() instanceof S3AInputStream);
-      assertEquals(8, inputStream.read(new byte[8]));
+      readAndAssertBytesRead(inputStream, 8);
       assertTrue(isObjectStreamOpen(inputStream));
 
       inputStream.unbuffer();
@@ -60,7 +69,61 @@ public void testUnbuffer() throws IOException {
     }
   }
 
+  /**
+   * Test that calling {@link S3AInputStream#unbuffer()} merges a stream's
+   * {@link org.apache.hadoop.fs.s3a.S3AInstrumentation.InputStreamStatistics}
+   * into the {@link S3AFileSystem}'s {@link S3AInstrumentation} instance.
+   */
+  @Test
+  public void testUnbufferStreamStatistics() throws IOException {
+    describe("testUnbufferStreamStatistics");
+
+    // Validate bytesRead is updated correctly
+    S3ATestUtils.MetricDiff bytesRead = new S3ATestUtils.MetricDiff(
+        getFileSystem(), STREAM_SEEK_BYTES_READ);
+
+    // Open file, read half the data, and then call unbuffer
+    FSDataInputStream inputStream = null;
+    try {
+      inputStream = getFileSystem().open(dest);
+
+      readAndAssertBytesRead(inputStream, 8);
+      inputStream.unbuffer();
+
+      // Validate that calling unbuffer updates the input stream statistics
+      bytesRead.assertDiffEquals(8);
+
+      // Validate that calling unbuffer twice in a row updates the statistics
+      // correctly
+      readAndAssertBytesRead(inputStream, 4);
+      inputStream.unbuffer();
+      bytesRead.assertDiffEquals(12);
+    } finally {
+      IOUtils.closeStream(inputStream);
+    }
+
+    // Validate that closing the file does not further change the statistics
+    bytesRead.assertDiffEquals(12);
+
+    // Validate that the input stream stats are correct when the file is closed
+    assertEquals("S3AInputStream statistics were not updated properly", 12,
+        ((S3AInputStream) inputStream.getWrappedStream())
+            .getS3AStreamStatistics().bytesRead);
+  }
+
   private boolean isObjectStreamOpen(FSDataInputStream inputStream) {
     return ((S3AInputStream) inputStream.getWrappedStream()).isObjectStreamOpen();
   }
+
+  /**
+   * Read the specified number of bytes from the given
+   * {@link FSDataInputStream} and assert that
+   * {@link FSDataInputStream#read(byte[])} read the specified number of bytes.
+   */
+  private static void readAndAssertBytesRead(FSDataInputStream inputStream,
+      int bytesToRead) throws IOException {
+    assertEquals("S3AInputStream#read did not read the correct number of " +
+        "bytes", bytesToRead,
+        inputStream.read(new byte[bytesToRead]));
+  }
 }