HDFS-7494. Checking of closed in DFSInputStream#pread() should be protected by synchronization (Ted Yu via Colin P. McCabe)

This commit is contained in:
Colin Patrick Mccabe 2014-12-16 11:07:27 -08:00
parent c65f1b382e
commit a97a1e7317
2 changed files with 11 additions and 7 deletions

View File

@ -598,6 +598,9 @@ Release 2.7.0 - UNRELEASED
HDFS-6425. Large postponedMisreplicatedBlocks has impact on blockReport HDFS-6425. Large postponedMisreplicatedBlocks has impact on blockReport
latency. (Ming Ma via kihwal) latency. (Ming Ma via kihwal)
HDFS-7494. Checking of closed in DFSInputStream#pread() should be protected
by synchronization (Ted Yu via Colin P. McCabe)
Release 2.6.1 - UNRELEASED Release 2.6.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -41,6 +41,7 @@
import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -90,7 +91,7 @@ public class DFSInputStream extends FSInputStream
public static boolean tcpReadsDisabledForTesting = false; public static boolean tcpReadsDisabledForTesting = false;
private long hedgedReadOpsLoopNumForTesting = 0; private long hedgedReadOpsLoopNumForTesting = 0;
private final DFSClient dfsClient; private final DFSClient dfsClient;
private boolean closed = false; private AtomicBoolean closed = new AtomicBoolean(false);
private final String src; private final String src;
private final boolean verifyChecksum; private final boolean verifyChecksum;
@ -661,7 +662,8 @@ private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
*/ */
@Override @Override
public synchronized void close() throws IOException { public synchronized void close() throws IOException {
if (closed) { if (!closed.compareAndSet(false, true)) {
DFSClient.LOG.warn("DFSInputStream has been closed already");
return; return;
} }
dfsClient.checkOpen(); dfsClient.checkOpen();
@ -685,7 +687,6 @@ public void accept(ByteBuffer k, Object v) {
blockReader = null; blockReader = null;
} }
super.close(); super.close();
closed = true;
} }
@Override @Override
@ -822,7 +823,7 @@ private synchronized int readBuffer(ReaderStrategy reader, int off, int len,
private synchronized int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException { private synchronized int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException {
dfsClient.checkOpen(); dfsClient.checkOpen();
if (closed) { if (closed.get()) {
throw new IOException("Stream closed"); throw new IOException("Stream closed");
} }
Map<ExtendedBlock,Set<DatanodeInfo>> corruptedBlockMap Map<ExtendedBlock,Set<DatanodeInfo>> corruptedBlockMap
@ -1375,7 +1376,7 @@ private int pread(long position, byte[] buffer, int offset, int length)
throws IOException { throws IOException {
// sanity checks // sanity checks
dfsClient.checkOpen(); dfsClient.checkOpen();
if (closed) { if (closed.get()) {
throw new IOException("Stream closed"); throw new IOException("Stream closed");
} }
failures = 0; failures = 0;
@ -1484,7 +1485,7 @@ public synchronized void seek(long targetPos) throws IOException {
if (targetPos < 0) { if (targetPos < 0) {
throw new EOFException("Cannot seek to negative offset"); throw new EOFException("Cannot seek to negative offset");
} }
if (closed) { if (closed.get()) {
throw new IOException("Stream is closed!"); throw new IOException("Stream is closed!");
} }
boolean done = false; boolean done = false;
@ -1571,7 +1572,7 @@ public synchronized long getPos() throws IOException {
*/ */
@Override @Override
public synchronized int available() throws IOException { public synchronized int available() throws IOException {
if (closed) { if (closed.get()) {
throw new IOException("Stream closed"); throw new IOException("Stream closed");
} }