diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java
index 1141603265..d58a695ec2 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java
@@ -216,10 +216,15 @@ public void put(int blockNumber, ByteBuffer buffer, Configuration conf,
     }
 
     writeFile(blockFilePath, buffer);
-    prefetchingStatistics.blockAddedToFileCache();
     long checksum = BufferData.getChecksum(buffer);
     Entry entry = new Entry(blockNumber, blockFilePath, buffer.limit(), checksum);
     blocks.put(blockNumber, entry);
+    // Update stream_read_blocks_in_cache stats only after blocks map is updated with new file
+    // entry to avoid any discrepancy related to the value of stream_read_blocks_in_cache.
+    // If stream_read_blocks_in_cache is updated before updating the blocks map here, closing of
+    // the input stream can lead to the removal of the cache file even before blocks is added with
+    // the new cache file, leading to incorrect value of stream_read_blocks_in_cache.
+    prefetchingStatistics.blockAddedToFileCache();
   }
 
   private static final Set CREATE_OPTIONS =
@@ -268,7 +273,7 @@ public void close() throws IOException {
         prefetchingStatistics.blockRemovedFromFileCache();
         numFilesDeleted++;
       } catch (IOException e) {
-        // Ignore while closing so that we can delete as many cache files as possible.
+        LOG.debug("Failed to delete cache file {}", entry.path, e);
       }
     }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java
index b1ab4afce4..2146e17b6a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java
@@ -34,6 +34,7 @@
 import org.apache.hadoop.fs.s3a.prefetch.S3APrefetchingInputStream;
 import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
 import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.test.LambdaTestUtils;
 
 import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
 import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
@@ -74,6 +75,9 @@ public ITestS3APrefetchingInputStream() {
   // Size should be < block size so S3AInMemoryInputStream is used
   private static final int SMALL_FILE_SIZE = S_1K * 16;
 
+  private static final int TIMEOUT_MILLIS = 5000;
+  private static final int INTERVAL_MILLIS = 500;
+
 
   @Override
   public Configuration createConfiguration() {
@@ -202,13 +206,19 @@ public void testRandomReadLargeFile() throws Throwable {
       // Expected to get block 0 (partially read), 1 (prefetch), 2 (fully read), 3 (prefetch)
       // Blocks 0, 1, 3 were not fully read, so remain in the file cache
-      verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 4);
-      verifyStatisticCounterValue(ioStats, STREAM_READ_OPENED, 4);
-      verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCH_OPERATIONS, 2);
-      verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE, 3);
+      LambdaTestUtils.eventually(TIMEOUT_MILLIS, INTERVAL_MILLIS, () -> {
+        LOG.info("IO stats: {}", ioStats);
+        verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 4);
+        verifyStatisticCounterValue(ioStats, STREAM_READ_OPENED, 4);
+        verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCH_OPERATIONS, 2);
+        verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE, 3);
+      });
     }
 
-    verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE, 0);
-    verifyStatisticGaugeValue(ioStats, STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
+    LambdaTestUtils.eventually(TIMEOUT_MILLIS, INTERVAL_MILLIS, () -> {
+      LOG.info("IO stats: {}", ioStats);
+      verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE, 0);
+      verifyStatisticGaugeValue(ioStats, STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
+    });
   }
 
   @Test