HADOOP-18697. S3A prefetch: failure of ITestS3APrefetchingInputStream#testRandomReadLargeFile (#5580)

Contributed by Viraj Jasani
This commit is contained in:
Viraj Jasani 2023-05-02 04:21:46 -10:00 committed by Steve Loughran
parent a226016c52
commit 0ad7d7c677
No known key found for this signature in database
GPG Key ID: D22CF846DBB162A0
2 changed files with 23 additions and 8 deletions

View File

@ -216,10 +216,15 @@ public void put(int blockNumber, ByteBuffer buffer, Configuration conf,
} }
writeFile(blockFilePath, buffer); writeFile(blockFilePath, buffer);
prefetchingStatistics.blockAddedToFileCache();
long checksum = BufferData.getChecksum(buffer); long checksum = BufferData.getChecksum(buffer);
Entry entry = new Entry(blockNumber, blockFilePath, buffer.limit(), checksum); Entry entry = new Entry(blockNumber, blockFilePath, buffer.limit(), checksum);
blocks.put(blockNumber, entry); blocks.put(blockNumber, entry);
// Update stream_read_blocks_in_cache stats only after blocks map is updated with new file
// entry to avoid any discrepancy related to the value of stream_read_blocks_in_cache.
// If stream_read_blocks_in_cache is updated before updating the blocks map here, closing of
// the input stream can lead to the removal of the cache file even before blocks is added with
// the new cache file, leading to incorrect value of stream_read_blocks_in_cache.
prefetchingStatistics.blockAddedToFileCache();
} }
private static final Set<? extends OpenOption> CREATE_OPTIONS = private static final Set<? extends OpenOption> CREATE_OPTIONS =
@ -268,7 +273,7 @@ public void close() throws IOException {
prefetchingStatistics.blockRemovedFromFileCache(); prefetchingStatistics.blockRemovedFromFileCache();
numFilesDeleted++; numFilesDeleted++;
} catch (IOException e) { } catch (IOException e) {
// Ignore while closing so that we can delete as many cache files as possible. LOG.debug("Failed to delete cache file {}", entry.path, e);
} }
} }

View File

@ -34,6 +34,7 @@
import org.apache.hadoop.fs.s3a.prefetch.S3APrefetchingInputStream; import org.apache.hadoop.fs.s3a.prefetch.S3APrefetchingInputStream;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.fs.statistics.IOStatistics; import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.test.LambdaTestUtils;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE; import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY; import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
@ -74,6 +75,9 @@ public ITestS3APrefetchingInputStream() {
// Size should be < block size so S3AInMemoryInputStream is used // Size should be < block size so S3AInMemoryInputStream is used
private static final int SMALL_FILE_SIZE = S_1K * 16; private static final int SMALL_FILE_SIZE = S_1K * 16;
private static final int TIMEOUT_MILLIS = 5000;
private static final int INTERVAL_MILLIS = 500;
@Override @Override
public Configuration createConfiguration() { public Configuration createConfiguration() {
@ -202,13 +206,19 @@ public void testRandomReadLargeFile() throws Throwable {
// Expected to get block 0 (partially read), 1 (prefetch), 2 (fully read), 3 (prefetch) // Expected to get block 0 (partially read), 1 (prefetch), 2 (fully read), 3 (prefetch)
// Blocks 0, 1, 3 were not fully read, so remain in the file cache // Blocks 0, 1, 3 were not fully read, so remain in the file cache
verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 4); LambdaTestUtils.eventually(TIMEOUT_MILLIS, INTERVAL_MILLIS, () -> {
verifyStatisticCounterValue(ioStats, STREAM_READ_OPENED, 4); LOG.info("IO stats: {}", ioStats);
verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCH_OPERATIONS, 2); verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 4);
verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE, 3); verifyStatisticCounterValue(ioStats, STREAM_READ_OPENED, 4);
verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCH_OPERATIONS, 2);
verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE, 3);
});
} }
verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE, 0); LambdaTestUtils.eventually(TIMEOUT_MILLIS, INTERVAL_MILLIS, () -> {
verifyStatisticGaugeValue(ioStats, STREAM_READ_ACTIVE_MEMORY_IN_USE, 0); LOG.info("IO stats: {}", ioStats);
verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE, 0);
verifyStatisticGaugeValue(ioStats, STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
});
} }
@Test @Test