From be34e85e682880f46eee0310bf00ecc7d39cd5bd Mon Sep 17 00:00:00 2001
From: Jing Zhao <jing9@apache.org>
Date: Tue, 7 Jun 2016 10:48:21 -0700
Subject: [PATCH] HDFS-10468. HDFS read ends up ignoring an interrupt.
 Contributed by Jing Zhao

---
 .../apache/hadoop/hdfs/DFSInputStream.java    | 36 ++++++--
 .../java/org/apache/hadoop/hdfs/TestRead.java | 87 +++++++++++++++++++
 .../server/datanode/SimulatedFSDataset.java   |  4 +-
 3 files changed, 119 insertions(+), 8 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 2ed0abd798..7f32a56c38 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -19,8 +19,10 @@
 import java.io.EOFException;
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
 import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -304,7 +306,7 @@ private void waitFor(int waitTime) throws IOException {
       try {
         Thread.sleep(waitTime);
       } catch (InterruptedException e) {
-        throw new IOException(
+        throw new InterruptedIOException(
             "Interrupted while getting the last block length.");
       }
     }
@@ -379,6 +381,7 @@ private long readBlockLength(LocatedBlock locatedblock) throws IOException {
         return n;
       }
     } catch (IOException ioe) {
+      checkInterrupted(ioe);
       if (ioe instanceof RemoteException) {
         if (((RemoteException) ioe).unwrapRemoteException() instanceof
             ReplicaNotFoundException) {
@@ -414,7 +417,8 @@ private long readBlockLength(LocatedBlock locatedblock) throws IOException {
       try {
         Thread.sleep(500); // delay between retries.
       } catch (InterruptedException e) {
-        throw new IOException("Interrupted while getting the length.");
+        throw new InterruptedIOException(
+            "Interrupted while getting the length.");
       }
     }
 
@@ -660,6 +664,7 @@ private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
       }
       return chosenNode;
     } catch (IOException ex) {
+      checkInterrupted(ex);
       if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
         DFSClient.LOG.info("Will fetch a new encryption key and retry, "
             + "encryption key was invalid when connecting to " + targetAddr
@@ -681,6 +686,15 @@ private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
     }
   }
 
+  private void checkInterrupted(IOException e) throws IOException {
+    if (Thread.currentThread().isInterrupted() &&
+        (e instanceof ClosedByInterruptException ||
+            e instanceof InterruptedIOException)) {
+      DFSClient.LOG.debug("The reading thread has been interrupted.", e);
+      throw e;
+    }
+  }
+
   protected BlockReader getBlockReader(LocatedBlock targetBlock,
       long offsetInBlock, long length, InetSocketAddress targetAddr,
       StorageType storageType, DatanodeInfo datanode) throws IOException {
@@ -948,6 +962,7 @@ protected synchronized int readWithStrategy(ReaderStrategy strategy, int off,
       } catch (ChecksumException ce) {
         throw ce;
       } catch (IOException e) {
+        checkInterrupted(e);
         if (retries == 1) {
           DFSClient.LOG.warn("DFS Read", e);
         }
@@ -1044,9 +1059,12 @@ private DNAddrPair chooseDataNode(LocatedBlock block,
             // expanding time window for each failure
             timeWindow * (failures + 1) *
             ThreadLocalRandom.current().nextDouble();
-        DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + " IOException, will wait for " + waitTime + " msec.");
+        DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) +
+            " IOException, will wait for " + waitTime + " msec.");
         Thread.sleep((long)waitTime);
-      } catch (InterruptedException ignored) {
+      } catch (InterruptedException e) {
+        throw new InterruptedIOException(
+            "Interrupted while choosing DataNode for read.");
       }
       deadNodes.clear(); //2nd option is to remove only nodes[blockId]
       openInfo(true);
@@ -1140,7 +1158,8 @@ protected void fetchBlockByteRange(LocatedBlock block, long start, long end,
             buf, offset, corruptedBlocks);
         return;
       } catch (IOException e) {
-        // Ignore. Already processed inside the function.
+        checkInterrupted(e); // check if the read has been interrupted
+        // Ignore other IOException. Already processed inside the function.
         // Loop through to try the next node.
       }
     }
@@ -1218,6 +1237,7 @@ void actualGetFromOneDataNode(final DNAddrPair datanode, LocatedBlock block,
         addToDeadNodes(datanode.info);
         throw new IOException(msg);
       } catch (IOException e) {
+        checkInterrupted(e);
         if (e instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
           DFSClient.LOG.info("Will fetch a new encryption key and retry, "
               + "encryption key was invalid when connecting to " + datanode.addr
@@ -1306,8 +1326,11 @@ private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
           ignored.add(chosenNode.info);
           dfsClient.getHedgedReadMetrics().incHedgedReadOps();
           // continue; no need to refresh block locations
-        } catch (InterruptedException | ExecutionException e) {
+        } catch (ExecutionException e) {
           // Ignore
+        } catch (InterruptedException e) {
+          throw new InterruptedIOException(
+              "Interrupted while waiting for reading task");
         }
       } else {
         // We are starting up a 'hedged' read. We have a read already
@@ -1594,6 +1617,7 @@ public synchronized void seek(long targetPos) throws IOException {
         } catch (IOException e) {//make following read to retry
           DFSClient.LOG.debug("Exception while seek to {} from {} of {} from "
               + "{}", targetPos, getCurrentBlock(), src, currentNode, e);
+          checkInterrupted(e);
         }
       }
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRead.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRead.java
index 9d38fd77c1..974fdf8e64 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRead.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRead.java
@@ -19,9 +19,19 @@
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataStorage;
+import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.io.IOUtils;
 import org.junit.Assert;
 
 import org.apache.hadoop.conf.Configuration;
@@ -103,4 +113,81 @@ public void testReadReservedPath() throws Exception {
       cluster.shutdown();
     }
   }
+
+  @Test(timeout=60000)
+  public void testInterruptReader() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    conf.set(DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY,
+        DelayedSimulatedFSDataset.Factory.class.getName());
+
+    final MiniDFSCluster cluster = new MiniDFSCluster
+        .Builder(conf).numDataNodes(1).build();
+    final DistributedFileSystem fs = cluster.getFileSystem();
+    try {
+      cluster.waitActive();
+      final Path file = new Path("/foo");
+      DFSTestUtil.createFile(fs, file, 1024, (short) 1, 0L);
+
+      final FSDataInputStream in = fs.open(file);
+      AtomicBoolean readInterrupted = new AtomicBoolean(false);
+      final Thread reader = new Thread(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            in.read(new byte[1024], 0, 1024);
+          } catch (IOException e) {
+            if (e instanceof ClosedByInterruptException ||
+                e instanceof InterruptedIOException) {
+              readInterrupted.set(true);
+            }
+          }
+        }
+      });
+
+      reader.start();
+      Thread.sleep(1000);
+      reader.interrupt();
+      reader.join();
+
+      Assert.assertTrue(readInterrupted.get());
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  private static class DelayedSimulatedFSDataset extends SimulatedFSDataset {
+    private volatile boolean isDelayed = true;
+
+    DelayedSimulatedFSDataset(DataNode datanode, DataStorage storage,
+        Configuration conf) {
+      super(datanode, storage, conf);
+    }
+
+    @Override
+    public synchronized InputStream getBlockInputStream(ExtendedBlock b,
+        long seekOffset) throws IOException {
+      while (isDelayed) {
+        try {
+          this.wait();
+        } catch (InterruptedException ignored) {
+        }
+      }
+      InputStream result = super.getBlockInputStream(b);
+      IOUtils.skipFully(result, seekOffset);
+      return result;
+    }
+
+    static class Factory extends FsDatasetSpi.Factory<DelayedSimulatedFSDataset> {
+      @Override
+      public DelayedSimulatedFSDataset newInstance(DataNode datanode,
+          DataStorage storage, Configuration conf) throws IOException {
+        return new DelayedSimulatedFSDataset(datanode, storage, conf);
+      }
+
+      @Override
+      public boolean isSimulated() {
+        return true;
+      }
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 1fdedca928..25034c6d8e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -960,8 +960,8 @@ public synchronized ReplicaHandler createTemporary(
     return new ReplicaHandler(binfo, null);
   }
 
-  synchronized InputStream getBlockInputStream(ExtendedBlock b
-      ) throws IOException {
+  protected synchronized InputStream getBlockInputStream(ExtendedBlock b)
+      throws IOException {
     final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
     BInfo binfo = map.get(b.getLocalBlock());
    if (binfo == null) {
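
Usage sketch (illustrative, not part of the patch): after this change, an
interrupt delivered to a thread blocked inside a DFS read surfaces as an
InterruptedIOException (or ClosedByInterruptException) instead of being
swallowed by the client's sleep/retry loops, so callers can cancel a stuck
read with Thread.interrupt(). This mirrors the pattern the new
testInterruptReader test exercises with a delayed dataset. The class name
CancellableRead and the path /tmp/demo below are assumptions for the
example, not taken from the patch.

import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.channels.ClosedByInterruptException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CancellableRead {
  public static void main(String[] args) throws Exception {
    // Assumes fs.defaultFS points at a running HDFS cluster.
    final FileSystem fs = FileSystem.get(new Configuration());
    Thread reader = new Thread(new Runnable() {
      @Override
      public void run() {
        // /tmp/demo is a hypothetical, pre-existing file.
        try (FSDataInputStream in = fs.open(new Path("/tmp/demo"))) {
          byte[] buf = new byte[4096];
          while (in.read(buf, 0, buf.length) != -1) {
            // consume data
          }
        } catch (ClosedByInterruptException | InterruptedIOException e) {
          // With HDFS-10468 the interrupt reliably surfaces here instead
          // of being ignored while the client sleeps between retries.
          System.out.println("read cancelled: " + e);
        } catch (IOException e) {
          System.out.println("read failed: " + e);
        }
      }
    });
    reader.start();
    Thread.sleep(1000); // let the read get going, then cancel it
    reader.interrupt();
    reader.join();
  }
}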