MAPREDUCE-7441. Fix race condition in closing FadvisedFileRegion. Contributed by Benjamin Teke

This commit is contained in:
Szilard Nemeth 2023-06-23 14:40:03 -04:00
parent eb88b9ff21
commit 1c15987ee3

View File

@@ -41,6 +41,7 @@ public class FadvisedFileRegion extends DefaultFileRegion {
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(FadvisedFileRegion.class); LoggerFactory.getLogger(FadvisedFileRegion.class);
private final Object closeLock = new Object();
private final boolean manageOsCache; private final boolean manageOsCache;
private final int readaheadLength; private final int readaheadLength;
private final ReadaheadPool readaheadPool; private final ReadaheadPool readaheadPool;
@@ -51,12 +52,12 @@ public class FadvisedFileRegion extends DefaultFileRegion {
private final int shuffleBufferSize; private final int shuffleBufferSize;
private final boolean shuffleTransferToAllowed; private final boolean shuffleTransferToAllowed;
private final FileChannel fileChannel; private final FileChannel fileChannel;
private ReadaheadRequest readaheadRequest; private volatile ReadaheadRequest readaheadRequest;
public FadvisedFileRegion(RandomAccessFile file, long position, long count, public FadvisedFileRegion(RandomAccessFile file, long position, long count,
boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool, boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool,
String identifier, int shuffleBufferSize, String identifier, int shuffleBufferSize,
boolean shuffleTransferToAllowed) throws IOException { boolean shuffleTransferToAllowed) throws IOException {
super(file.getChannel(), position, count); super(file.getChannel(), position, count);
this.manageOsCache = manageOsCache; this.manageOsCache = manageOsCache;
@@ -73,97 +74,110 @@ public FadvisedFileRegion(RandomAccessFile file, long position, long count,
@Override @Override
public long transferTo(WritableByteChannel target, long position) public long transferTo(WritableByteChannel target, long position)
throws IOException { throws IOException {
if (readaheadPool != null && readaheadLength > 0) { synchronized (closeLock) {
readaheadRequest = readaheadPool.readaheadStream(identifier, fd, if (fd.valid()) {
position() + position, readaheadLength, if (readaheadPool != null && readaheadLength > 0) {
position() + count(), readaheadRequest); readaheadRequest = readaheadPool.readaheadStream(identifier, fd,
position() + position, readaheadLength,
position() + count(), readaheadRequest);
}
if(this.shuffleTransferToAllowed) {
return super.transferTo(target, position);
} else {
return customShuffleTransfer(target, position);
}
} else {
return 0L;
}
} }
if(this.shuffleTransferToAllowed) {
return super.transferTo(target, position);
} else {
return customShuffleTransfer(target, position);
}
} }
/** /**
* This method transfers data using local buffer. It transfers data from * This method transfers data using local buffer. It transfers data from
* a disk to a local buffer in memory, and then it transfers data from the * a disk to a local buffer in memory, and then it transfers data from the
* buffer to the target. This is used only if transferTo is disallowed in * buffer to the target. This is used only if transferTo is disallowed in
* the configuration file. super.TransferTo does not perform well on Windows * the configuration file. super.TransferTo does not perform well on Windows
* due to a small IO request generated. customShuffleTransfer can control * due to a small IO request generated. customShuffleTransfer can control
* the size of the IO requests by changing the size of the intermediate * the size of the IO requests by changing the size of the intermediate
* buffer. * buffer.
*/ */
@VisibleForTesting @VisibleForTesting
long customShuffleTransfer(WritableByteChannel target, long position) long customShuffleTransfer(WritableByteChannel target, long position)
throws IOException { throws IOException {
long actualCount = this.count - position; long actualCount = this.count - position;
if (actualCount < 0 || position < 0) { if (actualCount < 0 || position < 0) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"position out of range: " + position + "position out of range: " + position +
" (expected: 0 - " + (this.count - 1) + ')'); " (expected: 0 - " + (this.count - 1) + ')');
} }
if (actualCount == 0) { if (actualCount == 0) {
return 0L; return 0L;
} }
long trans = actualCount; long trans = actualCount;
int readSize; int readSize;
ByteBuffer byteBuffer = ByteBuffer.allocate( ByteBuffer byteBuffer = ByteBuffer.allocate(
Math.min( Math.min(
this.shuffleBufferSize, this.shuffleBufferSize,
trans > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) trans)); trans > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) trans));
while(trans > 0L && while(trans > 0L &&
(readSize = fileChannel.read(byteBuffer, this.position+position)) > 0) { (readSize = fileChannel.read(byteBuffer, this.position+position)) > 0) {
//adjust counters and buffer limit //adjust counters and buffer limit
if(readSize < trans) { if(readSize < trans) {
trans -= readSize; trans -= readSize;
position += readSize; position += readSize;
byteBuffer.flip(); byteBuffer.flip();
} else { } else {
//We can read more than we need if the actualCount is not multiple //We can read more than we need if the actualCount is not multiple
//of the byteBuffer size and file is big enough. In that case we cannot //of the byteBuffer size and file is big enough. In that case we cannot
//use flip method but we need to set buffer limit manually to trans. //use flip method but we need to set buffer limit manually to trans.
byteBuffer.limit((int)trans); byteBuffer.limit((int)trans);
byteBuffer.position(0); byteBuffer.position(0);
position += trans; position += trans;
trans = 0; trans = 0;
} }
//write data to the target //write data to the target
while(byteBuffer.hasRemaining()) { while(byteBuffer.hasRemaining()) {
target.write(byteBuffer); target.write(byteBuffer);
} }
byteBuffer.clear(); byteBuffer.clear();
} }
return actualCount - trans; return actualCount - trans;
} }
@Override @Override
protected void deallocate() { protected void deallocate() {
if (readaheadRequest != null) { synchronized (closeLock) {
readaheadRequest.cancel(); if (readaheadRequest != null) {
readaheadRequest.cancel();
readaheadRequest = null;
}
super.deallocate();
} }
super.deallocate();
} }
/** /**
* Call when the transfer completes successfully so we can advise the OS that * Call when the transfer completes successfully so we can advise the OS that
* we don't need the region to be cached anymore. * we don't need the region to be cached anymore.
*/ */
public void transferSuccessful() { public void transferSuccessful() {
if (manageOsCache && count() > 0) { synchronized (closeLock) {
try { if (fd.valid() && manageOsCache && count() > 0) {
NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier, try {
fd, position(), count(), POSIX_FADV_DONTNEED); NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier,
} catch (Throwable t) { fd, position(), count(), POSIX_FADV_DONTNEED);
LOG.warn("Failed to manage OS cache for " + identifier, t); } catch (Throwable t) {
LOG.warn("Failed to manage OS cache for " + identifier +
" fd " + fd, t);
}
} }
} }
} }