HADOOP-18756. S3A prefetch - CachingBlockManager to use AtomicBoolean for closed flag (#5718)

Contributed by Viraj Jasani
This commit is contained in:
Viraj Jasani 2023-06-14 04:51:54 -07:00 committed by GitHub
parent f0c4286e3e
commit a75e378868
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -38,6 +38,7 @@
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet; import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
@ -68,7 +69,7 @@ public class SingleFilePerBlockCache implements BlockCache {
*/ */
private int numGets = 0; private int numGets = 0;
private boolean closed; private final AtomicBoolean closed;
private final PrefetchingStatistics prefetchingStatistics; private final PrefetchingStatistics prefetchingStatistics;
@ -174,6 +175,7 @@ private boolean takeLock(LockType lockType, long timeout, TimeUnit unit) {
*/ */
public SingleFilePerBlockCache(PrefetchingStatistics prefetchingStatistics) { public SingleFilePerBlockCache(PrefetchingStatistics prefetchingStatistics) {
this.prefetchingStatistics = requireNonNull(prefetchingStatistics); this.prefetchingStatistics = requireNonNull(prefetchingStatistics);
this.closed = new AtomicBoolean(false);
} }
/** /**
@ -207,7 +209,7 @@ public int size() {
*/ */
@Override @Override
public void get(int blockNumber, ByteBuffer buffer) throws IOException { public void get(int blockNumber, ByteBuffer buffer) throws IOException {
if (closed) { if (closed.get()) {
return; return;
} }
@ -262,7 +264,7 @@ private Entry getEntry(int blockNumber) {
@Override @Override
public void put(int blockNumber, ByteBuffer buffer, Configuration conf, public void put(int blockNumber, ByteBuffer buffer, Configuration conf,
LocalDirAllocator localDirAllocator) throws IOException { LocalDirAllocator localDirAllocator) throws IOException {
if (closed) { if (closed.get()) {
return; return;
} }
@ -333,37 +335,31 @@ protected Path getCacheFilePath(final Configuration conf,
@Override @Override
public void close() throws IOException { public void close() throws IOException {
if (closed) { if (closed.compareAndSet(false, true)) {
return; LOG.debug(getStats());
} int numFilesDeleted = 0;
closed = true; for (Entry entry : blocks.values()) {
boolean lockAcquired = entry.takeLock(Entry.LockType.WRITE, PREFETCH_WRITE_LOCK_TIMEOUT,
LOG.info(getStats());
int numFilesDeleted = 0;
for (Entry entry : blocks.values()) {
boolean lockAcquired = entry.takeLock(Entry.LockType.WRITE, PREFETCH_WRITE_LOCK_TIMEOUT,
PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
if (!lockAcquired) {
LOG.error("Cache file {} deletion would not be attempted as write lock could not"
+ " be acquired within {} {}", entry.path, PREFETCH_WRITE_LOCK_TIMEOUT,
PREFETCH_WRITE_LOCK_TIMEOUT_UNIT); PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
continue; if (!lockAcquired) {
LOG.error("Cache file {} deletion would not be attempted as write lock could not"
+ " be acquired within {} {}", entry.path, PREFETCH_WRITE_LOCK_TIMEOUT,
PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
continue;
}
try {
Files.deleteIfExists(entry.path);
prefetchingStatistics.blockRemovedFromFileCache();
numFilesDeleted++;
} catch (IOException e) {
LOG.warn("Failed to delete cache file {}", entry.path, e);
} finally {
entry.releaseLock(Entry.LockType.WRITE);
}
} }
try {
Files.deleteIfExists(entry.path);
prefetchingStatistics.blockRemovedFromFileCache();
numFilesDeleted++;
} catch (IOException e) {
LOG.debug("Failed to delete cache file {}", entry.path, e);
} finally {
entry.releaseLock(Entry.LockType.WRITE);
}
}
if (numFilesDeleted > 0) { LOG.debug("Prefetch cache close: Deleted {} cache files", numFilesDeleted);
LOG.info("Deleted {} cache files", numFilesDeleted);
} }
} }