diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index dcc997c173..6bff172b80 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -1131,8 +1131,9 @@ private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
         Future<ByteBuffer> firstRequest = hedgedService
             .submit(getFromDataNodeCallable);
         futures.add(firstRequest);
+        Future<ByteBuffer> future = null;
         try {
-          Future<ByteBuffer> future = hedgedService.poll(
+          future = hedgedService.poll(
               conf.getHedgedReadThresholdMillis(), TimeUnit.MILLISECONDS);
           if (future != null) {
             ByteBuffer result = future.get();
@@ -1142,16 +1143,18 @@ private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
           }
           DFSClient.LOG.debug("Waited {}ms to read from {}; spawning hedged "
               + "read", conf.getHedgedReadThresholdMillis(), chosenNode.info);
-          // Ignore this node on next go around.
-          ignored.add(chosenNode.info);
           dfsClient.getHedgedReadMetrics().incHedgedReadOps();
           // continue; no need to refresh block locations
         } catch (ExecutionException e) {
-          // Ignore
+          futures.remove(future);
         } catch (InterruptedException e) {
           throw new InterruptedIOException(
               "Interrupted while waiting for reading task");
         }
+        // Ignore this node on next go around: if the poll timed out and the
+        // request is still ongoing, don't consider it again; if the read
+        // failed, don't consider it either.
+        ignored.add(chosenNode.info);
       } else {
         // We are starting up a 'hedged' read. We have a read already
         // ongoing. Call getBestNodeDNAddrPair instead of chooseDataNode.
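
The client-side change is easier to see in isolation. Below is a minimal, self-contained sketch of the loop shape the patch establishes: poll the completion service, drop a future that completed exceptionally from the tracking list, and only then mark the node as ignored. It is illustrative only, not HDFS code; the node names, pool size, timeout, and the always-failing read task are invented for the example. Before the patch, the failed future stayed in the tracking list and the failing node was only excluded on the timeout path, so a fast-failing DataNode could leave the loop retrying without progress.

// HedgedLoopSketch.java -- standalone illustration, not HDFS code.
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class HedgedLoopSketch {
  public static void main(String[] args) throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    CompletionService<String> hedgedService =
        new ExecutorCompletionService<>(pool);
    List<String> nodes = Arrays.asList("dn1", "dn2", "dn3");
    Set<String> ignored = new HashSet<>();
    List<Future<String>> futures = new ArrayList<>();

    while (true) {
      // Pick the first node we have not given up on yet.
      String chosenNode = nodes.stream()
          .filter(n -> !ignored.contains(n)).findFirst().orElse(null);
      if (chosenNode == null) {
        // All nodes exhausted: the real client surfaces
        // BlockMissingException here instead of looping forever.
        System.out.println("read failed on every node; giving up");
        break;
      }
      // Simulated read that always fails, like the ChecksumException the
      // new test injects.
      Callable<String> read = () -> {
        throw new IOException("simulated read failure on " + chosenNode);
      };
      futures.add(hedgedService.submit(read));
      Future<String> future = null;
      try {
        future = hedgedService.poll(50, TimeUnit.MILLISECONDS);
        if (future != null) {
          System.out.println("read returned: " + future.get());
          break;
        }
      } catch (ExecutionException e) {
        // The patch's key change: remove the failed future from the
        // tracking list instead of leaving a dead entry behind.
        futures.remove(future);
      }
      // Whether the poll timed out or the read failed, never retry this
      // node -- the ignored.add() the patch moved out of the try block.
      ignored.add(chosenNode);
    }
    pool.shutdownNow();
  }
}

With three nodes and every read failing, the sketch gives up after three iterations, mirroring the assertEquals(3, ...) in the new test below.
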
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
index 85fc97bc15..bcb02b3d2c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
@@ -59,6 +59,8 @@
 import org.mockito.stubbing.Answer;
 
 import com.google.common.base.Supplier;
+import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
 
 /**
  * This class tests the DFS positional read functionality in a single node
@@ -72,6 +74,9 @@ public class TestPread {
   boolean simulatedStorage;
   boolean isHedgedRead;
 
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestPread.class.getName());
+
   @Before
   public void setup() {
     simulatedStorage = false;
@@ -551,6 +556,64 @@ public Void call() throws IOException {
     }
   }
 
+  @Test(timeout=30000)
+  public void testHedgedReadFromAllDNFailed() throws IOException {
+    Configuration conf = new Configuration();
+    int numHedgedReadPoolThreads = 5;
+    final int hedgedReadTimeoutMillis = 50;
+
+    conf.setInt(HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY,
+        numHedgedReadPoolThreads);
+    conf.setLong(HdfsClientConfigKeys.HedgedRead.THRESHOLD_MILLIS_KEY,
+        hedgedReadTimeoutMillis);
+    conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 0);
+    // Set up the InjectionHandler
+    DFSClientFaultInjector.set(Mockito.mock(DFSClientFaultInjector.class));
+    DFSClientFaultInjector injector = DFSClientFaultInjector.get();
+    Mockito.doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        if (true) {
+          LOG.info("-------------- throw Checksum Exception");
+          throw new ChecksumException("ChecksumException test", 100);
+        }
+        return null;
+      }
+    }).when(injector).fetchFromDatanodeException();
+
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
+        .format(true).build();
+    DistributedFileSystem fileSys = cluster.getFileSystem();
+    DFSClient dfsClient = fileSys.getClient();
+    FSDataOutputStream output = null;
+    DFSInputStream input = null;
+    String filename = "/hedgedReadMaxOut.dat";
+    DFSHedgedReadMetrics metrics = dfsClient.getHedgedReadMetrics();
+    // Metrics instance is static, so we need to reset counts from prior tests.
+    metrics.hedgedReadOps.set(0);
+    try {
+      Path file = new Path(filename);
+      output = fileSys.create(file, (short) 2);
+      byte[] data = new byte[64 * 1024];
+      output.write(data);
+      output.flush();
+      output.close();
+      byte[] buffer = new byte[64 * 1024];
+      input = dfsClient.open(filename);
+      input.read(0, buffer, 0, 1024);
+      Assert.fail("Reading the block should have thrown BlockMissingException");
+    } catch (BlockMissingException e) {
+      assertEquals(3, input.getHedgedReadOpsLoopNumForTesting());
+      assertTrue(metrics.getHedgedReadOps() == 0);
+    } finally {
+      Mockito.reset(injector);
+      IOUtils.cleanupWithLogger(LOG, input);
+      IOUtils.cleanupWithLogger(LOG, output);
+      fileSys.close();
+      cluster.shutdown();
+    }
+  }
+
   /**
    * Scenario: 1. Write a file with RF=2, DN1 and DN2
    * 2. Open the stream, Consider Locations are [DN1, DN2] in LocatedBlock.
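
Why the new test's assertions hold: the injected ChecksumException makes every read fail immediately, so the hedged loop tries and excludes each of the three DataNodes exactly once before surfacing BlockMissingException, which is why the loop counter is expected to be 3; and because each attempt fails well inside the 50ms hedge threshold rather than timing out, no hedged read is ever spawned, so getHedgedReadOps() must remain 0 even with the hedged-read pool enabled.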