diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java index d2ae9d7dc3..70dc5ee476 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java @@ -42,6 +42,7 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.assertCapabilities; import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.disablePrefetching; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES; @@ -67,6 +68,7 @@ public class ITestS3AIOStatisticsContext extends AbstractS3ATestBase { @Override protected Configuration createConfiguration() { Configuration configuration = super.createConfiguration(); + disablePrefetching(configuration); enableIOStatisticsContext(); return configuration; } @@ -253,6 +255,7 @@ public void testThreadIOStatisticsForDifferentThreads() // Worker thread work and wait for it to finish. TestWorkerThread workerThread = new TestWorkerThread(path, null); long workerThreadID = workerThread.getId(); + LOG.info("Worker thread ID: {} ", workerThreadID); workerThread.start(); workerThread.join(); @@ -463,6 +466,8 @@ private class TestWorkerThread extends Thread implements Runnable { @Override public void run() { + // Setting the worker thread's name. + Thread.currentThread().setName("worker thread"); S3AFileSystem fs = getFileSystem(); byte[] data = new byte[BYTES_SMALL]; @@ -470,6 +475,9 @@ public void run() { if (ioStatisticsContext != null) { IOStatisticsContext.setThreadIOStatisticsContext(ioStatisticsContext); } + // Storing context in a field to not lose the reference in a GC. + IOStatisticsContext ioStatisticsContextWorkerThread = + getCurrentIOStatisticsContext(); // Write in the worker thread. try (FSDataOutputStream out = fs.create(workerThreadPath)) { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java index 6162ed1312..260c5fc504 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java @@ -1488,4 +1488,13 @@ public static S3AInputStream getS3AInputStream( throw new AssertionError("Not an S3AInputStream: " + inner); } } + + /** + * Disable Prefetching streams from S3AFileSystem in tests. + * @param conf Configuration to remove the prefetch property from. + */ + public static void disablePrefetching(Configuration conf) { + // TODO: Reinstate once HADOOP-18028 is merged. + // removeBaseAndBucketOverrides(conf, PREFETCH_ENABLED_KEY); + } }