HADOOP-18697. S3A prefetch: failure of ITestS3APrefetchingInputStream#testRandomReadLargeFile (#5580)
Contributed by Viraj Jasani
This commit is contained in:
parent
73ca64a3ba
commit
bfcf5dd03b
@ -216,10 +216,15 @@ public void put(int blockNumber, ByteBuffer buffer, Configuration conf,
|
|||||||
}
|
}
|
||||||
|
|
||||||
writeFile(blockFilePath, buffer);
|
writeFile(blockFilePath, buffer);
|
||||||
prefetchingStatistics.blockAddedToFileCache();
|
|
||||||
long checksum = BufferData.getChecksum(buffer);
|
long checksum = BufferData.getChecksum(buffer);
|
||||||
Entry entry = new Entry(blockNumber, blockFilePath, buffer.limit(), checksum);
|
Entry entry = new Entry(blockNumber, blockFilePath, buffer.limit(), checksum);
|
||||||
blocks.put(blockNumber, entry);
|
blocks.put(blockNumber, entry);
|
||||||
|
// Update stream_read_blocks_in_cache stats only after blocks map is updated with new file
|
||||||
|
// entry to avoid any discrepancy related to the value of stream_read_blocks_in_cache.
|
||||||
|
// If stream_read_blocks_in_cache is updated before updating the blocks map here, closing of
|
||||||
|
// the input stream can lead to the removal of the cache file even before blocks is added with
|
||||||
|
// the new cache file, leading to incorrect value of stream_read_blocks_in_cache.
|
||||||
|
prefetchingStatistics.blockAddedToFileCache();
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final Set<? extends OpenOption> CREATE_OPTIONS =
|
private static final Set<? extends OpenOption> CREATE_OPTIONS =
|
||||||
@ -268,7 +273,7 @@ public void close() throws IOException {
|
|||||||
prefetchingStatistics.blockRemovedFromFileCache();
|
prefetchingStatistics.blockRemovedFromFileCache();
|
||||||
numFilesDeleted++;
|
numFilesDeleted++;
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
// Ignore while closing so that we can delete as many cache files as possible.
|
LOG.debug("Failed to delete cache file {}", entry.path, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -34,6 +34,7 @@
|
|||||||
import org.apache.hadoop.fs.s3a.prefetch.S3APrefetchingInputStream;
|
import org.apache.hadoop.fs.s3a.prefetch.S3APrefetchingInputStream;
|
||||||
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
|
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
|
||||||
import org.apache.hadoop.fs.statistics.IOStatistics;
|
import org.apache.hadoop.fs.statistics.IOStatistics;
|
||||||
|
import org.apache.hadoop.test.LambdaTestUtils;
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
|
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
|
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
|
||||||
@ -74,6 +75,9 @@ public ITestS3APrefetchingInputStream() {
|
|||||||
// Size should be < block size so S3AInMemoryInputStream is used
|
// Size should be < block size so S3AInMemoryInputStream is used
|
||||||
private static final int SMALL_FILE_SIZE = S_1K * 16;
|
private static final int SMALL_FILE_SIZE = S_1K * 16;
|
||||||
|
|
||||||
|
private static final int TIMEOUT_MILLIS = 5000;
|
||||||
|
private static final int INTERVAL_MILLIS = 500;
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Configuration createConfiguration() {
|
public Configuration createConfiguration() {
|
||||||
@ -202,13 +206,19 @@ public void testRandomReadLargeFile() throws Throwable {
|
|||||||
|
|
||||||
// Expected to get block 0 (partially read), 1 (prefetch), 2 (fully read), 3 (prefetch)
|
// Expected to get block 0 (partially read), 1 (prefetch), 2 (fully read), 3 (prefetch)
|
||||||
// Blocks 0, 1, 3 were not fully read, so remain in the file cache
|
// Blocks 0, 1, 3 were not fully read, so remain in the file cache
|
||||||
|
LambdaTestUtils.eventually(TIMEOUT_MILLIS, INTERVAL_MILLIS, () -> {
|
||||||
|
LOG.info("IO stats: {}", ioStats);
|
||||||
verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 4);
|
verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 4);
|
||||||
verifyStatisticCounterValue(ioStats, STREAM_READ_OPENED, 4);
|
verifyStatisticCounterValue(ioStats, STREAM_READ_OPENED, 4);
|
||||||
verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCH_OPERATIONS, 2);
|
verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCH_OPERATIONS, 2);
|
||||||
verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE, 3);
|
verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE, 3);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
LambdaTestUtils.eventually(TIMEOUT_MILLIS, INTERVAL_MILLIS, () -> {
|
||||||
|
LOG.info("IO stats: {}", ioStats);
|
||||||
verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE, 0);
|
verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE, 0);
|
||||||
verifyStatisticGaugeValue(ioStats, STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
|
verifyStatisticGaugeValue(ioStats, STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
Loading…
Reference in New Issue
Block a user