HADOOP-18740. S3A prefetch cache blocks should be accessed by RW locks (#5675)

Contributed by Viraj Jasani
This commit is contained in:
Viraj Jasani 2023-06-07 06:05:52 -07:00 committed by GitHub
parent 9de13f879a
commit 1dbaba8e70
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -37,6 +37,8 @@
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet; import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -70,6 +72,16 @@ public class SingleFilePerBlockCache implements BlockCache {
private final PrefetchingStatistics prefetchingStatistics; private final PrefetchingStatistics prefetchingStatistics;
/**
* Timeout to be used by close, while acquiring prefetch block write lock.
*/
private static final int PREFETCH_WRITE_LOCK_TIMEOUT = 5;
/**
* Lock timeout unit to be used by the thread while acquiring prefetch block write lock.
*/
private static final TimeUnit PREFETCH_WRITE_LOCK_TIMEOUT_UNIT = TimeUnit.SECONDS;
/** /**
* File attributes attached to any intermediate temporary file created during index creation. * File attributes attached to any intermediate temporary file created during index creation.
*/ */
@ -85,12 +97,18 @@ private static final class Entry {
private final Path path; private final Path path;
private final int size; private final int size;
private final long checksum; private final long checksum;
private final ReentrantReadWriteLock lock;
private enum LockType {
READ,
WRITE
}
Entry(int blockNumber, Path path, int size, long checksum) { Entry(int blockNumber, Path path, int size, long checksum) {
this.blockNumber = blockNumber; this.blockNumber = blockNumber;
this.path = path; this.path = path;
this.size = size; this.size = size;
this.checksum = checksum; this.checksum = checksum;
this.lock = new ReentrantReadWriteLock();
} }
@Override @Override
@ -99,6 +117,54 @@ public String toString() {
"([%03d] %s: size = %d, checksum = %d)", "([%03d] %s: size = %d, checksum = %d)",
blockNumber, path, size, checksum); blockNumber, path, size, checksum);
} }
/**
* Take the read or write lock.
*
* @param lockType type of the lock.
*/
private void takeLock(LockType lockType) {
if (LockType.READ == lockType) {
lock.readLock().lock();
} else if (LockType.WRITE == lockType) {
lock.writeLock().lock();
}
}
/**
* Release the read or write lock.
*
* @param lockType type of the lock.
*/
private void releaseLock(LockType lockType) {
if (LockType.READ == lockType) {
lock.readLock().unlock();
} else if (LockType.WRITE == lockType) {
lock.writeLock().unlock();
}
}
/**
* Try to take the read or write lock within the given timeout.
*
* @param lockType type of the lock.
* @param timeout the time to wait for the given lock.
* @param unit the time unit of the timeout argument.
* @return true if the lock of the given lock type was acquired.
*/
private boolean takeLock(LockType lockType, long timeout, TimeUnit unit) {
try {
if (LockType.READ == lockType) {
return lock.readLock().tryLock(timeout, unit);
} else if (LockType.WRITE == lockType) {
return lock.writeLock().tryLock(timeout, unit);
}
} catch (InterruptedException e) {
LOG.warn("Thread interrupted while trying to acquire {} lock", lockType, e);
Thread.currentThread().interrupt();
}
return false;
}
} }
/** /**
@ -148,11 +214,15 @@ public void get(int blockNumber, ByteBuffer buffer) throws IOException {
checkNotNull(buffer, "buffer"); checkNotNull(buffer, "buffer");
Entry entry = getEntry(blockNumber); Entry entry = getEntry(blockNumber);
buffer.clear(); entry.takeLock(Entry.LockType.READ);
readFile(entry.path, buffer); try {
buffer.rewind(); buffer.clear();
readFile(entry.path, buffer);
validateEntry(entry, buffer); buffer.rewind();
validateEntry(entry, buffer);
} finally {
entry.releaseLock(Entry.LockType.READ);
}
} }
protected int readFile(Path path, ByteBuffer buffer) throws IOException { protected int readFile(Path path, ByteBuffer buffer) throws IOException {
@ -200,7 +270,12 @@ public void put(int blockNumber, ByteBuffer buffer, Configuration conf,
if (blocks.containsKey(blockNumber)) { if (blocks.containsKey(blockNumber)) {
Entry entry = blocks.get(blockNumber); Entry entry = blocks.get(blockNumber);
validateEntry(entry, buffer); entry.takeLock(Entry.LockType.READ);
try {
validateEntry(entry, buffer);
} finally {
entry.releaseLock(Entry.LockType.READ);
}
return; return;
} }
@ -268,12 +343,22 @@ public void close() throws IOException {
int numFilesDeleted = 0; int numFilesDeleted = 0;
for (Entry entry : blocks.values()) { for (Entry entry : blocks.values()) {
boolean lockAcquired = entry.takeLock(Entry.LockType.WRITE, PREFETCH_WRITE_LOCK_TIMEOUT,
PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
if (!lockAcquired) {
LOG.error("Cache file {} deletion would not be attempted as write lock could not"
+ " be acquired within {} {}", entry.path, PREFETCH_WRITE_LOCK_TIMEOUT,
PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
continue;
}
try { try {
Files.deleteIfExists(entry.path); Files.deleteIfExists(entry.path);
prefetchingStatistics.blockRemovedFromFileCache(); prefetchingStatistics.blockRemovedFromFileCache();
numFilesDeleted++; numFilesDeleted++;
} catch (IOException e) { } catch (IOException e) {
LOG.debug("Failed to delete cache file {}", entry.path, e); LOG.debug("Failed to delete cache file {}", entry.path, e);
} finally {
entry.releaseLock(Entry.LockType.WRITE);
} }
} }