diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index ec37542b6e..7b96c56477 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -696,6 +696,9 @@ Release 2.8.0 - UNRELEASED HDFS-8666. Speedup the TestMover tests. (Walter Su via jing9) + HDFS-8703. Merge refactor of DFSInputStream from ErasureCoding branch + (vinayakumarb) + OPTIMIZATIONS HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index f4ceab36ee..4923a5010a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -1181,7 +1181,7 @@ public DFSInputStream open(String src, int buffersize, boolean verifyChecksum) // Get block info from namenode TraceScope scope = getPathTraceScope("newDFSInputStream", src); try { - return new DFSInputStream(this, src, verifyChecksum); + return new DFSInputStream(this, src, verifyChecksum, null); } finally { scope.close(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index 6563d7b87c..7f3722f2f9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -44,6 +44,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.base.Preconditions; import org.apache.commons.io.IOUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.ByteBufferReadable; @@ -94,35 +95,35 @@ public class DFSInputStream extends FSInputStream @VisibleForTesting public static boolean tcpReadsDisabledForTesting = false; private long hedgedReadOpsLoopNumForTesting = 0; - private final DFSClient dfsClient; - private AtomicBoolean closed = new AtomicBoolean(false); - private final String src; - private final boolean verifyChecksum; + protected final DFSClient dfsClient; + protected AtomicBoolean closed = new AtomicBoolean(false); + protected final String src; + protected final boolean verifyChecksum; // state by stateful read only: // (protected by lock on this) ///// private DatanodeInfo currentNode = null; - private LocatedBlock currentLocatedBlock = null; - private long pos = 0; - private long blockEnd = -1; + protected LocatedBlock currentLocatedBlock = null; + protected long pos = 0; + protected long blockEnd = -1; private BlockReader blockReader = null; //// // state shared by stateful and positional read: // (protected by lock on infoLock) //// - private LocatedBlocks locatedBlocks = null; + protected LocatedBlocks locatedBlocks = null; private long lastBlockBeingWrittenLength = 0; private FileEncryptionInfo fileEncryptionInfo = null; - private CachingStrategy cachingStrategy; + protected CachingStrategy cachingStrategy; //// - private final ReadStatistics readStatistics = new ReadStatistics(); + protected final ReadStatistics readStatistics = new ReadStatistics(); // lock for state shared between read and pread // Note: Never acquire a lock on with this lock held to avoid deadlocks // (it's OK to acquire this 
lock when the lock on is held) - private final Object infoLock = new Object(); + protected final Object infoLock = new Object(); /** * Track the ByteBuffers that we have handed out to readers. @@ -239,7 +240,7 @@ void clear() { * back to the namenode to get a new list of block locations, and is * capped at maxBlockAcquireFailures */ - private int failures = 0; + protected int failures = 0; /* XXX Use of CocurrentHashMap is temp fix. Need to fix * parallel accesses to DFSInputStream (through ptreads) properly */ @@ -252,24 +253,28 @@ void addToDeadNodes(DatanodeInfo dnInfo) { deadNodes.put(dnInfo, dnInfo); } - DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum - ) throws IOException, UnresolvedLinkException { + DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum, + LocatedBlocks locatedBlocks) throws IOException, UnresolvedLinkException { this.dfsClient = dfsClient; this.verifyChecksum = verifyChecksum; this.src = src; synchronized (infoLock) { this.cachingStrategy = dfsClient.getDefaultReadCachingStrategy(); } - openInfo(); + this.locatedBlocks = locatedBlocks; + openInfo(false); } /** * Grab the open-file info from namenode + * @param refreshLocatedBlocks whether to re-fetch locatedblocks */ - void openInfo() throws IOException, UnresolvedLinkException { + void openInfo(boolean refreshLocatedBlocks) throws IOException, + UnresolvedLinkException { final DfsClientConf conf = dfsClient.getConf(); synchronized(infoLock) { - lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength(); + lastBlockBeingWrittenLength = + fetchLocatedBlocksAndGetLastBlockLength(refreshLocatedBlocks); int retriesForLastBlockLength = conf.getRetryTimesForGetLastBlockLength(); while (retriesForLastBlockLength > 0) { // Getting last block length as -1 is a special case. When cluster @@ -281,7 +286,8 @@ void openInfo() throws IOException, UnresolvedLinkException { + "Datanodes might not have reported blocks completely." 
+ " Will retry for " + retriesForLastBlockLength + " times"); waitFor(conf.getRetryIntervalForGetLastBlockLength()); - lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength(); + lastBlockBeingWrittenLength = + fetchLocatedBlocksAndGetLastBlockLength(true); } else { break; } @@ -302,8 +308,12 @@ private void waitFor(int waitTime) throws IOException { } } - private long fetchLocatedBlocksAndGetLastBlockLength() throws IOException { - final LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0); + private long fetchLocatedBlocksAndGetLastBlockLength(boolean refresh) + throws IOException { + LocatedBlocks newInfo = locatedBlocks; + if (locatedBlocks == null || refresh) { + newInfo = dfsClient.getLocatedBlocks(src, 0); + } if (DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("newInfo = " + newInfo); } @@ -441,7 +451,7 @@ public List getAllBlocks() throws IOException { * @return located block * @throws IOException */ - private LocatedBlock getBlockAt(long offset) throws IOException { + protected LocatedBlock getBlockAt(long offset) throws IOException { synchronized(infoLock) { assert (locatedBlocks != null) : "locatedBlocks is null"; @@ -476,7 +486,7 @@ else if (offset >= locatedBlocks.getFileLength()) { } /** Fetch a block from namenode and cache it */ - private void fetchBlockAt(long offset) throws IOException { + protected void fetchBlockAt(long offset) throws IOException { synchronized(infoLock) { int targetBlockIdx = locatedBlocks.findBlock(offset); if (targetBlockIdx < 0) { // block is not cached @@ -579,7 +589,7 @@ private synchronized DatanodeInfo blockSeekTo(long target) throws IOException { } // Will be getting a new BlockReader. - closeCurrentBlockReader(); + closeCurrentBlockReaders(); // // Connect to best DataNode for desired Block, with potential offset @@ -620,7 +630,7 @@ private synchronized DatanodeInfo blockSeekTo(long target) throws IOException { return chosenNode; } catch (IOException ex) { if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) { - DFSClient.LOG.info("Will fetch a new encryption key and retry, " + DFSClient.LOG.info("Will fetch a new encryption key and retry, " + "encryption key was invalid when connecting to " + targetAddr + " : " + ex); // The encryption key used is invalid. @@ -631,8 +641,8 @@ private synchronized DatanodeInfo blockSeekTo(long target) throws IOException { fetchBlockAt(target); } else { connectFailedOnce = true; - DFSClient.LOG.warn("Failed to connect to " + targetAddr + " for block " - +targetBlock.getBlock()+ ", add to deadNodes and continue. " + ex, ex); + DFSClient.LOG.warn("Failed to connect to " + targetAddr + " for block" + + ", add to deadNodes and continue. " + ex, ex); // Put chosen node into dead list, continue addToDeadNodes(chosenNode); } @@ -696,7 +706,7 @@ public void accept(ByteBuffer k, Object v) { "unreleased ByteBuffers allocated by read(). " + "Please release " + builder.toString() + "."); } - closeCurrentBlockReader(); + closeCurrentBlockReaders(); super.close(); } @@ -713,12 +723,22 @@ public synchronized int read() throws IOException { * Wraps different possible read implementations so that readBuffer can be * strategy-agnostic. */ - private interface ReaderStrategy { + interface ReaderStrategy { public int doRead(BlockReader blockReader, int off, int len) throws ChecksumException, IOException; + + /** + * Copy data from the src ByteBuffer into the read buffer. 
+ * @param src The src buffer where the data is copied from + * @param offset Useful only when the ReadStrategy is based on a byte array. + * Indicate the offset of the byte array for copy. + * @param length Useful only when the ReadStrategy is based on a byte array. + * Indicate the length of the data to copy. + */ + public int copyFrom(ByteBuffer src, int offset, int length); } - private void updateReadStatistics(ReadStatistics readStatistics, + protected void updateReadStatistics(ReadStatistics readStatistics, int nRead, BlockReader blockReader) { if (nRead <= 0) return; synchronized(infoLock) { @@ -749,12 +769,19 @@ public int doRead(BlockReader blockReader, int off, int len) updateReadStatistics(readStatistics, nRead, blockReader); return nRead; } + + @Override + public int copyFrom(ByteBuffer src, int offset, int length) { + ByteBuffer writeSlice = src.duplicate(); + writeSlice.get(buf, offset, length); + return length; + } } /** * Used to read bytes into a user-supplied ByteBuffer */ - private class ByteBufferStrategy implements ReaderStrategy { + protected class ByteBufferStrategy implements ReaderStrategy { final ByteBuffer buf; ByteBufferStrategy(ByteBuffer buf) { this.buf = buf; @@ -770,6 +797,9 @@ public int doRead(BlockReader blockReader, int off, int len) int ret = blockReader.read(buf); success = true; updateReadStatistics(readStatistics, ret, blockReader); + if (ret == 0) { + DFSClient.LOG.warn("zero"); + } return ret; } finally { if (!success) { @@ -779,6 +809,15 @@ public int doRead(BlockReader blockReader, int off, int len) } } } + + @Override + public int copyFrom(ByteBuffer src, int offset, int length) { + ByteBuffer writeSlice = src.duplicate(); + int remaining = Math.min(buf.remaining(), writeSlice.remaining()); + writeSlice.limit(writeSlice.position() + remaining); + buf.put(writeSlice); + return remaining; + } } /* This is a used by regular read() and handles ChecksumExceptions. @@ -837,7 +876,7 @@ private synchronized int readBuffer(ReaderStrategy reader, int off, int len, } } - private synchronized int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException { + protected synchronized int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException { dfsClient.checkOpen(); if (closed.get()) { throw new IOException("Stream closed"); @@ -926,7 +965,7 @@ public synchronized int read(final ByteBuffer buf) throws IOException { /** * Add corrupted block replica into map. */ - private void addIntoCorruptedBlockMap(ExtendedBlock blk, DatanodeInfo node, + protected void addIntoCorruptedBlockMap(ExtendedBlock blk, DatanodeInfo node, Map> corruptedBlockMap) { Set dnSet = null; if((corruptedBlockMap.containsKey(blk))) { @@ -985,8 +1024,8 @@ private DNAddrPair chooseDataNode(LocatedBlock block, } catch (InterruptedException iex) { } deadNodes.clear(); //2nd option is to remove only nodes[blockId] - openInfo(); - block = getBlockAt(block.getStartOffset()); + openInfo(true); + block = refreshLocatedBlock(block); failures++; } } @@ -998,7 +1037,7 @@ private DNAddrPair chooseDataNode(LocatedBlock block, * @param ignoredNodes Do not choose nodes in this array (may be null) * @return The DNAddrPair of the best node. Null if no node can be chosen. 
*/ - private DNAddrPair getBestNodeDNAddrPair(LocatedBlock block, + protected DNAddrPair getBestNodeDNAddrPair(LocatedBlock block, Collection ignoredNodes) { DatanodeInfo[] nodes = block.getLocations(); StorageType[] storageTypes = block.getStorageTypes(); @@ -1058,15 +1097,15 @@ private static String getBestNodeDNAddrPairErrorString( return errMsgr.toString(); } - private void fetchBlockByteRange(long blockStartOffset, long start, long end, + protected void fetchBlockByteRange(LocatedBlock block, long start, long end, byte[] buf, int offset, Map> corruptedBlockMap) throws IOException { - LocatedBlock block = getBlockAt(blockStartOffset); + block = refreshLocatedBlock(block); while (true) { DNAddrPair addressPair = chooseDataNode(block, null); try { - actualGetFromOneDataNode(addressPair, blockStartOffset, start, end, + actualGetFromOneDataNode(addressPair, block, start, end, buf, offset, corruptedBlockMap); return; } catch (IOException e) { @@ -1077,7 +1116,7 @@ private void fetchBlockByteRange(long blockStartOffset, long start, long end, } private Callable getFromOneDataNode(final DNAddrPair datanode, - final long blockStartOffset, final long start, final long end, + final LocatedBlock block, final long start, final long end, final ByteBuffer bb, final Map> corruptedBlockMap, final int hedgedReadId) { @@ -1090,7 +1129,7 @@ public ByteBuffer call() throws Exception { TraceScope scope = Trace.startSpan("hedgedRead" + hedgedReadId, parentSpan); try { - actualGetFromOneDataNode(datanode, blockStartOffset, start, end, buf, + actualGetFromOneDataNode(datanode, block, start, end, buf, offset, corruptedBlockMap); return bb; } finally { @@ -1100,31 +1139,60 @@ public ByteBuffer call() throws Exception { }; } + /** + * Used when reading contiguous blocks + */ private void actualGetFromOneDataNode(final DNAddrPair datanode, - long blockStartOffset, final long start, final long end, byte[] buf, + LocatedBlock block, final long start, final long end, byte[] buf, int offset, Map> corruptedBlockMap) throws IOException { + final int length = (int) (end - start + 1); + actualGetFromOneDataNode(datanode, block, start, end, buf, + new int[]{offset}, new int[]{length}, corruptedBlockMap); + } + + /** + * Read data from one DataNode. + * @param datanode the datanode from which to read data + * @param block the located block containing the requested data + * @param startInBlk the startInBlk offset of the block + * @param endInBlk the endInBlk offset of the block + * @param buf the given byte array into which the data is read + * @param offsets the data may be read into multiple segments of the buf + * (when reading a striped block). this array indicates the + * offset of each buf segment. + * @param lengths the length of each buf segment + * @param corruptedBlockMap map recording list of datanodes with corrupted + * block replica + */ + void actualGetFromOneDataNode(final DNAddrPair datanode, + LocatedBlock block, final long startInBlk, final long endInBlk, + byte[] buf, int[] offsets, int[] lengths, + Map> corruptedBlockMap) + throws IOException { DFSClientFaultInjector.get().startFetchFromDatanode(); int refetchToken = 1; // only need to get a new access token once int refetchEncryptionKey = 1; // only need to get a new encryption key once + final int len = (int) (endInBlk - startInBlk + 1); + checkReadPortions(offsets, lengths, len); while (true) { // cached block locations may have been updated by chooseDataNode() // or fetchBlockAt(). Always get the latest list of locations at the // start of the loop. 
- LocatedBlock block = getBlockAt(blockStartOffset); + block = refreshLocatedBlock(block); BlockReader reader = null; try { DFSClientFaultInjector.get().fetchFromDatanodeException(); - int len = (int) (end - start + 1); - reader = getBlockReader(block, start, len, datanode.addr, + reader = getBlockReader(block, startInBlk, len, datanode.addr, datanode.storageType, datanode.info); - int nread = reader.readAll(buf, offset, len); - updateReadStatistics(readStatistics, nread, reader); - - if (nread != len) { - throw new IOException("truncated return from reader.read(): " + - "excpected " + len + ", got " + nread); + for (int i = 0; i < offsets.length; i++) { + int nread = reader.readAll(buf, offsets[i], lengths[i]); + updateReadStatistics(readStatistics, nread, reader); + if (nread != lengths[i]) { + throw new IOException("truncated return from reader.read(): " + + "excpected " + lengths[i] + ", got " + nread); + } } DFSClientFaultInjector.get().readFromDatanodeDelay(); return; @@ -1169,11 +1237,40 @@ private void actualGetFromOneDataNode(final DNAddrPair datanode, } /** - * Like {@link #fetchBlockByteRange} except we start up a second, parallel, + * Refresh cached block locations. + * @param block The currently cached block locations + * @return Refreshed block locations + * @throws IOException + */ + protected LocatedBlock refreshLocatedBlock(LocatedBlock block) + throws IOException { + return getBlockAt(block.getStartOffset()); + } + + /** + * This method verifies that the read portions are valid and do not overlap + * with each other. + */ + private void checkReadPortions(int[] offsets, int[] lengths, int totalLen) { + Preconditions.checkArgument(offsets.length == lengths.length && offsets.length > 0); + int sum = 0; + for (int i = 0; i < lengths.length; i++) { + if (i > 0) { + int gap = offsets[i] - offsets[i - 1]; + // make sure read portions do not overlap with each other + Preconditions.checkArgument(gap >= lengths[i - 1]); + } + sum += lengths[i]; + } + Preconditions.checkArgument(sum == totalLen); + } + + /** + * Like {@link #fetchBlockByteRange}except we start up a second, parallel, * 'hedged' read if the first read is taking longer than configured amount of * time. We then wait on which ever read returns first. 
*/ - private void hedgedFetchBlockByteRange(long blockStartOffset, long start, + private void hedgedFetchBlockByteRange(LocatedBlock block, long start, long end, byte[] buf, int offset, Map> corruptedBlockMap) throws IOException { @@ -1186,7 +1283,7 @@ private void hedgedFetchBlockByteRange(long blockStartOffset, long start, ByteBuffer bb = null; int len = (int) (end - start + 1); int hedgedReadId = 0; - LocatedBlock block = getBlockAt(blockStartOffset); + block = refreshLocatedBlock(block); while (true) { // see HDFS-6591, this metric is used to verify/catch unnecessary loops hedgedReadOpsLoopNumForTesting++; @@ -1198,7 +1295,7 @@ private void hedgedFetchBlockByteRange(long blockStartOffset, long start, chosenNode = chooseDataNode(block, ignored); bb = ByteBuffer.wrap(buf, offset, len); Callable getFromDataNodeCallable = getFromOneDataNode( - chosenNode, block.getStartOffset(), start, end, bb, + chosenNode, block, start, end, bb, corruptedBlockMap, hedgedReadId++); Future firstRequest = hedgedService .submit(getFromDataNodeCallable); @@ -1235,7 +1332,7 @@ private void hedgedFetchBlockByteRange(long blockStartOffset, long start, } bb = ByteBuffer.allocate(len); Callable getFromDataNodeCallable = getFromOneDataNode( - chosenNode, block.getStartOffset(), start, end, bb, + chosenNode, block, start, end, bb, corruptedBlockMap, hedgedReadId++); Future oneMoreRequest = hedgedService .submit(getFromDataNodeCallable); @@ -1319,7 +1416,7 @@ private void cancelAll(List> futures) { * @return true if block access token has expired or invalid and it should be * refetched */ - private static boolean tokenRefetchNeeded(IOException ex, + protected static boolean tokenRefetchNeeded(IOException ex, InetSocketAddress targetAddr) { /* * Get a new access token and retry. Retry is needed in 2 cases. 1) @@ -1389,13 +1486,11 @@ private int pread(long position, byte[] buffer, int offset, int length) long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart); try { if (dfsClient.isHedgedReadsEnabled()) { - hedgedFetchBlockByteRange(blk.getStartOffset(), targetStart, - targetStart + bytesToRead - 1, buffer, offset, - corruptedBlockMap); + hedgedFetchBlockByteRange(blk, targetStart, + targetStart + bytesToRead - 1, buffer, offset, corruptedBlockMap); } else { - fetchBlockByteRange(blk.getStartOffset(), targetStart, - targetStart + bytesToRead - 1, buffer, offset, - corruptedBlockMap); + fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1, + buffer, offset, corruptedBlockMap); } } finally { // Check and report if any block replicas are corrupted. @@ -1427,7 +1522,7 @@ private int pread(long position, byte[] buffer, int offset, int length) * @param corruptedBlockMap map of corrupted blocks * @param dataNodeCount number of data nodes who contains the block replicas */ - private void reportCheckSumFailure( + protected void reportCheckSumFailure( Map> corruptedBlockMap, int dataNodeCount) { if (corruptedBlockMap.isEmpty()) { @@ -1556,7 +1651,7 @@ public synchronized boolean seekToNewSource(long targetPos) throws IOException { /** */ @Override - public synchronized long getPos() throws IOException { + public synchronized long getPos() { return pos; } @@ -1590,7 +1685,7 @@ public void reset() throws IOException { } /** Utility class to encapsulate data node info and its address. 
*/ - private static final class DNAddrPair { + static final class DNAddrPair { final DatanodeInfo info; final InetSocketAddress addr; final StorageType storageType; @@ -1627,7 +1722,7 @@ public FileEncryptionInfo getFileEncryptionInfo() { } } - private void closeCurrentBlockReader() { + protected void closeCurrentBlockReaders() { if (blockReader == null) return; // Close the current block reader so that the new caching settings can // take effect immediately. @@ -1647,7 +1742,7 @@ public synchronized void setReadahead(Long readahead) this.cachingStrategy = new CachingStrategy.Builder(this.cachingStrategy).setReadahead(readahead).build(); } - closeCurrentBlockReader(); + closeCurrentBlockReaders(); } @Override @@ -1657,7 +1752,7 @@ public synchronized void setDropBehind(Boolean dropBehind) this.cachingStrategy = new CachingStrategy.Builder(this.cachingStrategy).setDropBehind(dropBehind).build(); } - closeCurrentBlockReader(); + closeCurrentBlockReaders(); } /** @@ -1815,6 +1910,6 @@ public synchronized void releaseBuffer(ByteBuffer buffer) { @Override public synchronized void unbuffer() { - closeCurrentBlockReader(); + closeCurrentBlockReaders(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java index 68cc1552aa..43e0eb921f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java @@ -343,7 +343,7 @@ public void testFailuresArePerOperation() throws Exception // we're starting a new operation on the user level. doAnswer(new FailNTimesAnswer(preSpyNN, maxBlockAcquires)) .when(spyNN).getBlockLocations(anyString(), anyLong(), anyLong()); - is.openInfo(); + is.openInfo(true); // Seek to beginning forces a reopen of the BlockReader - otherwise it'll // just keep reading on the existing stream and the fact that we've poisoned // the block info won't do anything.
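
Editor's note (not part of the patch): the central behavioural change above is that `actualGetFromOneDataNode` now reads into multiple `(offset, length)` segments of the destination byte array, with the segment layout validated by the new `checkReadPortions` helper, so that a striped-read subclass can reuse the same datanode-read loop. The standalone Java sketch below mirrors that contract for illustration only; the class name and the use of plain `IllegalArgumentException` (instead of Guava's `Preconditions`, which the patch uses) are my own and not part of HDFS.

```java
// Standalone illustration only -- not part of the patch. It mirrors the
// contract enforced by DFSInputStream#checkReadPortions: the offset and
// length arrays must be non-empty and equal in size, the segments must not
// overlap, and the segment lengths must sum to the total requested length.
public class ReadPortionsSketch {

  static void checkReadPortions(int[] offsets, int[] lengths, int totalLen) {
    if (offsets.length == 0 || offsets.length != lengths.length) {
      throw new IllegalArgumentException(
          "offsets and lengths must be non-empty and the same size");
    }
    int sum = 0;
    for (int i = 0; i < lengths.length; i++) {
      if (i > 0 && offsets[i] - offsets[i - 1] < lengths[i - 1]) {
        // segment i starts before segment i-1 ends
        throw new IllegalArgumentException("read portions overlap");
      }
      sum += lengths[i];
    }
    if (sum != totalLen) {
      throw new IllegalArgumentException(
          "portion lengths sum to " + sum + ", expected " + totalLen);
    }
  }

  public static void main(String[] args) {
    // Contiguous read: the single-segment degenerate case, which is what the
    // old fetch path passes via new int[]{offset} / new int[]{length}.
    checkReadPortions(new int[]{0}, new int[]{4096}, 4096);
    // Striped-style read: the same block range scattered into two
    // non-overlapping buffer segments.
    checkReadPortions(new int[]{0, 1024}, new int[]{512, 512}, 1024);
    System.out.println("both portion layouts are valid");
  }
}
```

A contiguous read stays on the one-segment path, so existing callers are unaffected; only an overriding subclass (e.g. for erasure-coded striping) is expected to pass more than one segment.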