From 439614b0c8a3df3d8b7967451c5331a0e034e13a Mon Sep 17 00:00:00 2001
From: Haohui Mai
Date: Tue, 28 Apr 2015 18:11:59 -0700
Subject: [PATCH] HDFS-8280. Code Cleanup in DFSInputStream. Contributed by
 Jing Zhao.

---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt  |   2 +
 .../apache/hadoop/hdfs/DFSInputStream.java   | 141 ++++++++----------
 2 files changed, 61 insertions(+), 82 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 1c4cfb459c..e7fa8fd820 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -478,6 +478,8 @@ Release 2.8.0 - UNRELEASED
     HDFS-8176. Record from/to snapshots in audit log for snapshot diff report.
     (J. Andreina via jing9)
 
+    HDFS-8280. Code Cleanup in DFSInputStream. (Jing Zhao via wheat9)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 3f90397827..32902237a4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -601,7 +601,6 @@ private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
         targetBlock.getBlockSize() - 1;
     this.currentLocatedBlock = targetBlock;
 
-    assert (target==pos) : "Wrong postion " + pos + " expect " + target;
     long offsetIntoBlock = target - targetBlock.getStartOffset();
 
     DNAddrPair retval = chooseDataNode(targetBlock, null);
@@ -610,35 +609,12 @@ private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
       StorageType storageType = retval.storageType;
 
       try {
-        ExtendedBlock blk = targetBlock.getBlock();
-        Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
-        CachingStrategy curCachingStrategy;
-        boolean shortCircuitForbidden;
-        synchronized(infoLock) {
-          curCachingStrategy = cachingStrategy;
-          shortCircuitForbidden = shortCircuitForbidden();
-        }
-        blockReader = new BlockReaderFactory(dfsClient.getConf()).
-            setInetSocketAddress(targetAddr).
-            setRemotePeerFactory(dfsClient).
-            setDatanodeInfo(chosenNode).
-            setStorageType(storageType).
-            setFileName(src).
-            setBlock(blk).
-            setBlockToken(accessToken).
-            setStartOffset(offsetIntoBlock).
-            setVerifyChecksum(verifyChecksum).
-            setClientName(dfsClient.clientName).
-            setLength(blk.getNumBytes() - offsetIntoBlock).
-            setCachingStrategy(curCachingStrategy).
-            setAllowShortCircuitLocalReads(!shortCircuitForbidden).
-            setClientCacheContext(dfsClient.getClientContext()).
-            setUserGroupInformation(dfsClient.ugi).
-            setConfiguration(dfsClient.getConfiguration()).
-            build();
+        blockReader = getBlockReader(targetBlock, offsetIntoBlock,
+            targetBlock.getBlockSize() - offsetIntoBlock, targetAddr,
+            storageType, chosenNode);
         if(connectFailedOnce) {
           DFSClient.LOG.info("Successfully connected to " + targetAddr +
-                             " for " + blk);
+                             " for " + targetBlock.getBlock());
         }
         return chosenNode;
       } catch (IOException ex) {
@@ -663,6 +639,37 @@ private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
     }
   }
 
+  protected BlockReader getBlockReader(LocatedBlock targetBlock,
+      long offsetInBlock, long length, InetSocketAddress targetAddr,
+      StorageType storageType, DatanodeInfo datanode) throws IOException {
+    ExtendedBlock blk = targetBlock.getBlock();
+    Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
+    CachingStrategy curCachingStrategy;
+    boolean shortCircuitForbidden;
+    synchronized (infoLock) {
+      curCachingStrategy = cachingStrategy;
+      shortCircuitForbidden = shortCircuitForbidden();
+    }
+    return new BlockReaderFactory(dfsClient.getConf()).
+        setInetSocketAddress(targetAddr).
+        setRemotePeerFactory(dfsClient).
+        setDatanodeInfo(datanode).
+        setStorageType(storageType).
+        setFileName(src).
+        setBlock(blk).
+        setBlockToken(accessToken).
+        setStartOffset(offsetInBlock).
+        setVerifyChecksum(verifyChecksum).
+        setClientName(dfsClient.clientName).
+        setLength(length).
+        setCachingStrategy(curCachingStrategy).
+        setAllowShortCircuitLocalReads(!shortCircuitForbidden).
+        setClientCacheContext(dfsClient.getClientContext()).
+        setUserGroupInformation(dfsClient.ugi).
+        setConfiguration(dfsClient.getConfiguration()).
+        build();
+  }
+
   /**
    * Close it down!
    */
@@ -935,9 +942,10 @@ private void addIntoCorruptedBlockMap(ExtendedBlock blk, DatanodeInfo node,
   private DNAddrPair chooseDataNode(LocatedBlock block,
       Collection<DatanodeInfo> ignoredNodes) throws IOException {
     while (true) {
-      try {
-        return getBestNodeDNAddrPair(block, ignoredNodes);
-      } catch (IOException ie) {
+      DNAddrPair result = getBestNodeDNAddrPair(block, ignoredNodes);
+      if (result != null) {
+        return result;
+      } else {
         String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(),
           deadNodes, ignoredNodes);
         String blockInfo = block.getBlock() + " file=" + src;
@@ -954,7 +962,7 @@ private DNAddrPair chooseDataNode(LocatedBlock block,
           DFSClient.LOG.info("No node available for " + blockInfo);
         }
         DFSClient.LOG.info("Could not obtain " + block.getBlock()
-            + " from any node: " + ie + errMsg
+            + " from any node: " + errMsg
             + ". Will get new block locations from namenode and retry...");
         try {
           // Introducing a random factor to the wait time before another retry.
@@ -977,7 +985,6 @@ private DNAddrPair chooseDataNode(LocatedBlock block,
         openInfo();
         block = getBlockAt(block.getStartOffset());
         failures++;
-        continue;
       }
     }
   }
@@ -986,11 +993,10 @@ private DNAddrPair chooseDataNode(LocatedBlock block,
    * Get the best node from which to stream the data.
    * @param block LocatedBlock, containing nodes in priority order.
    * @param ignoredNodes Do not choose nodes in this array (may be null)
-   * @return The DNAddrPair of the best node.
-   * @throws IOException
+   * @return The DNAddrPair of the best node. Null if no node can be chosen.
   */
  private DNAddrPair getBestNodeDNAddrPair(LocatedBlock block,
-      Collection<DatanodeInfo> ignoredNodes) throws IOException {
+      Collection<DatanodeInfo> ignoredNodes) {
     DatanodeInfo[] nodes = block.getLocations();
     StorageType[] storageTypes = block.getStorageTypes();
     DatanodeInfo chosenNode = null;
@@ -1010,9 +1016,10 @@ private DNAddrPair getBestNodeDNAddrPair(LocatedBlock block,
       }
     }
     if (chosenNode == null) {
-      throw new IOException("No live nodes contain block " + block.getBlock() +
+      DFSClient.LOG.warn("No live nodes contain block " + block.getBlock() +
           " after checking nodes = " + Arrays.toString(nodes) +
           ", ignoredNodes = " + ignoredNodes);
+      return null;
     }
     final String dnAddr =
         chosenNode.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname());
@@ -1102,40 +1109,13 @@ private void actualGetFromOneDataNode(final DNAddrPair datanode,
       // cached block locations may have been updated by chooseDataNode()
       // or fetchBlockAt(). Always get the latest list of locations at the
       // start of the loop.
-      CachingStrategy curCachingStrategy;
-      boolean allowShortCircuitLocalReads;
       LocatedBlock block = getBlockAt(blockStartOffset);
-      synchronized(infoLock) {
-        curCachingStrategy = cachingStrategy;
-        allowShortCircuitLocalReads = !shortCircuitForbidden();
-      }
-      DatanodeInfo chosenNode = datanode.info;
-      InetSocketAddress targetAddr = datanode.addr;
-      StorageType storageType = datanode.storageType;
       BlockReader reader = null;
-
       try {
         DFSClientFaultInjector.get().fetchFromDatanodeException();
-        Token<BlockTokenIdentifier> blockToken = block.getBlockToken();
         int len = (int) (end - start + 1);
-        reader = new BlockReaderFactory(dfsClient.getConf()).
-            setInetSocketAddress(targetAddr).
-            setRemotePeerFactory(dfsClient).
-            setDatanodeInfo(chosenNode).
-            setStorageType(storageType).
-            setFileName(src).
-            setBlock(block.getBlock()).
-            setBlockToken(blockToken).
-            setStartOffset(start).
-            setVerifyChecksum(verifyChecksum).
-            setClientName(dfsClient.clientName).
-            setLength(len).
-            setCachingStrategy(curCachingStrategy).
-            setAllowShortCircuitLocalReads(allowShortCircuitLocalReads).
-            setClientCacheContext(dfsClient.getClientContext()).
-            setUserGroupInformation(dfsClient.ugi).
-            setConfiguration(dfsClient.getConfiguration()).
-            build();
+        reader = getBlockReader(block, start, len, datanode.addr,
+            datanode.storageType, datanode.info);
         int nread = reader.readAll(buf, offset, len);
         updateReadStatistics(readStatistics, nread, reader);
@@ -1148,34 +1128,33 @@ private void actualGetFromOneDataNode(final DNAddrPair datanode,
       } catch (ChecksumException e) {
         String msg = "fetchBlockByteRange(). Got a checksum exception for "
             + src + " at " + block.getBlock() + ":" + e.getPos() + " from "
-            + chosenNode;
+            + datanode.info;
         DFSClient.LOG.warn(msg);
         // we want to remember what we have tried
-        addIntoCorruptedBlockMap(block.getBlock(), chosenNode, corruptedBlockMap);
-        addToDeadNodes(chosenNode);
+        addIntoCorruptedBlockMap(block.getBlock(), datanode.info,
+            corruptedBlockMap);
+        addToDeadNodes(datanode.info);
         throw new IOException(msg);
       } catch (IOException e) {
         if (e instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
           DFSClient.LOG.info("Will fetch a new encryption key and retry, "
-              + "encryption key was invalid when connecting to " + targetAddr
+              + "encryption key was invalid when connecting to " + datanode.addr
               + " : " + e);
           // The encryption key used is invalid.
           refetchEncryptionKey--;
           dfsClient.clearDataEncryptionKey();
-          continue;
-        } else if (refetchToken > 0 && tokenRefetchNeeded(e, targetAddr)) {
+        } else if (refetchToken > 0 && tokenRefetchNeeded(e, datanode.addr)) {
           refetchToken--;
           try {
             fetchBlockAt(block.getStartOffset());
           } catch (IOException fbae) {
             // ignore IOE, since we can retry it later in a loop
           }
-          continue;
         } else {
-          String msg = "Failed to connect to " + targetAddr + " for file "
+          String msg = "Failed to connect to " + datanode.addr + " for file "
               + src + " for block " + block.getBlock() + ":" + e;
           DFSClient.LOG.warn("Connection failure: " + msg, e);
-          addToDeadNodes(chosenNode);
+          addToDeadNodes(datanode.info);
           throw new IOException(msg);
         }
       } finally {
@@ -1187,10 +1166,9 @@ private void actualGetFromOneDataNode(final DNAddrPair datanode,
   }
 
   /**
-   * Like {@link #fetchBlockByteRange(LocatedBlock, long, long, byte[],
-   * int, Map)} except we start up a second, parallel, 'hedged' read
-   * if the first read is taking longer than configured amount of
-   * time. We then wait on which ever read returns first.
+   * Like {@link #fetchBlockByteRange} except we start up a second, parallel,
+   * 'hedged' read if the first read is taking longer than the configured
+   * amount of time. We then wait on whichever read returns first.
    */
   private void hedgedFetchBlockByteRange(long blockStartOffset, long start,
       long end, byte[] buf, int offset,
@@ -1248,9 +1226,8 @@ private void hedgedFetchBlockByteRange(long blockStartOffset, long start,
         // ongoing. Call getBestNodeDNAddrPair instead of chooseDataNode.
         // If no nodes to do hedged reads against, pass.
         try {
-          try {
-            chosenNode = getBestNodeDNAddrPair(block, ignored);
-          } catch (IOException ioe) {
+          chosenNode = getBestNodeDNAddrPair(block, ignored);
+          if (chosenNode == null) {
             chosenNode = chooseDataNode(block, ignored);
           }
           bb = ByteBuffer.allocate(len);
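
Editor's note: the cleanup above does two things. It moves the duplicated seventeen-setter BlockReaderFactory chain from blockSeekTo() and actualGetFromOneDataNode() into a single getBlockReader() helper, and it stops using an IOException for control flow in getBestNodeDNAddrPair(): the method now logs and returns null, and chooseDataNode() and hedgedFetchBlockByteRange() test the result instead of catching. The sketch below is a minimal, self-contained illustration of that second pattern only; Node, NodePicker, pickNode, and chooseWithRetry are invented names for this example and are not HDFS types.

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class NodePicker {
  // Hypothetical stand-in for DatanodeInfo.
  static final class Node {
    final String addr;
    Node(String addr) { this.addr = addr; }
    @Override public String toString() { return addr; }
  }

  // After the cleanup: return null when no node qualifies, instead of
  // throwing an IOException that the caller immediately catches.
  static Node pickNode(List<Node> candidates, Set<Node> ignored) {
    for (Node n : candidates) {
      if (!ignored.contains(n)) {
        return n; // first live candidate not in the ignore set
      }
    }
    // Old style: throw new IOException("No live nodes ...").
    // New style: log it and let the caller decide what to do next.
    System.err.println("No usable node among " + candidates
        + ", ignored = " + ignored);
    return null;
  }

  // Caller mirrors the rewritten chooseDataNode() loop: test for null,
  // refresh state, retry, and throw only once retries are exhausted.
  static Node chooseWithRetry(List<Node> candidates, Set<Node> ignored,
      int maxFailures) throws IOException {
    for (int failures = 0; failures < maxFailures; failures++) {
      Node result = pickNode(candidates, ignored);
      if (result != null) {
        return result;
      }
      // DFSInputStream refetches block locations from the NameNode at
      // this point (openInfo()/getBlockAt()) before the next attempt;
      // here we simply drop the ignore set to simulate a refresh.
      ignored.clear();
    }
    throw new IOException("Could not obtain a node after " + maxFailures
        + " attempts");
  }

  public static void main(String[] args) throws IOException {
    List<Node> nodes =
        Arrays.asList(new Node("dn1:50010"), new Node("dn2:50010"));
    Set<Node> ignored = new HashSet<>();
    ignored.add(nodes.get(0));
    System.out.println("Chose " + chooseWithRetry(nodes, ignored, 3));
  }
}

As in the patch, "no node available" stays on the ordinary control path and an exception is reserved for genuine failure after retries run out; this is also what lets the hedged-read loop probe for a spare datanode with a plain null check rather than a nested try/catch.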