From 1c15987ee38b96cf2ef4ff383f3f86e082c50fab Mon Sep 17 00:00:00 2001 From: Szilard Nemeth Date: Fri, 23 Jun 2023 14:40:03 -0400 Subject: [PATCH] MAPREDUCE-7441. Fix race condition in closing FadvisedFileRegion. Contributed by Benjamin Teke --- .../hadoop/mapred/FadvisedFileRegion.java | 102 ++++++++++-------- 1 file changed, 58 insertions(+), 44 deletions(-) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java index 9290a282e3..184b58e6c7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java @@ -41,6 +41,7 @@ public class FadvisedFileRegion extends DefaultFileRegion { private static final Logger LOG = LoggerFactory.getLogger(FadvisedFileRegion.class); + private final Object closeLock = new Object(); private final boolean manageOsCache; private final int readaheadLength; private final ReadaheadPool readaheadPool; @@ -51,12 +52,12 @@ public class FadvisedFileRegion extends DefaultFileRegion { private final int shuffleBufferSize; private final boolean shuffleTransferToAllowed; private final FileChannel fileChannel; - - private ReadaheadRequest readaheadRequest; + + private volatile ReadaheadRequest readaheadRequest; public FadvisedFileRegion(RandomAccessFile file, long position, long count, boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool, - String identifier, int shuffleBufferSize, + String identifier, int shuffleBufferSize, boolean shuffleTransferToAllowed) throws IOException { super(file.getChannel(), position, count); this.manageOsCache = manageOsCache; @@ -73,97 +74,110 @@ public FadvisedFileRegion(RandomAccessFile file, long position, long count, @Override public long transferTo(WritableByteChannel target, long position) - throws IOException { - if (readaheadPool != null && readaheadLength > 0) { - readaheadRequest = readaheadPool.readaheadStream(identifier, fd, - position() + position, readaheadLength, - position() + count(), readaheadRequest); + throws IOException { + synchronized (closeLock) { + if (fd.valid()) { + if (readaheadPool != null && readaheadLength > 0) { + readaheadRequest = readaheadPool.readaheadStream(identifier, fd, + position() + position, readaheadLength, + position() + count(), readaheadRequest); + } + + if(this.shuffleTransferToAllowed) { + return super.transferTo(target, position); + } else { + return customShuffleTransfer(target, position); + } + } else { + return 0L; + } } - - if(this.shuffleTransferToAllowed) { - return super.transferTo(target, position); - } else { - return customShuffleTransfer(target, position); - } + } /** - * This method transfers data using local buffer. It transfers data from - * a disk to a local buffer in memory, and then it transfers data from the + * This method transfers data using local buffer. It transfers data from + * a disk to a local buffer in memory, and then it transfers data from the * buffer to the target. This is used only if transferTo is disallowed in - * the configuration file. super.TransferTo does not perform well on Windows - * due to a small IO request generated. customShuffleTransfer can control - * the size of the IO requests by changing the size of the intermediate + * the configuration file. super.TransferTo does not perform well on Windows + * due to a small IO request generated. customShuffleTransfer can control + * the size of the IO requests by changing the size of the intermediate * buffer. */ @VisibleForTesting long customShuffleTransfer(WritableByteChannel target, long position) - throws IOException { + throws IOException { long actualCount = this.count - position; if (actualCount < 0 || position < 0) { throw new IllegalArgumentException( - "position out of range: " + position + - " (expected: 0 - " + (this.count - 1) + ')'); + "position out of range: " + position + + " (expected: 0 - " + (this.count - 1) + ')'); } if (actualCount == 0) { return 0L; } - + long trans = actualCount; int readSize; ByteBuffer byteBuffer = ByteBuffer.allocate( - Math.min( - this.shuffleBufferSize, - trans > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) trans)); - + Math.min( + this.shuffleBufferSize, + trans > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) trans)); + while(trans > 0L && - (readSize = fileChannel.read(byteBuffer, this.position+position)) > 0) { + (readSize = fileChannel.read(byteBuffer, this.position+position)) > 0) { //adjust counters and buffer limit if(readSize < trans) { trans -= readSize; position += readSize; byteBuffer.flip(); } else { - //We can read more than we need if the actualCount is not multiple + //We can read more than we need if the actualCount is not multiple //of the byteBuffer size and file is big enough. In that case we cannot //use flip method but we need to set buffer limit manually to trans. byteBuffer.limit((int)trans); byteBuffer.position(0); - position += trans; + position += trans; trans = 0; } - + //write data to the target while(byteBuffer.hasRemaining()) { target.write(byteBuffer); } - + byteBuffer.clear(); } - + return actualCount - trans; } - + @Override protected void deallocate() { - if (readaheadRequest != null) { - readaheadRequest.cancel(); + synchronized (closeLock) { + if (readaheadRequest != null) { + readaheadRequest.cancel(); + readaheadRequest = null; + } + super.deallocate(); } - super.deallocate(); } - + /** * Call when the transfer completes successfully so we can advise the OS that * we don't need the region to be cached anymore. */ public void transferSuccessful() { - if (manageOsCache && count() > 0) { - try { - NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier, - fd, position(), count(), POSIX_FADV_DONTNEED); - } catch (Throwable t) { - LOG.warn("Failed to manage OS cache for " + identifier, t); + synchronized (closeLock) { + if (fd.valid() && manageOsCache && count() > 0) { + try { + NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier, + fd, position(), count(), POSIX_FADV_DONTNEED); + } catch (Throwable t) { + LOG.warn("Failed to manage OS cache for " + identifier + + " fd " + fd, t); + } } } }