HADOOP-18829. S3A prefetch LRU cache eviction metrics (#5893)

Contributed by: Viraj Jasani
Authored by Viraj Jasani on 2023-09-21 01:01:44 -08:00, committed by GitHub
parent 42b8e6faa7
commit 27cb551821
13 changed files with 145 additions and 29 deletions

CachingBlockManager.java

@@ -36,6 +36,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.statistics.DurationTracker;
+import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
 
 import static java.util.Objects.requireNonNull;
@@ -111,8 +112,10 @@ public abstract class CachingBlockManager extends BlockManager {
    * @param conf the configuration.
    * @param localDirAllocator the local dir allocator instance.
    * @param maxBlocksCount max blocks count to be kept in cache at any time.
+   * @param trackerFactory tracker with statistics to update.
    * @throws IllegalArgumentException if bufferPoolSize is zero or negative.
    */
+  @SuppressWarnings("checkstyle:parameternumber")
   public CachingBlockManager(
       ExecutorServiceFuturePool futurePool,
       BlockData blockData,
@@ -120,7 +123,8 @@ public CachingBlockManager(
       PrefetchingStatistics prefetchingStatistics,
       Configuration conf,
       LocalDirAllocator localDirAllocator,
-      int maxBlocksCount) {
+      int maxBlocksCount,
+      DurationTrackerFactory trackerFactory) {
     super(blockData);
 
     Validate.checkPositiveInteger(bufferPoolSize, "bufferPoolSize");
@@ -136,7 +140,7 @@ public CachingBlockManager(
     if (this.getBlockData().getFileSize() > 0) {
       this.bufferPool = new BufferPool(bufferPoolSize, this.getBlockData().getBlockSize(),
           this.prefetchingStatistics);
-      this.cache = this.createCache(maxBlocksCount);
+      this.cache = this.createCache(maxBlocksCount, trackerFactory);
     }
 
     this.ops = new BlockOperations();
@@ -559,8 +563,8 @@ private void addToCacheAndRelease(BufferData data, Future<Void> blockFuture,
     }
   }
 
-  protected BlockCache createCache(int maxBlocksCount) {
-    return new SingleFilePerBlockCache(prefetchingStatistics, maxBlocksCount);
+  protected BlockCache createCache(int maxBlocksCount, DurationTrackerFactory trackerFactory) {
+    return new SingleFilePerBlockCache(prefetchingStatistics, maxBlocksCount, trackerFactory);
   }
 
   protected void cachePut(int blockNumber, ByteBuffer buffer) throws IOException {

EmptyPrefetchingStatistics.java

@@ -57,6 +57,11 @@ public void blockRemovedFromFileCache() {
 
   }
 
+  @Override
+  public void blockEvictedFromFileCache() {
+
+  }
+
   @Override
   public void prefetchOperationCompleted() {

PrefetchingStatistics.java

@@ -42,6 +42,11 @@ public interface PrefetchingStatistics extends IOStatisticsSource {
    */
   void blockRemovedFromFileCache();
 
+  /**
+   * A block has been evicted from the file cache.
+   */
+  void blockEvictedFromFileCache();
+
   /**
    * A prefetch operation has completed.
    */

SingleFilePerBlockCache.java

@@ -47,10 +47,14 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.statistics.DurationTracker;
+import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
 import org.apache.hadoop.util.Preconditions;
 
 import static java.util.Objects.requireNonNull;
 import static org.apache.hadoop.fs.impl.prefetch.Validate.checkNotNull;
+import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.stubDurationTrackerFactory;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_FILE_CACHE_EVICTION;
 
 /**
  * Provides functionality necessary for caching blocks of data read from FileSystem.
@@ -99,6 +103,11 @@ public class SingleFilePerBlockCache implements BlockCache {
   private final PrefetchingStatistics prefetchingStatistics;
 
+  /**
+   * Duration tracker factory required to track the duration of some operations.
+   */
+  private final DurationTrackerFactory trackerFactory;
+
   /**
    * File attributes attached to any intermediate temporary file created during index creation.
    */
@@ -209,14 +218,19 @@ private void setNext(Entry next) {
    *
    * @param prefetchingStatistics statistics for this stream.
    * @param maxBlocksCount max blocks count to be kept in cache at any time.
+   * @param trackerFactory tracker with statistics to update
    */
-  public SingleFilePerBlockCache(PrefetchingStatistics prefetchingStatistics, int maxBlocksCount) {
+  public SingleFilePerBlockCache(PrefetchingStatistics prefetchingStatistics,
+      int maxBlocksCount,
+      DurationTrackerFactory trackerFactory) {
     this.prefetchingStatistics = requireNonNull(prefetchingStatistics);
     this.closed = new AtomicBoolean(false);
     this.maxBlocksCount = maxBlocksCount;
     Preconditions.checkArgument(maxBlocksCount > 0, "maxBlocksCount should be more than 0");
     blocks = new ConcurrentHashMap<>();
     blocksLock = new ReentrantReadWriteLock();
+    this.trackerFactory = trackerFactory != null
+        ? trackerFactory : stubDurationTrackerFactory();
   }
@@ -430,25 +444,28 @@ private void addToLinkedListAndEvictIfRequired(Entry entry) {
    * @param elementToPurge Block entry to evict.
    */
   private void deleteBlockFileAndEvictCache(Entry elementToPurge) {
-    boolean lockAcquired = elementToPurge.takeLock(Entry.LockType.WRITE,
-        PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT,
-        PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
-    if (!lockAcquired) {
-      LOG.error("Cache file {} deletion would not be attempted as write lock could not"
-          + " be acquired within {} {}", elementToPurge.path,
-          PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT,
-          PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
-    } else {
-      try {
-        if (Files.deleteIfExists(elementToPurge.path)) {
-          entryListSize--;
-          prefetchingStatistics.blockRemovedFromFileCache();
-          blocks.remove(elementToPurge.blockNumber);
-        }
-      } catch (IOException e) {
-        LOG.warn("Failed to delete cache file {}", elementToPurge.path, e);
-      } finally {
-        elementToPurge.releaseLock(Entry.LockType.WRITE);
-      }
-    }
+    try (DurationTracker ignored = trackerFactory.trackDuration(STREAM_FILE_CACHE_EVICTION)) {
+      boolean lockAcquired = elementToPurge.takeLock(Entry.LockType.WRITE,
+          PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT,
+          PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
+      if (!lockAcquired) {
+        LOG.error("Cache file {} deletion would not be attempted as write lock could not"
+            + " be acquired within {} {}", elementToPurge.path,
+            PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT,
+            PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
+      } else {
+        try {
+          if (Files.deleteIfExists(elementToPurge.path)) {
+            entryListSize--;
+            prefetchingStatistics.blockRemovedFromFileCache();
+            blocks.remove(elementToPurge.blockNumber);
+            prefetchingStatistics.blockEvictedFromFileCache();
+          }
+        } catch (IOException e) {
+          LOG.warn("Failed to delete cache file {}", elementToPurge.path, e);
+        } finally {
+          elementToPurge.releaseLock(Entry.LockType.WRITE);
+        }
+      }
+    }
   }
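
The eviction path now follows the standard IOStatistics duration-tracking pattern: open a tracker in try-with-resources and let close() record the elapsed time, with a stub factory substituted when the caller passes null. A minimal self-contained sketch of that pattern; the class name EvictionTimer and the runEviction parameter are illustrative, not part of this change:

import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;

import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.stubDurationTrackerFactory;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_FILE_CACHE_EVICTION;

public class EvictionTimer {

  private final DurationTrackerFactory trackerFactory;

  public EvictionTimer(DurationTrackerFactory trackerFactory) {
    // mirror the constructor above: fall back to a no-op tracker factory
    // so callers that pass null (e.g. unit tests) still work.
    this.trackerFactory = trackerFactory != null
        ? trackerFactory : stubDurationTrackerFactory();
  }

  public void timedEvict(Runnable runEviction) {
    // the tracker starts on creation and records the duration on close(),
    // which try-with-resources guarantees even if eviction throws.
    try (DurationTracker ignored =
             trackerFactory.trackDuration(STREAM_FILE_CACHE_EVICTION)) {
      runEviction.run();
    }
  }
}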

StreamStatisticNames.java

@@ -455,6 +455,18 @@ public final class StreamStatisticNames {
   public static final String STREAM_READ_BLOCK_ACQUIRE_AND_READ
       = "stream_read_block_acquire_read";
 
+  /**
+   * Total number of blocks evicted from the disk cache.
+   */
+  public static final String STREAM_EVICT_BLOCKS_FROM_FILE_CACHE
+      = "stream_evict_blocks_from_cache";
+
+  /**
+   * Track duration of LRU cache eviction for disk cache.
+   */
+  public static final String STREAM_FILE_CACHE_EVICTION
+      = "stream_file_cache_eviction";
+
   private StreamStatisticNames() {
   }
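
Once registered (see the S3AInstrumentation change below), both names are reachable through the public IOStatistics API. A hypothetical probe, assuming the stream wraps an S3A prefetching input stream; the class EvictionProbe is not part of this change:

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.statistics.IOStatistics;

import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_EVICT_BLOCKS_FROM_FILE_CACHE;

public final class EvictionProbe {

  private EvictionProbe() {
  }

  // hypothetical helper: report how many cached blocks the stream has evicted.
  public static long evictedBlockCount(FSDataInputStream in) {
    // retrieveIOStatistics() walks the wrapped-stream chain; it returns
    // null if the stream does not publish statistics.
    IOStatistics stats = retrieveIOStatistics(in);
    if (stats == null) {
      return 0;
    }
    Long evicted = stats.counters().get(STREAM_EVICT_BLOCKS_FROM_FILE_CACHE);
    return evicted == null ? 0 : evicted;
  }
}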

TestBlockCache.java

@@ -45,7 +45,7 @@ public class TestBlockCache extends AbstractHadoopTestBase {
   public void testArgChecks() throws Exception {
     // Should not throw.
     BlockCache cache =
-        new SingleFilePerBlockCache(EmptyPrefetchingStatistics.getInstance(), 2);
+        new SingleFilePerBlockCache(EmptyPrefetchingStatistics.getInstance(), 2, null);
 
     ByteBuffer buffer = ByteBuffer.allocate(16);
 
@@ -55,7 +55,7 @@ public void testArgChecks() throws Exception {
     intercept(NullPointerException.class, null,
-        () -> new SingleFilePerBlockCache(null, 2));
+        () -> new SingleFilePerBlockCache(null, 2, null));
   }
 
@@ -63,7 +63,7 @@ public void testArgChecks() throws Exception {
   @Test
   public void testPutAndGet() throws Exception {
     BlockCache cache =
-        new SingleFilePerBlockCache(EmptyPrefetchingStatistics.getInstance(), 2);
+        new SingleFilePerBlockCache(EmptyPrefetchingStatistics.getInstance(), 2, null);
 
     ByteBuffer buffer1 = ByteBuffer.allocate(BUFFER_SIZE);
     for (byte i = 0; i < BUFFER_SIZE; i++) {

IOStatisticAssertions.java

@@ -176,6 +176,23 @@ public static long verifyStatisticCounterValue(
         verifyStatisticsNotNull(stats).counters(), value);
   }
 
+  /**
+   * Assert that two counters have similar values.
+   *
+   * @param stats statistics source.
+   * @param key1 statistic first key.
+   * @param key2 statistic second key.
+   */
+  public static void verifyStatisticCounterValues(
+      final IOStatistics stats,
+      final String key1,
+      final String key2) {
+    verifyStatisticValues(COUNTER,
+        key1,
+        key2,
+        verifyStatisticsNotNull(stats).counters());
+  }
+
   /**
    * Assert that a gauge has an expected value.
    * @param stats statistics source
@@ -258,6 +275,26 @@ private static <E> E verifyStatisticValue(
     return statistic;
   }
 
+  /**
+   * Assert that the given two statistics have same values.
+   *
+   * @param type type of the statistics.
+   * @param key1 statistic first key.
+   * @param key2 statistic second key.
+   * @param map map to look up.
+   * @param <E> type of map element.
+   */
+  private static <E> void verifyStatisticValues(
+      final String type,
+      final String key1,
+      final String key2,
+      final Map<String, E> map) {
+    final E statistic1 = lookupStatistic(type, key1, map);
+    final E statistic2 = lookupStatistic(type, key2, map);
+    assertThat(statistic1)
+        .describedAs("%s named %s and %s named %s", type, key1, type, key2)
+        .isEqualTo(statistic2);
+  }
+
   /**
    * Assert that a given statistic has an expected value.
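
The new assertion lets a test verify that two counters moved in lockstep without pinning either to an absolute value. A minimal sketch of its use, mirroring the ITest change below; the wrapper class EvictionStatsCheck is hypothetical:

import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.StreamStatisticNames;

import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValues;

public final class EvictionStatsCheck {

  private EvictionStatsCheck() {
  }

  // every eviction increments stream_evict_blocks_from_cache and runs under
  // exactly one stream_file_cache_eviction duration tracker, so the two
  // counters must end up equal.
  public static void assertEvictionsAllTimed(IOStatistics ioStats) {
    verifyStatisticCounterValues(ioStats,
        StreamStatisticNames.STREAM_EVICT_BLOCKS_FROM_FILE_CACHE,
        StreamStatisticNames.STREAM_FILE_CACHE_EVICTION);
  }
}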

S3AInstrumentation.java

@@ -886,7 +886,8 @@ private InputStreamStatistics(
           StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES,
           StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
           StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
-          StreamStatisticNames.STREAM_READ_VERSION_MISMATCHES)
+          StreamStatisticNames.STREAM_READ_VERSION_MISMATCHES,
+          StreamStatisticNames.STREAM_EVICT_BLOCKS_FROM_FILE_CACHE)
       .withGauges(STREAM_READ_GAUGE_INPUT_POLICY,
           STREAM_READ_BLOCKS_IN_FILE_CACHE.getSymbol(),
           STREAM_READ_ACTIVE_PREFETCH_OPERATIONS.getSymbol(),
@@ -899,7 +900,8 @@ private InputStreamStatistics(
           StreamStatisticNames.STREAM_READ_REMOTE_STREAM_DRAINED,
           StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS,
           StreamStatisticNames.STREAM_READ_REMOTE_BLOCK_READ,
-          StreamStatisticNames.STREAM_READ_BLOCK_ACQUIRE_AND_READ)
+          StreamStatisticNames.STREAM_READ_BLOCK_ACQUIRE_AND_READ,
+          StreamStatisticNames.STREAM_FILE_CACHE_EVICTION)
       .build();
     setIOStatistics(st);
     aborted = st.getCounterReference(
@@ -1395,6 +1397,11 @@ public void blockRemovedFromFileCache() {
       incAllGauges(STREAM_READ_BLOCKS_IN_FILE_CACHE, -1);
     }
 
+    @Override
+    public void blockEvictedFromFileCache() {
+      increment(StreamStatisticNames.STREAM_EVICT_BLOCKS_FROM_FILE_CACHE);
+    }
+
     @Override
     public void prefetchOperationCompleted() {
       incAllGauges(STREAM_READ_ACTIVE_PREFETCH_OPERATIONS, -1);

Statistic.java

@@ -447,6 +447,16 @@ public enum Statistic {
       "Total queue duration of all block uploads",
       TYPE_DURATION),
 
+  /* Stream prefetch file cache eviction */
+  STREAM_EVICT_BLOCKS_FROM_FILE_CACHE(
+      StreamStatisticNames.STREAM_EVICT_BLOCKS_FROM_FILE_CACHE,
+      "Count of blocks evicted from the disk cache",
+      TYPE_COUNTER),
+  STREAM_FILE_CACHE_EVICTION(
+      StreamStatisticNames.STREAM_FILE_CACHE_EVICTION,
+      "Duration of the eviction of an element from LRU cache that holds disk cache blocks",
+      TYPE_DURATION),
+
   /* committer stats */
   COMMITTER_COMMITS_CREATED(
       "committer_commits_created",

S3ACachingBlockManager.java

@@ -76,7 +76,8 @@ public S3ACachingBlockManager(
         streamStatistics,
         conf,
         localDirAllocator,
-        conf.getInt(PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_PREFETCH_MAX_BLOCKS_COUNT));
+        conf.getInt(PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_PREFETCH_MAX_BLOCKS_COUNT),
+        streamStatistics);
 
     Validate.checkNotNull(reader, "reader");

EmptyS3AStatisticsContext.java

@@ -241,6 +241,11 @@ public void blockRemovedFromFileCache() {
 
     }
 
+    @Override
+    public void blockEvictedFromFileCache() {
+
+    }
+
     @Override
     public void executorAcquired(Duration timeInQueue) {

ITestS3APrefetchingLruEviction.java

@@ -45,7 +45,11 @@
 import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_MAX_BLOCKS_COUNT;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValues;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_EVICT_BLOCKS_FROM_FILE_CACHE;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_FILE_CACHE_EVICTION;
 import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE;
 
 /**
@@ -172,6 +176,14 @@ public void testSeeksWithLruEviction() throws Throwable {
     LambdaTestUtils.eventually(TIMEOUT_MILLIS, INTERVAL_MILLIS, () -> {
       LOG.info("IO stats: {}", ioStats);
       verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE, 0);
+      // stream_evict_blocks_from_cache is expected to be higher than 4, however we might face
+      // transient failures due to async prefetch get cancel issues. While TIMEOUT_MILLIS is
+      // sufficient wait time, consider re-running the test if stream_evict_blocks_from_cache
+      // value stays lower than 4.
+      assertThatStatisticCounter(ioStats,
+          STREAM_EVICT_BLOCKS_FROM_FILE_CACHE).isGreaterThanOrEqualTo(4);
+      verifyStatisticCounterValues(ioStats, STREAM_EVICT_BLOCKS_FROM_FILE_CACHE,
+          STREAM_FILE_CACHE_EVICTION);
     });
   }

S3APrefetchFakes.java

@@ -59,6 +59,7 @@
 import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
 import org.apache.hadoop.fs.s3a.statistics.impl.CountingChangeTracker;
 import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext;
+import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.util.functional.CallableRaisingIOE;
@@ -308,7 +309,7 @@ public static class FakeS3FilePerBlockCache extends SingleFilePerBlockCache {
     public FakeS3FilePerBlockCache(int readDelay, int writeDelay) {
       super(new EmptyS3AStatisticsContext().newInputStreamStatistics(),
-          Constants.DEFAULT_PREFETCH_MAX_BLOCKS_COUNT);
+          Constants.DEFAULT_PREFETCH_MAX_BLOCKS_COUNT, null);
       this.files = new ConcurrentHashMap<>();
       this.readDelay = readDelay;
       this.writeDelay = writeDelay;
@@ -381,7 +382,7 @@ public int read(ByteBuffer buffer, long offset, int size)
     }
 
     @Override
-    protected BlockCache createCache(int maxBlocksCount) {
+    protected BlockCache createCache(int maxBlocksCount, DurationTrackerFactory trackerFactory) {
       final int readDelayMs = 50;
       final int writeDelayMs = 200;
       return new FakeS3FilePerBlockCache(readDelayMs, writeDelayMs);