diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java index 748edcdb27..b58cf16a32 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java @@ -61,4 +61,6 @@ public void readFromDatanodeDelay() {} public boolean skipRollingRestartWait() { return false; } + + public void sleepBeforeHedgedGet() {} } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index 6bff172b80..97d3de4a96 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -830,60 +830,85 @@ src, getPos(), reqLen)){ private DNAddrPair chooseDataNode(LocatedBlock block, Collection ignoredNodes) throws IOException { + return chooseDataNode(block, ignoredNodes, true); + } + + /** + * Choose datanode to read from. + * + * @param block Block to choose datanode addr from + * @param ignoredNodes Ignored nodes inside. + * @param refetchIfRequired Whether to refetch if no nodes to chose + * from. + * @return Returns chosen DNAddrPair; Can be null if refetchIfRequired is + * false. + */ + private DNAddrPair chooseDataNode(LocatedBlock block, + Collection ignoredNodes, boolean refetchIfRequired) + throws IOException { while (true) { DNAddrPair result = getBestNodeDNAddrPair(block, ignoredNodes); if (result != null) { return result; + } else if (refetchIfRequired) { + block = refetchLocations(block, ignoredNodes); } else { - String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(), - deadNodes, ignoredNodes); - String blockInfo = block.getBlock() + " file=" + src; - if (failures >= dfsClient.getConf().getMaxBlockAcquireFailures()) { - String description = "Could not obtain block: " + blockInfo; - DFSClient.LOG.warn(description + errMsg - + ". Throwing a BlockMissingException"); - throw new BlockMissingException(src, description, - block.getStartOffset()); - } - - DatanodeInfo[] nodes = block.getLocations(); - if (nodes == null || nodes.length == 0) { - DFSClient.LOG.info("No node available for " + blockInfo); - } - DFSClient.LOG.info("Could not obtain " + block.getBlock() - + " from any node: " + errMsg - + ". Will get new block locations from namenode and retry..."); - try { - // Introducing a random factor to the wait time before another retry. - // The wait time is dependent on # of failures and a random factor. - // At the first time of getting a BlockMissingException, the wait time - // is a random number between 0..3000 ms. If the first retry - // still fails, we will wait 3000 ms grace period before the 2nd retry. - // Also at the second retry, the waiting window is expanded to 6000 ms - // alleviating the request rate from the server. Similarly the 3rd retry - // will wait 6000ms grace period before retry and the waiting window is - // expanded to 9000ms. - final int timeWindow = dfsClient.getConf().getTimeWindow(); - double waitTime = timeWindow * failures + // grace period for the last round of attempt - // expanding time window for each failure - timeWindow * (failures + 1) * - ThreadLocalRandom.current().nextDouble(); - DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + - " IOException, will wait for " + waitTime + " msec."); - Thread.sleep((long)waitTime); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new InterruptedIOException( - "Interrupted while choosing DataNode for read."); - } - deadNodes.clear(); //2nd option is to remove only nodes[blockId] - openInfo(true); - block = refreshLocatedBlock(block); - failures++; + return null; } } } + private LocatedBlock refetchLocations(LocatedBlock block, + Collection ignoredNodes) throws IOException { + String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(), + deadNodes, ignoredNodes); + String blockInfo = block.getBlock() + " file=" + src; + if (failures >= dfsClient.getConf().getMaxBlockAcquireFailures()) { + String description = "Could not obtain block: " + blockInfo; + DFSClient.LOG.warn(description + errMsg + + ". Throwing a BlockMissingException"); + throw new BlockMissingException(src, description, + block.getStartOffset()); + } + + DatanodeInfo[] nodes = block.getLocations(); + if (nodes == null || nodes.length == 0) { + DFSClient.LOG.info("No node available for " + blockInfo); + } + DFSClient.LOG.info("Could not obtain " + block.getBlock() + + " from any node: " + errMsg + + ". Will get new block locations from namenode and retry..."); + try { + // Introducing a random factor to the wait time before another retry. + // The wait time is dependent on # of failures and a random factor. + // At the first time of getting a BlockMissingException, the wait time + // is a random number between 0..3000 ms. If the first retry + // still fails, we will wait 3000 ms grace period before the 2nd retry. + // Also at the second retry, the waiting window is expanded to 6000 ms + // alleviating the request rate from the server. Similarly the 3rd retry + // will wait 6000ms grace period before retry and the waiting window is + // expanded to 9000ms. + final int timeWindow = dfsClient.getConf().getTimeWindow(); + // grace period for the last round of attempt + double waitTime = timeWindow * failures + + // expanding time window for each failure + timeWindow * (failures + 1) * + ThreadLocalRandom.current().nextDouble(); + DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + + " IOException, will wait for " + waitTime + " msec."); + Thread.sleep((long)waitTime); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new InterruptedIOException( + "Interrupted while choosing DataNode for read."); + } + deadNodes.clear(); //2nd option is to remove only nodes[blockId] + openInfo(true); + block = refreshLocatedBlock(block); + failures++; + return block; + } + /** * Get the best node from which to stream the data. * @param block LocatedBlock, containing nodes in priority order. @@ -985,6 +1010,7 @@ private Callable getFromOneDataNode(final DNAddrPair datanode, return new Callable() { @Override public ByteBuffer call() throws Exception { + DFSClientFaultInjector.get().sleepBeforeHedgedGet(); try (TraceScope ignored = dfsClient.getTracer(). newScope("hedgedRead" + hedgedReadId, parentSpanId)) { actualGetFromOneDataNode(datanode, start, end, bb, corruptedBlocks); @@ -1159,20 +1185,22 @@ private void hedgedFetchBlockByteRange(LocatedBlock block, long start, // We are starting up a 'hedged' read. We have a read already // ongoing. Call getBestNodeDNAddrPair instead of chooseDataNode. // If no nodes to do hedged reads against, pass. + boolean refetch = false; try { - chosenNode = getBestNodeDNAddrPair(block, ignored); - if (chosenNode == null) { - chosenNode = chooseDataNode(block, ignored); + chosenNode = chooseDataNode(block, ignored, false); + if (chosenNode != null) { + // Latest block, if refreshed internally + block = chosenNode.block; + bb = ByteBuffer.allocate(len); + Callable getFromDataNodeCallable = + getFromOneDataNode(chosenNode, block, start, end, bb, + corruptedBlocks, hedgedReadId++); + Future oneMoreRequest = + hedgedService.submit(getFromDataNodeCallable); + futures.add(oneMoreRequest); + } else { + refetch = true; } - // Latest block, if refreshed internally - block = chosenNode.block; - bb = ByteBuffer.allocate(len); - Callable getFromDataNodeCallable = getFromOneDataNode( - chosenNode, block, start, end, bb, - corruptedBlocks, hedgedReadId++); - Future oneMoreRequest = hedgedService - .submit(getFromDataNodeCallable); - futures.add(oneMoreRequest); } catch (IOException ioe) { DFSClient.LOG.debug("Failed getting node for hedged read: {}", ioe.getMessage()); @@ -1190,6 +1218,9 @@ private void hedgedFetchBlockByteRange(LocatedBlock block, long start, } catch (InterruptedException ie) { // Ignore and retry } + if (refetch) { + refetchLocations(block, ignored); + } // We got here if exception. Ignore this node on next go around IFF // we found a chosenNode to hedge read against. if (chosenNode != null && chosenNode.info != null) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java index bcb02b3d2c..0834d30d38 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java @@ -626,7 +626,7 @@ public Void answer(InvocationOnMock invocation) throws Throwable { */ @Test public void testPreadFailureWithChangedBlockLocations() throws Exception { - doPreadTestWithChangedLocations(); + doPreadTestWithChangedLocations(1); } /** @@ -639,21 +639,36 @@ public void testPreadFailureWithChangedBlockLocations() throws Exception { * 7. Consider next calls to getBlockLocations() always returns DN3 as last * location.
*/ - @Test + @Test(timeout = 60000) public void testPreadHedgedFailureWithChangedBlockLocations() throws Exception { isHedgedRead = true; - doPreadTestWithChangedLocations(); + DFSClientFaultInjector old = DFSClientFaultInjector.get(); + try { + DFSClientFaultInjector.set(new DFSClientFaultInjector() { + public void sleepBeforeHedgedGet() { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + } + } + }); + doPreadTestWithChangedLocations(2); + } finally { + DFSClientFaultInjector.set(old); + } } - private void doPreadTestWithChangedLocations() + private void doPreadTestWithChangedLocations(int maxFailures) throws IOException, TimeoutException, InterruptedException { GenericTestUtils.setLogLevel(DFSClient.LOG, Level.DEBUG); Configuration conf = new HdfsConfiguration(); conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 2); conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); if (isHedgedRead) { + conf.setInt(HdfsClientConfigKeys.HedgedRead.THRESHOLD_MILLIS_KEY, 100); conf.setInt(HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY, 2); + conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 1000); } try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build()) { @@ -747,6 +762,9 @@ public Boolean get() { int n = din.read(0, buf, 0, data.length()); assertEquals(data.length(), n); assertEquals("Data should be read", data, new String(buf, 0, n)); + assertTrue("Read should complete with maximum " + maxFailures + + " failures, but completed with " + din.failures, + din.failures <= maxFailures); DFSClient.LOG.info("Read completed"); } }