From 8808779db351fe444388d4acb3094766b5980718 Mon Sep 17 00:00:00 2001 From: Zhe Zhang Date: Thu, 25 Feb 2016 09:55:50 -0800 Subject: [PATCH] HDFS-9734. Refactoring of checksum failure report related codes. Contributed by Kai Zheng. Change-Id: Ie69a77e3498a360959f8e213c51fb2b17c28b64a --- .../apache/hadoop/hdfs/DFSInputStream.java | 72 +++++++------------ .../hadoop/hdfs/DFSStripedInputStream.java | 46 ++++++------ .../org/apache/hadoop/hdfs/DFSUtilClient.java | 36 ++++++++++ hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../hadoop/hdfs/server/datanode/DataNode.java | 16 ++++- .../erasurecode/ErasureCodingWorker.java | 50 ++++--------- .../hdfs/TestReconstructStripedFile.java | 4 +- 7 files changed, 115 insertions(+), 112 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index 3c91ca10b5..d713e8f783 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -26,8 +26,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.EnumSet; -import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -57,6 +55,7 @@ import org.apache.hadoop.fs.HasEnhancedByteBufferAccess; import org.apache.hadoop.fs.ReadOption; import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks; import org.apache.hadoop.hdfs.client.impl.DfsClientConf; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; @@ -857,7 +856,7 @@ public int copyFrom(ByteBuffer src, int offset, int length) { * ChecksumFileSystem */ private synchronized int readBuffer(ReaderStrategy reader, int off, int len, - Map> corruptedBlockMap) + CorruptedBlocks corruptedBlocks) throws IOException { IOException ioe; @@ -880,8 +879,7 @@ private synchronized int readBuffer(ReaderStrategy reader, int off, int len, ioe = ce; retryCurrentNode = false; // we want to remember which block replicas we have tried - addIntoCorruptedBlockMap(getCurrentBlock(), currentNode, - corruptedBlockMap); + corruptedBlocks.addCorruptedBlock(getCurrentBlock(), currentNode); } catch ( IOException e ) { if (!retryCurrentNode) { DFSClient.LOG.warn("Exception while reading from " @@ -914,7 +912,8 @@ protected synchronized int readWithStrategy(ReaderStrategy strategy, int off, if (closed.get()) { throw new IOException("Stream closed"); } - Map> corruptedBlockMap = new HashMap<>(); + + CorruptedBlocks corruptedBlocks = new CorruptedBlocks(); failures = 0; if (pos < getFileLength()) { int retries = 2; @@ -932,7 +931,7 @@ protected synchronized int readWithStrategy(ReaderStrategy strategy, int off, locatedBlocks.getFileLength() - pos); } } - int result = readBuffer(strategy, off, realLen, corruptedBlockMap); + int result = readBuffer(strategy, off, realLen, corruptedBlocks); if (result >= 0) { pos += result; @@ -958,7 +957,7 @@ protected synchronized int readWithStrategy(ReaderStrategy strategy, int off, } finally { // Check if need to report block replicas corruption either read // was successful or ChecksumException occured. 
- reportCheckSumFailure(corruptedBlockMap, + reportCheckSumFailure(corruptedBlocks, currentLocatedBlock.getLocations().length, false); } } @@ -999,24 +998,6 @@ src, getPos(), reqLen)){ } } - - /** - * Add corrupted block replica into map. - */ - protected void addIntoCorruptedBlockMap(ExtendedBlock blk, DatanodeInfo node, - Map> corruptedBlockMap) { - Set dnSet; - if((corruptedBlockMap.containsKey(blk))) { - dnSet = corruptedBlockMap.get(blk); - }else { - dnSet = new HashSet<>(); - } - if (!dnSet.contains(node)) { - dnSet.add(node); - corruptedBlockMap.put(blk, dnSet); - } - } - private DNAddrPair chooseDataNode(LocatedBlock block, Collection ignoredNodes) throws IOException { while (true) { @@ -1143,15 +1124,14 @@ private static String getBestNodeDNAddrPairErrorString( } protected void fetchBlockByteRange(LocatedBlock block, long start, long end, - byte[] buf, int offset, - Map> corruptedBlockMap) + byte[] buf, int offset, CorruptedBlocks corruptedBlocks) throws IOException { block = refreshLocatedBlock(block); while (true) { DNAddrPair addressPair = chooseDataNode(block, null); try { actualGetFromOneDataNode(addressPair, block, start, end, - buf, offset, corruptedBlockMap); + buf, offset, corruptedBlocks); return; } catch (IOException e) { // Ignore. Already processed inside the function. @@ -1163,7 +1143,7 @@ protected void fetchBlockByteRange(LocatedBlock block, long start, long end, private Callable getFromOneDataNode(final DNAddrPair datanode, final LocatedBlock block, final long start, final long end, final ByteBuffer bb, - final Map> corruptedBlockMap, + final CorruptedBlocks corruptedBlocks, final int hedgedReadId) { final SpanId parentSpanId = Tracer.getCurrentSpanId(); return new Callable() { @@ -1174,7 +1154,7 @@ public ByteBuffer call() throws Exception { try (TraceScope ignored = dfsClient.getTracer(). newScope("hedgedRead" + hedgedReadId, parentSpanId)) { actualGetFromOneDataNode(datanode, block, start, end, buf, - offset, corruptedBlockMap); + offset, corruptedBlocks); return bb; } } @@ -1190,12 +1170,12 @@ public ByteBuffer call() throws Exception { * @param endInBlk the endInBlk offset of the block * @param buf the given byte array into which the data is read * @param offset the offset in buf - * @param corruptedBlockMap map recording list of datanodes with corrupted + * @param corruptedBlocks map recording list of datanodes with corrupted * block replica */ void actualGetFromOneDataNode(final DNAddrPair datanode, LocatedBlock block, final long startInBlk, final long endInBlk, byte[] buf, int offset, - Map> corruptedBlockMap) + CorruptedBlocks corruptedBlocks) throws IOException { DFSClientFaultInjector.get().startFetchFromDatanode(); int refetchToken = 1; // only need to get a new access token once @@ -1226,8 +1206,7 @@ void actualGetFromOneDataNode(final DNAddrPair datanode, LocatedBlock block, + datanode.info; DFSClient.LOG.warn(msg); // we want to remember what we have tried - addIntoCorruptedBlockMap(block.getBlock(), datanode.info, - corruptedBlockMap); + corruptedBlocks.addCorruptedBlock(block.getBlock(), datanode.info); addToDeadNodes(datanode.info); throw new IOException(msg); } catch (IOException e) { @@ -1277,8 +1256,7 @@ protected LocatedBlock refreshLocatedBlock(LocatedBlock block) * time. We then wait on which ever read returns first. 
*/ private void hedgedFetchBlockByteRange(LocatedBlock block, long start, - long end, byte[] buf, int offset, - Map> corruptedBlockMap) + long end, byte[] buf, int offset, CorruptedBlocks corruptedBlocks) throws IOException { final DfsClientConf conf = dfsClient.getConf(); ArrayList> futures = new ArrayList<>(); @@ -1301,7 +1279,7 @@ private void hedgedFetchBlockByteRange(LocatedBlock block, long start, bb = ByteBuffer.wrap(buf, offset, len); Callable getFromDataNodeCallable = getFromOneDataNode( chosenNode, block, start, end, bb, - corruptedBlockMap, hedgedReadId++); + corruptedBlocks, hedgedReadId++); Future firstRequest = hedgedService .submit(getFromDataNodeCallable); futures.add(firstRequest); @@ -1333,7 +1311,7 @@ private void hedgedFetchBlockByteRange(LocatedBlock block, long start, bb = ByteBuffer.allocate(len); Callable getFromDataNodeCallable = getFromOneDataNode( chosenNode, block, start, end, bb, - corruptedBlockMap, hedgedReadId++); + corruptedBlocks, hedgedReadId++); Future oneMoreRequest = hedgedService .submit(getFromDataNodeCallable); futures.add(oneMoreRequest); @@ -1476,23 +1454,23 @@ private int pread(long position, byte[] buffer, int offset, int length) // corresponding to position and realLen List blockRange = getBlockRange(position, realLen); int remaining = realLen; - Map> corruptedBlockMap = new HashMap<>(); + CorruptedBlocks corruptedBlocks = new CorruptedBlocks(); for (LocatedBlock blk : blockRange) { long targetStart = position - blk.getStartOffset(); long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart); try { if (dfsClient.isHedgedReadsEnabled() && !blk.isStriped()) { hedgedFetchBlockByteRange(blk, targetStart, - targetStart + bytesToRead - 1, buffer, offset, corruptedBlockMap); + targetStart + bytesToRead - 1, buffer, offset, corruptedBlocks); } else { fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1, - buffer, offset, corruptedBlockMap); + buffer, offset, corruptedBlocks); } } finally { // Check and report if any block replicas are corrupted. // BlockMissingException may be caught if all block replicas are // corrupted. - reportCheckSumFailure(corruptedBlockMap, blk.getLocations().length, + reportCheckSumFailure(corruptedBlocks, blk.getLocations().length, false); } @@ -1523,12 +1501,14 @@ private int pread(long position, byte[] buffer, int offset, int length) * corresponding to each internal block. For this case we simply report the * corrupted blocks to NameNode and ignore the above logic. 
* - * @param corruptedBlockMap map of corrupted blocks + * @param corruptedBlocks map of corrupted blocks * @param dataNodeCount number of data nodes who contains the block replicas */ - protected void reportCheckSumFailure( - Map> corruptedBlockMap, + protected void reportCheckSumFailure(CorruptedBlocks corruptedBlocks, int dataNodeCount, boolean isStriped) { + + Map> corruptedBlockMap = + corruptedBlocks.getCorruptionMap(); if (corruptedBlockMap.isEmpty()) { return; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java index 3483255a92..d4174d840f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; +import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks; import org.apache.hadoop.hdfs.util.StripedBlockUtil; import org.apache.hadoop.io.ByteBufferPool; @@ -282,8 +283,7 @@ private long getOffsetInBlockGroup(long pos) { * Read a new stripe covering the current position, and store the data in the * {@link #curStripeBuf}. */ - private void readOneStripe( - Map> corruptedBlockMap) + private void readOneStripe(CorruptedBlocks corruptedBlocks) throws IOException { resetCurStripeBuffer(); @@ -307,7 +307,7 @@ private void readOneStripe( for (AlignedStripe stripe : stripes) { // Parse group to get chosen DN location StripeReader sreader = new StatefulStripeReader(readingService, stripe, - blks, blockReaders, corruptedBlockMap); + blks, blockReaders, corruptedBlocks); sreader.readStripe(); } curStripeBuf.position(stripeBufOffset); @@ -319,7 +319,7 @@ private Callable readCells(final BlockReader reader, final DatanodeInfo datanode, final long currentReaderOffset, final long targetReaderOffset, final ByteBufferStrategy[] strategies, final ExtendedBlock currentBlock, - final Map> corruptedBlockMap) { + final CorruptedBlocks corruptedBlocks) { return new Callable() { @Override public Void call() throws Exception { @@ -338,7 +338,7 @@ public Void call() throws Exception { int result = 0; for (ByteBufferStrategy strategy : strategies) { result += readToBuffer(reader, datanode, strategy, currentBlock, - corruptedBlockMap); + corruptedBlocks); } return null; } @@ -348,7 +348,7 @@ public Void call() throws Exception { private int readToBuffer(BlockReader blockReader, DatanodeInfo currentNode, ByteBufferStrategy strategy, ExtendedBlock currentBlock, - Map> corruptedBlockMap) + CorruptedBlocks corruptedBlocks) throws IOException { final int targetLength = strategy.buf.remaining(); int length = 0; @@ -366,8 +366,7 @@ private int readToBuffer(BlockReader blockReader, + currentBlock + " from " + currentNode + " at " + ce.getPos()); // we want to remember which block replicas we have tried - addIntoCorruptedBlockMap(currentBlock, currentNode, - corruptedBlockMap); + corruptedBlocks.addCorruptedBlock(currentBlock, currentNode); throw ce; } catch (IOException e) { DFSClient.LOG.warn("Exception while reading from " @@ -423,8 +422,8 @@ protected synchronized int readWithStrategy(ReaderStrategy strategy, if (closed.get()) { throw new IOException("Stream closed"); } - 
Map> corruptedBlockMap = - new ConcurrentHashMap<>(); + + CorruptedBlocks corruptedBlocks = new CorruptedBlocks(); if (pos < getFileLength()) { try { if (pos > blockEnd) { @@ -442,7 +441,7 @@ protected synchronized int readWithStrategy(ReaderStrategy strategy, int result = 0; while (result < realLen) { if (!curStripeRange.include(getOffsetInBlockGroup())) { - readOneStripe(corruptedBlockMap); + readOneStripe(corruptedBlocks); } int ret = copyToTargetBuf(strategy, off + result, realLen - result); result += ret; @@ -455,7 +454,7 @@ protected synchronized int readWithStrategy(ReaderStrategy strategy, } finally { // Check if need to report block replicas corruption either read // was successful or ChecksumException occured. - reportCheckSumFailure(corruptedBlockMap, + reportCheckSumFailure(corruptedBlocks, currentLocatedBlock.getLocations().length, true); } } @@ -519,8 +518,7 @@ private LocatedStripedBlock getBlockGroupAt(long offset) throws IOException { */ @Override protected void fetchBlockByteRange(LocatedBlock block, long start, - long end, byte[] buf, int offset, - Map> corruptedBlockMap) + long end, byte[] buf, int offset, CorruptedBlocks corruptedBlocks) throws IOException { // Refresh the striped block group LocatedStripedBlock blockGroup = getBlockGroupAt(block.getStartOffset()); @@ -536,7 +534,7 @@ protected void fetchBlockByteRange(LocatedBlock block, long start, for (AlignedStripe stripe : stripes) { // Parse group to get chosen DN location StripeReader preader = new PositionStripeReader(readService, stripe, - blks, preaderInfos, corruptedBlockMap); + blks, preaderInfos, corruptedBlocks); preader.readStripe(); } } finally { @@ -575,17 +573,17 @@ private abstract class StripeReader { final AlignedStripe alignedStripe; final CompletionService service; final LocatedBlock[] targetBlocks; - final Map> corruptedBlockMap; + final CorruptedBlocks corruptedBlocks; final BlockReaderInfo[] readerInfos; StripeReader(CompletionService service, AlignedStripe alignedStripe, LocatedBlock[] targetBlocks, BlockReaderInfo[] readerInfos, - Map> corruptedBlockMap) { + CorruptedBlocks corruptedBlocks) { this.service = service; this.alignedStripe = alignedStripe; this.targetBlocks = targetBlocks; this.readerInfos = readerInfos; - this.corruptedBlockMap = corruptedBlockMap; + this.corruptedBlocks = corruptedBlocks; } /** prepare all the data chunks */ @@ -731,7 +729,7 @@ boolean readChunk(final LocatedBlock block, int chunkIndex) readerInfos[chunkIndex].datanode, readerInfos[chunkIndex].blockReaderOffset, alignedStripe.getOffsetInBlock(), getReadStrategies(chunk), - block.getBlock(), corruptedBlockMap); + block.getBlock(), corruptedBlocks); Future request = service.submit(readCallable); futures.put(request, chunkIndex); @@ -812,10 +810,9 @@ class PositionStripeReader extends StripeReader { PositionStripeReader(CompletionService service, AlignedStripe alignedStripe, LocatedBlock[] targetBlocks, - BlockReaderInfo[] readerInfos, - Map> corruptedBlockMap) { + BlockReaderInfo[] readerInfos, CorruptedBlocks corruptedBlocks) { super(service, alignedStripe, targetBlocks, readerInfos, - corruptedBlockMap); + corruptedBlocks); } @Override @@ -849,10 +846,9 @@ class StatefulStripeReader extends StripeReader { StatefulStripeReader(CompletionService service, AlignedStripe alignedStripe, LocatedBlock[] targetBlocks, - BlockReaderInfo[] readerInfos, - Map> corruptedBlockMap) { + BlockReaderInfo[] readerInfos, CorruptedBlocks corruptedBlocks) { super(service, alignedStripe, targetBlocks, readerInfos, - 
          corruptedBlockMap);
+          corruptedBlocks);
     }
 
     @Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
index 8f6ed14ca7..d6462522fe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
@@ -35,6 +35,7 @@
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@@ -69,9 +70,11 @@
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Set;
 
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMESERVICES;
@@ -681,4 +684,37 @@ public static InterruptedIOException toInterruptedIOException(String message,
     iioe.initCause(e);
     return iioe;
   }
+
+  /**
+   * A utility class as a container to put corrupted blocks, shared by client
+   * and datanode.
+   */
+  public static class CorruptedBlocks {
+    private Map<ExtendedBlock, Set<DatanodeInfo>> corruptionMap;
+
+    public CorruptedBlocks() {
+      this.corruptionMap = new HashMap<>();
+    }
+
+    /**
+     * Indicate a block replica on the specified datanode is corrupted
+     */
+    public void addCorruptedBlock(ExtendedBlock blk, DatanodeInfo node) {
+      Set<DatanodeInfo> dnSet = corruptionMap.get(blk);
+      if (dnSet == null) {
+        dnSet = new HashSet<>();
+        corruptionMap.put(blk, dnSet);
+      }
+      if (!dnSet.contains(node)) {
+        dnSet.add(node);
+      }
+    }
+
+    /**
+     * @return the map that contains all the corruption entries.
+     */
+    public Map<ExtendedBlock, Set<DatanodeInfo>> getCorruptionMap() {
+      return corruptionMap;
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 1169e66ad1..e3990eaa31 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -443,6 +443,9 @@ Trunk (Unreleased)
     HDFS-9837. BlockManager#countNodes should be able to detect duplicated
     internal blocks. (jing9)
 
+    HDFS-9734. Refactoring of checksum failure report related codes.
+    (Kai Zheng via zhz)
+
   BREAKDOWN OF HDFS-7285 SUBTASKS AND RELATED JIRAS
 
     HDFS-7347. Configurable erasure coding policy for individual files and
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index aed3eaa5a3..b347129e99 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1150,7 +1150,21 @@ public void reportRemoteBadBlock(DatanodeInfo srcDataNode, ExtendedBlock block)
     BPOfferService bpos = getBPOSForBlock(block);
     bpos.reportRemoteBadBlock(srcDataNode, block);
   }
-
+
+  public void reportCorruptedBlocks(
+      DFSUtilClient.CorruptedBlocks corruptedBlocks) throws IOException {
+    Map<ExtendedBlock, Set<DatanodeInfo>> corruptionMap =
+        corruptedBlocks.getCorruptionMap();
+    if (!corruptionMap.isEmpty()) {
+      for (Map.Entry<ExtendedBlock, Set<DatanodeInfo>> entry :
+          corruptionMap.entrySet()) {
+        for (DatanodeInfo dnInfo : entry.getValue()) {
+          reportRemoteBadBlock(dnInfo, entry.getKey());
+        }
+      }
+    }
+  }
+
   /**
    * Try to send an error report to the NNs associated with the given
    * block pool.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
index 1017e1eacc..bde8d805b3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
@@ -32,10 +32,8 @@
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutorCompletionService;
@@ -54,6 +52,7 @@
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSPacket;
 import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks;
 import org.apache.hadoop.hdfs.RemoteBlockReader2;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
@@ -460,13 +459,13 @@ public void run() {
           bufferSize, maxTargetLength - positionInBlock);
       // step1: read from minimum source DNs required for reconstruction.
// The returned success list is the source DNs we do real read from - Map> corruptionMap = new HashMap<>(); + CorruptedBlocks corruptedBlocks = new CorruptedBlocks(); try { success = readMinimumStripedData4Reconstruction(success, - toReconstruct, corruptionMap); + toReconstruct, corruptedBlocks); } finally { // report corrupted blocks to NN - reportCorruptedBlocks(corruptionMap); + datanode.reportCorruptedBlocks(corruptedBlocks); } // step2: decode to reconstruct targets @@ -564,8 +563,7 @@ private int getReadLength(int index, int reconstructLength) { * @throws IOException */ private int[] readMinimumStripedData4Reconstruction(final int[] success, - int reconstructLength, - Map> corruptionMap) + int reconstructLength, CorruptedBlocks corruptedBlocks) throws IOException { Preconditions.checkArgument(reconstructLength >= 0 && reconstructLength <= bufferSize); @@ -582,7 +580,7 @@ private int[] readMinimumStripedData4Reconstruction(final int[] success, reconstructLength); if (toRead > 0) { Callable readCallable = readFromBlock(reader, reader.buffer, - toRead, corruptionMap); + toRead, corruptedBlocks); Future f = readService.submit(readCallable); futures.put(f, success[i]); } else { @@ -608,11 +606,11 @@ private int[] readMinimumStripedData4Reconstruction(final int[] success, IOUtils.closeStream(failedReader.blockReader); failedReader.blockReader = null; resultIndex = scheduleNewRead(used, reconstructLength, - corruptionMap); + corruptedBlocks); } else if (result.state == StripingChunkReadResult.TIMEOUT) { // If timeout, we also schedule a new read. resultIndex = scheduleNewRead(used, reconstructLength, - corruptionMap); + corruptedBlocks); } if (resultIndex >= 0) { newSuccess[nsuccess++] = resultIndex; @@ -723,7 +721,7 @@ private void reconstructTargets(int[] success, boolean[] targetsStatus, * @return the array index of source DN if don't need to do real read. */ private int scheduleNewRead(BitSet used, int reconstructLen, - Map> corruptionMap) { + CorruptedBlocks corruptedBlocks) { StripedReader reader = null; // step1: initially we may only have minRequiredSources // number of StripedReader, and there may be some source DNs we never @@ -775,7 +773,7 @@ private int scheduleNewRead(BitSet used, int reconstructLen, // step3: schedule if find a correct source DN and need to do real read. 
if (reader != null) { Callable readCallable = readFromBlock(reader, reader.buffer, - toRead, corruptionMap); + toRead, corruptedBlocks); Future f = readService.submit(readCallable); futures.put(f, m); used.set(m); @@ -793,7 +791,7 @@ private void cancelReads(Collection> futures) { private Callable readFromBlock(final StripedReader reader, final ByteBuffer buf, final int length, - final Map> corruptionMap) { + final CorruptedBlocks corruptedBlocks) { return new Callable() { @Override @@ -805,7 +803,7 @@ public Void call() throws Exception { } catch (ChecksumException e) { LOG.warn("Found Checksum error for {} from {} at {}", reader.block, reader.source, e.getPos()); - addCorruptedBlock(reader.block, reader.source, corruptionMap); + corruptedBlocks.addCorruptedBlock(reader.block, reader.source); throw e; } catch (IOException e) { LOG.info(e.getMessage()); @@ -816,30 +814,6 @@ public Void call() throws Exception { }; } - private void reportCorruptedBlocks( - Map> corruptionMap) throws IOException { - if (!corruptionMap.isEmpty()) { - for (Map.Entry> entry : - corruptionMap.entrySet()) { - for (DatanodeInfo dnInfo : entry.getValue()) { - datanode.reportRemoteBadBlock(dnInfo, entry.getKey()); - } - } - } - } - - private void addCorruptedBlock(ExtendedBlock blk, DatanodeInfo node, - Map> corruptionMap) { - Set dnSet = corruptionMap.get(blk); - if (dnSet == null) { - dnSet = new HashSet<>(); - corruptionMap.put(blk, dnSet); - } - if (!dnSet.contains(node)) { - dnSet.add(node); - } - } - /** * Read bytes from block */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java index 8241882914..38ca8ceb8c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java @@ -217,7 +217,7 @@ private int[] generateDeadDnIndices(ReconstructionType type, int deadNum, return d; } - private void shutdownDataNodes(DataNode dn) throws IOException { + private void shutdownDataNode(DataNode dn) throws IOException { /* * Kill the datanode which contains one replica * We need to make sure it dead in namenode: clear its update time and @@ -237,7 +237,7 @@ private int generateErrors(Map corruptTargets, // stop at least one DN to trigger reconstruction LOG.info("Note: stop DataNode " + target.getValue().getDisplayName() + " with internal block " + target.getKey()); - shutdownDataNodes(target.getValue()); + shutdownDataNode(target.getValue()); stoppedDN++; } else { // corrupt the data on the DN LOG.info("Note: corrupt data on " + target.getValue().getDisplayName()
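
Reviewer note (illustrative sketch, not part of the patch): the refactoring replaces the hand-rolled Map<ExtendedBlock, Set<DatanodeInfo>> bookkeeping in DFSInputStream, DFSStripedInputStream and ErasureCodingWorker with the shared DFSUtilClient.CorruptedBlocks container, and moves the datanode-side reporting loop into DataNode#reportCorruptedBlocks. The standalone Java sketch below mirrors that pattern; it uses String stand-ins for ExtendedBlock and DatanodeInfo so it compiles without a Hadoop classpath, and all names in it belong to the sketch, not to Hadoop APIs.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class CorruptedBlocksSketch {

  /** Mirrors DFSUtilClient.CorruptedBlocks: block -> datanodes with a corrupt replica. */
  static class CorruptedBlocks {
    private final Map<String, Set<String>> corruptionMap = new HashMap<>();

    /** Record that the replica of blk on node failed its checksum
     *  (condensed equivalent of the patch's null-check-then-put). */
    void addCorruptedBlock(String blk, String node) {
      corruptionMap.computeIfAbsent(blk, k -> new HashSet<>()).add(node);
    }

    Map<String, Set<String>> getCorruptionMap() {
      return corruptionMap;
    }
  }

  /** Mirrors the reporting loop in DataNode#reportCorruptedBlocks and
   *  DFSInputStream#reportCheckSumFailure. */
  static void reportCorruptedBlocks(CorruptedBlocks corruptedBlocks) {
    for (Map.Entry<String, Set<String>> entry :
        corruptedBlocks.getCorruptionMap().entrySet()) {
      for (String node : entry.getValue()) {
        System.out.println("reporting bad replica of " + entry.getKey()
            + " on " + node);
      }
    }
  }

  public static void main(String[] args) {
    CorruptedBlocks corruptedBlocks = new CorruptedBlocks();
    try {
      // A reader that catches a ChecksumException records the replica it was
      // reading from; the Set deduplicates repeated failures on the same node.
      corruptedBlocks.addCorruptedBlock("blk_1073741825", "dn-1:9866");
      corruptedBlocks.addCorruptedBlock("blk_1073741825", "dn-2:9866");
    } finally {
      // The caller reports everything collected in one place, whether the read
      // succeeded or hit a ChecksumException, matching the finally blocks in the patch.
      reportCorruptedBlocks(corruptedBlocks);
    }
  }
}

Keeping the container and the reporting loop in one shared type is the point of the change: every read path records corrupt replicas the same way, and the report happens once, in a finally block, for both the success and the checksum-failure cases.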