diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ReadaheadPool.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ReadaheadPool.java
index 2e65f12cc0..7cd7f98992 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ReadaheadPool.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ReadaheadPool.java
@@ -205,8 +205,10 @@ public void run() {
       // It's also possible that we'll end up requesting readahead on some
       // other FD, which may be wasted work, but won't cause a problem.
       try {
-        NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier,
-            fd, off, len, POSIX_FADV_WILLNEED);
+        if (fd.valid()) {
+          NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
+              identifier, fd, off, len, POSIX_FADV_WILLNEED);
+        }
       } catch (IOException ioe) {
         if (canceled) {
           // no big deal - the reader canceled the request and closed
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedChunkedFile.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedChunkedFile.java
index 6a4e3b4ba0..e9f0f34c69 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedChunkedFile.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedChunkedFile.java
@@ -22,6 +22,7 @@
 import java.io.IOException;
 import java.io.RandomAccessFile;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.io.ReadaheadPool;
 import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
 import org.apache.hadoop.io.nativeio.NativeIO;
@@ -37,13 +38,14 @@ public class FadvisedChunkedFile extends ChunkedFile {
   private static final Logger LOG =
       LoggerFactory.getLogger(FadvisedChunkedFile.class);
 
+  private final Object closeLock = new Object();
   private final boolean manageOsCache;
   private final int readaheadLength;
   private final ReadaheadPool readaheadPool;
   private final FileDescriptor fd;
   private final String identifier;
 
-  private ReadaheadRequest readaheadRequest;
+  private volatile ReadaheadRequest readaheadRequest;
 
   public FadvisedChunkedFile(RandomAccessFile file, long position, long count,
       int chunkSize, boolean manageOsCache, int readaheadLength,
@@ -56,31 +58,50 @@ public FadvisedChunkedFile(RandomAccessFile file, long position, long count,
     this.identifier = identifier;
   }
 
+  @VisibleForTesting
+  FileDescriptor getFd() {
+    return fd;
+  }
+
   @Override
   public Object nextChunk() throws Exception {
-    if (manageOsCache && readaheadPool != null) {
-      readaheadRequest = readaheadPool
-          .readaheadStream(identifier, fd, getCurrentOffset(), readaheadLength,
-              getEndOffset(), readaheadRequest);
+    synchronized (closeLock) {
+      if (fd.valid()) {
+        if (manageOsCache && readaheadPool != null) {
+          readaheadRequest = readaheadPool
+              .readaheadStream(
+                  identifier, fd, getCurrentOffset(), readaheadLength,
+                  getEndOffset(), readaheadRequest);
+        }
+        return super.nextChunk();
+      } else {
+        return null;
+      }
     }
-    return super.nextChunk();
   }
 
   @Override
   public void close() throws Exception {
-    if (readaheadRequest != null) {
-      readaheadRequest.cancel();
-    }
-    if (manageOsCache && getEndOffset() - getStartOffset() > 0) {
-      try {
-        NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier,
-            fd,
-            getStartOffset(), getEndOffset() - getStartOffset(),
-            POSIX_FADV_DONTNEED);
-      } catch (Throwable t) {
-        LOG.warn("Failed to manage OS cache for " + identifier, t);
+    synchronized (closeLock) {
+      if (readaheadRequest != null) {
+        readaheadRequest.cancel();
+        readaheadRequest = null;
       }
+      if (fd.valid() &&
+          manageOsCache && getEndOffset() - getStartOffset() > 0) {
+        try {
+          NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
+              identifier,
+              fd,
+              getStartOffset(), getEndOffset() - getStartOffset(),
+              POSIX_FADV_DONTNEED);
+        } catch (Throwable t) {
+          LOG.warn("Failed to manage OS cache for " + identifier +
+              " fd " + fd.toString(), t);
+        }
+      }
+      // fd becomes invalid upon closing
+      super.close();
     }
-    super.close();
   }
 }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestFadvisedChunkedFile.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestFadvisedChunkedFile.java
new file mode 100644
index 0000000000..b6d0fd2544
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestFadvisedChunkedFile.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.junit.Test;
+
+import java.io.File;
+import java.io.RandomAccessFile;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit test for FadvisedChunkedFile.
+ */
+public class TestFadvisedChunkedFile {
+
+  @Test
+  public void testDoubleClose() throws Exception {
+    File absoluteFile = new File("target",
+        TestFadvisedChunkedFile.class.getSimpleName()).getAbsoluteFile();
+    absoluteFile.deleteOnExit();
+    try {
+      try (RandomAccessFile f = new RandomAccessFile(
+          absoluteFile.getAbsolutePath(), "rw")) {
+        FadvisedChunkedFile af = new FadvisedChunkedFile(
+            f, 0, 5, 2, true,
+            10, null, "foo");
+
+        assertTrue("fd not valid", f.getFD().valid());
+        af.close();
+        assertFalse("fd still valid", f.getFD().valid());
+        af.close();
+        assertFalse("fd still valid", f.getFD().valid());
+      }
+    } finally {
+      absoluteFile.delete();
+    }
+  }
+}