From a31eada33a598ebf9f78e48a3ab1ed031b9bbd27 Mon Sep 17 00:00:00 2001
From: yliu
Date: Thu, 4 Jun 2015 14:41:38 +0800
Subject: [PATCH] HDFS-8328. Follow-on to update decode for DataNode striped
 blocks reconstruction. (yliu)

---
 .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt      |   3 +
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java |   2 +-
 .../erasurecode/ErasureCodingWorker.java      | 427 ++++++++++--------
 .../hadoop/hdfs/TestRecoverStripedFile.java   |  65 ++-
 4 files changed, 308 insertions(+), 189 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 511ebecc20..a160520fa1 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -280,3 +280,6 @@
 
     HDFS-7621. Erasure Coding: update the Balancer/Mover data migration
     logic. (Walter Su via zhz)
+
+    HDFS-8328. Follow-on to update decode for DataNode striped blocks
+    reconstruction. (yliu)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 8e6b9f0b93..77dd1fdbe8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -373,7 +373,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_DATANODE_STRIPED_READ_THREADS_KEY = "dfs.datanode.stripedread.threads";
   public static final int DFS_DATANODE_STRIPED_READ_THREADS_DEFAULT = 20;
   public static final String DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY = "dfs.datanode.stripedread.buffer.size";
-  public static final int DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_DEFAULT = 256 * 1024;
+  public static final int DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_DEFAULT = 64 * 1024;
   public static final String DFS_DATANODE_STRIPED_READ_THRESHOLD_MILLIS_KEY = "dfs.datanode.stripedread.threshold.millis";
   public static final int DFS_DATANODE_STRIPED_READ_THRESHOLD_MILLIS_DEFAULT = 5000; //5s
   public static final String DFS_DATANODE_DNS_INTERFACE_KEY = "dfs.datanode.dns.interface";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
index 00cf0fd5e2..6f3857feee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
@@ -70,9 +70,7 @@
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
-import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
-import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Daemon;
@@ -80,6 +78,8 @@
 import com.google.common.base.Preconditions;
 
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.convertIndex4Decode;
+
 /**
  * ErasureCodingWorker handles the erasure coding recovery work commands.
These * commands would be issued from Namenode as part of Datanode's heart beat @@ -110,10 +110,6 @@ public ErasureCodingWorker(Configuration conf, DataNode datanode) { DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY, DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_DEFAULT); } - - private RawErasureEncoder newEncoder(int numDataUnits, int numParityUnits) { - return new RSRawEncoder(numDataUnits, numParityUnits); - } private RawErasureDecoder newDecoder(int numDataUnits, int numParityUnits) { return new RSRawDecoder(numDataUnits, numParityUnits); @@ -221,14 +217,14 @@ private class ReconstructAndTransferBlock implements Runnable { private final int parityBlkNum; private final int cellSize; - private RawErasureEncoder encoder; private RawErasureDecoder decoder; // Striped read buffer size private int bufferSize; private final ExtendedBlock blockGroup; - // position in striped block + private final int minRequiredSources; + // position in striped internal block private long positionInBlock; // sources @@ -237,6 +233,10 @@ private class ReconstructAndTransferBlock implements Runnable { private final List stripedReaders; + // The buffers and indices for striped blocks whose length is 0 + private ByteBuffer[] zeroStripeBuffers; + private short[] zeroStripeIndices; + // targets private final DatanodeInfo[] targets; private final StorageType[] targetStorageTypes; @@ -272,21 +272,32 @@ private class ReconstructAndTransferBlock implements Runnable { cellSize = recoveryInfo.getCellSize(); blockGroup = recoveryInfo.getExtendedBlock(); + final int cellsNum = (int)((blockGroup.getNumBytes() - 1) / cellSize + 1); + minRequiredSources = Math.min(cellsNum, dataBlkNum); liveIndices = recoveryInfo.getLiveBlockIndices(); sources = recoveryInfo.getSourceDnInfos(); stripedReaders = new ArrayList<>(sources.length); - Preconditions.checkArgument(liveIndices.length >= dataBlkNum, + Preconditions.checkArgument(liveIndices.length >= minRequiredSources, "No enough live striped blocks."); Preconditions.checkArgument(liveIndices.length == sources.length, "liveBlockIndices and source dns should match"); + if (minRequiredSources < dataBlkNum) { + zeroStripeBuffers = + new ByteBuffer[dataBlkNum - minRequiredSources]; + zeroStripeIndices = new short[dataBlkNum - minRequiredSources]; + } + targets = recoveryInfo.getTargetDnInfos(); targetStorageTypes = recoveryInfo.getTargetStorageTypes(); targetIndices = new short[targets.length]; targetBuffers = new ByteBuffer[targets.length]; + Preconditions.checkArgument(targetIndices.length <= parityBlkNum, + "Too much missed striped blocks."); + targetSockets = new Socket[targets.length]; targetOutputStreams = new DataOutputStream[targets.length]; targetInputStreams = new DataInputStream[targets.length]; @@ -303,6 +314,10 @@ private class ReconstructAndTransferBlock implements Runnable { cachingStrategy = CachingStrategy.newDefaultStrategy(); } + private ByteBuffer allocateBuffer(int length) { + return ByteBuffer.allocate(length); + } + private ExtendedBlock getBlock(ExtendedBlock blockGroup, int i) { return StripedBlockUtil.constructInternalBlock(blockGroup, cellSize, dataBlkNum, i); @@ -313,37 +328,67 @@ private long getBlockLen(ExtendedBlock blockGroup, int i) { cellSize, dataBlkNum, i); } + /** + * StripedReader is used to read from one source DN, it contains a block + * reader, buffer and striped block index. + * Only allocate StripedReader once for one source, and the StripedReader + * has the same array order with sources. 
Typically we only need to allocate + * minimum number (minRequiredSources) of StripedReader, and allocate + * new for new source DN if some existing DN invalid or slow. + * If some source DN is corrupt, set the corresponding blockReader to + * null and will never read from it again. + * + * @param i the array index of sources + * @param offsetInBlock offset for the internal block + * @return StripedReader + */ + private StripedReader addStripedReader(int i, long offsetInBlock) { + StripedReader reader = new StripedReader(liveIndices[i]); + stripedReaders.add(reader); + + BlockReader blockReader = newBlockReader( + getBlock(blockGroup, liveIndices[i]), offsetInBlock, sources[i]); + if (blockReader != null) { + initChecksumAndBufferSizeIfNeeded(blockReader); + reader.blockReader = blockReader; + } + reader.buffer = allocateBuffer(bufferSize); + return reader; + } + @Override public void run() { datanode.incrementXmitsInProgress(); try { - // Store the indices of successfully read source - // This will be updated after doing real read. - int[] success = new int[dataBlkNum]; + // Store the array indices of source DNs we have read successfully. + // In each iteration of read, the success list may be updated if + // some source DN is corrupted or slow. And use the updated success + // list of DNs for next iteration read. + int[] success = new int[minRequiredSources]; int nsuccess = 0; - for (int i = 0; i < sources.length && nsuccess < dataBlkNum; i++) { - StripedReader reader = new StripedReader(liveIndices[i]); - stripedReaders.add(reader); - - BlockReader blockReader = newBlockReader( - getBlock(blockGroup, liveIndices[i]), 0, sources[i]); - if (blockReader != null) { - initChecksumAndBufferSizeIfNeeded(blockReader); - reader.blockReader = blockReader; - reader.buffer = ByteBuffer.allocate(bufferSize); + for (int i = 0; + i < sources.length && nsuccess < minRequiredSources; i++) { + StripedReader reader = addStripedReader(i, 0); + if (reader.blockReader != null) { success[nsuccess++] = i; } } - if (nsuccess < dataBlkNum) { + if (nsuccess < minRequiredSources) { String error = "Can't find minimum sources required by " + "recovery, block id: " + blockGroup.getBlockId(); throw new IOException(error); } + if (zeroStripeBuffers != null) { + for (int i = 0; i < zeroStripeBuffers.length; i++) { + zeroStripeBuffers[i] = allocateBuffer(bufferSize); + } + } + for (int i = 0; i < targets.length; i++) { - targetBuffers[i] = ByteBuffer.allocate(bufferSize); + targetBuffers[i] = allocateBuffer(bufferSize); } checksumSize = checksum.getChecksumSize(); @@ -356,7 +401,9 @@ public void run() { packetBuf = new byte[maxPacketSize]; checksumBuf = new byte[checksumSize * (bufferSize / bytesPerChecksum)]; - // Store whether the target is success + // targetsStatus store whether some target is success, it will record + // any failed target once, if some target failed (invalid DN or transfer + // failed), will not transfer data to it any more. boolean[] targetsStatus = new boolean[targets.length]; if (initTargetStreams(targetsStatus) == 0) { String error = "All targets are failed."; @@ -367,16 +414,11 @@ public void run() { while (positionInBlock < firstStripedBlockLength) { int toRead = Math.min( bufferSize, (int)(firstStripedBlockLength - positionInBlock)); - // step1: read minimum striped buffer size data required by recovery. - nsuccess = readMinimumStripedData4Recovery(success); + // step1: read from minimum source DNs required for reconstruction. 
+ // The returned success list is the source DNs we do real read from + success = readMinimumStripedData4Recovery(success); - if (nsuccess < dataBlkNum) { - String error = "Can't read data from minimum number of sources " - + "required by recovery, block id: " + blockGroup.getBlockId(); - throw new IOException(error); - } - - // step2: encode/decode to recover targets + // step2: decode to reconstruct targets long remaining = firstStripedBlockLength - positionInBlock; int toRecoverLen = remaining < bufferSize ? (int)remaining : bufferSize; @@ -426,65 +468,97 @@ private void initChecksumAndBufferSizeIfNeeded(BlockReader blockReader) { } } - // assume liveIndices is not ordered. private void getTargetIndices() { BitSet bitset = new BitSet(dataBlkNum + parityBlkNum); for (int i = 0; i < sources.length; i++) { bitset.set(liveIndices[i]); } int m = 0; - for (int i = 0; i < dataBlkNum + parityBlkNum && m < targets.length; i++) { + int k = 0; + for (int i = 0; i < dataBlkNum + parityBlkNum; i++) { if (!bitset.get(i)) { - targetIndices[m++] = (short)i; + if (getBlockLen(blockGroup, i) > 0) { + if (m < targets.length) { + targetIndices[m++] = (short)i; + } + } else { + zeroStripeIndices[k++] = (short)i; + } } } } - /** - * Read minimum striped buffer size data required by recovery. - * success list will be updated after read. - * - * Initially we only read from dataBlkNum sources, - * if timeout or failure for some source, we will try to schedule - * read from a new source. - */ - private int readMinimumStripedData4Recovery(int[] success) { + private long getReadLength(int index) { + long blockLen = getBlockLen(blockGroup, index); + long remaining = blockLen - positionInBlock; + return remaining > bufferSize ? bufferSize : remaining; + } + /** + * Read from minimum source DNs required for reconstruction in the iteration. + * First try the success list which we think they are the best DNs + * If source DN is corrupt or slow, try to read some other source DN, + * and will update the success list. + * + * Remember the updated success list and return it for following + * operations and next iteration read. + * + * @param success the initial success list of source DNs we think best + * @return updated success list of source DNs we do real read + * @throws IOException + */ + private int[] readMinimumStripedData4Recovery(final int[] success) + throws IOException { + int nsuccess = 0; + int[] newSuccess = new int[minRequiredSources]; BitSet used = new BitSet(sources.length); - for (int i = 0; i < dataBlkNum; i++) { + /* + * Read from minimum source DNs required, the success list contains + * source DNs which we think best. 
+ */ + for (int i = 0; i < minRequiredSources; i++) { StripedReader reader = stripedReaders.get(success[i]); - Callable readCallable = readFromBlock( - reader.blockReader, reader.buffer); - Future f = readService.submit(readCallable); - futures.put(f, success[i]); + if (getReadLength(liveIndices[success[i]]) > 0) { + Callable readCallable = readFromBlock( + reader.blockReader, reader.buffer); + Future f = readService.submit(readCallable); + futures.put(f, success[i]); + } else { + // If the read length is 0, we don't need to do real read + reader.buffer.position(0); + newSuccess[nsuccess++] = success[i]; + } used.set(success[i]); } - int nsuccess = 0; while (!futures.isEmpty()) { try { StripingChunkReadResult result = StripedBlockUtil.getNextCompletedStripedRead( readService, futures, STRIPED_READ_THRESHOLD_MILLIS); + int resultIndex = -1; if (result.state == StripingChunkReadResult.SUCCESSFUL) { - success[nsuccess++] = result.index; - if (nsuccess >= dataBlkNum) { + resultIndex = result.index; + } else if (result.state == StripingChunkReadResult.FAILED) { + // If read failed for some source DN, we should not use it anymore + // and schedule read from another source DN. + StripedReader failedReader = stripedReaders.get(result.index); + closeBlockReader(failedReader.blockReader); + failedReader.blockReader = null; + resultIndex = scheduleNewRead(used); + } else if (result.state == StripingChunkReadResult.TIMEOUT) { + // If timeout, we also schedule a new read. + resultIndex = scheduleNewRead(used); + } + if (resultIndex >= 0) { + newSuccess[nsuccess++] = resultIndex; + if (nsuccess >= minRequiredSources) { // cancel remaining reads if we read successfully from minimum - // number of sources required for recovery. + // number of source DNs required by reconstruction. cancelReads(futures.keySet()); futures.clear(); break; } - } else if (result.state == StripingChunkReadResult.FAILED) { - // If read failed for some source, we should not use it anymore - // and schedule read from a new source. - StripedReader failedReader = stripedReaders.get(result.index); - closeBlockReader(failedReader.blockReader); - failedReader.blockReader = null; - scheduleNewRead(used); - } else if (result.state == StripingChunkReadResult.TIMEOUT) { - // If timeout, we also schedule a new read. - scheduleNewRead(used); } } catch (InterruptedException e) { LOG.info("Read data interrupted.", e); @@ -492,19 +566,13 @@ private int readMinimumStripedData4Recovery(int[] success) { } } - return nsuccess; - } - - /** - * Return true if need to do encoding to recovery missed striped block. 
- */ - private boolean shouldEncode(int[] success) { - for (int i = 0; i < success.length; i++) { - if (stripedReaders.get(success[i]).index >= dataBlkNum) { - return false; - } + if (nsuccess < minRequiredSources) { + String error = "Can't read data from minimum number of sources " + + "required by reconstruction, block id: " + blockGroup.getBlockId(); + throw new IOException(error); } - return true; + + return newSuccess; } private void paddingBufferToLen(ByteBuffer buffer, int len) { @@ -514,13 +582,6 @@ private void paddingBufferToLen(ByteBuffer buffer, int len) { } } - // Initialize encoder - private void initEncoderIfNecessary() { - if (encoder == null) { - encoder = newEncoder(dataBlkNum, parityBlkNum); - } - } - // Initialize decoder private void initDecoderIfNecessary() { if (decoder == null) { @@ -528,119 +589,119 @@ private void initDecoderIfNecessary() { } } + private int[] getErasedIndices(boolean[] targetsStatus) { + int[] result = new int[targets.length]; + int m = 0; + for (int i = 0; i < targets.length; i++) { + if (targetsStatus[i]) { + result[m++] = convertIndex4Decode(targetIndices[i], + dataBlkNum, parityBlkNum); + } + } + return Arrays.copyOf(result, m); + } + private void recoverTargets(int[] success, boolean[] targetsStatus, int toRecoverLen) { - if (shouldEncode(success)) { - initEncoderIfNecessary(); - ByteBuffer[] dataBuffers = new ByteBuffer[dataBlkNum]; - ByteBuffer[] parityBuffers = new ByteBuffer[parityBlkNum]; - for (int i = 0; i < dataBlkNum; i++) { - StripedReader reader = stripedReaders.get(i); - ByteBuffer buffer = reader.buffer; + initDecoderIfNecessary(); + ByteBuffer[] inputs = new ByteBuffer[dataBlkNum + parityBlkNum]; + for (int i = 0; i < success.length; i++) { + StripedReader reader = stripedReaders.get(success[i]); + ByteBuffer buffer = reader.buffer; + paddingBufferToLen(buffer, toRecoverLen); + inputs[convertIndex4Decode(reader.index, dataBlkNum, parityBlkNum)] = + (ByteBuffer)buffer.flip(); + } + if (success.length < dataBlkNum) { + for (int i = 0; i < zeroStripeBuffers.length; i++) { + ByteBuffer buffer = zeroStripeBuffers[i]; paddingBufferToLen(buffer, toRecoverLen); - dataBuffers[i] = (ByteBuffer)buffer.flip(); - } - for (int i = dataBlkNum; i < stripedReaders.size(); i++) { - StripedReader reader = stripedReaders.get(i); - parityBuffers[reader.index - dataBlkNum] = cleanBuffer(reader.buffer); - } - for (int i = 0; i < targets.length; i++) { - parityBuffers[targetIndices[i] - dataBlkNum] = targetBuffers[i]; - } - for (int i = 0; i < parityBlkNum; i++) { - if (parityBuffers[i] == null) { - parityBuffers[i] = ByteBuffer.allocate(toRecoverLen); - } else { - parityBuffers[i].limit(toRecoverLen); - } - } - encoder.encode(dataBuffers, parityBuffers); - } else { - /////////// TODO: wait for HADOOP-11847 ///////////// - ////////// The current decode method always try to decode parityBlkNum number of data blocks. //////////// - initDecoderIfNecessary(); - ByteBuffer[] inputs = new ByteBuffer[dataBlkNum + parityBlkNum]; - for (int i = 0; i < success.length; i++) { - StripedReader reader = stripedReaders.get(success[i]); - ByteBuffer buffer = reader.buffer; - paddingBufferToLen(buffer, toRecoverLen); - int index = reader.index < dataBlkNum ? 
- reader.index + parityBlkNum : reader.index - dataBlkNum; + int index = convertIndex4Decode(zeroStripeIndices[i], dataBlkNum, + parityBlkNum); inputs[index] = (ByteBuffer)buffer.flip(); } - int[] indices4Decode = new int[parityBlkNum]; - int m = 0; - for (int i = 0; i < dataBlkNum + parityBlkNum; i++) { - if (inputs[i] == null) { - inputs[i] = ByteBuffer.allocate(toRecoverLen); - indices4Decode[m++] = i; - } + } + int[] erasedIndices = getErasedIndices(targetsStatus); + ByteBuffer[] outputs = new ByteBuffer[erasedIndices.length]; + int m = 0; + for (int i = 0; i < targetBuffers.length; i++) { + if (targetsStatus[i]) { + outputs[m++] = targetBuffers[i]; + outputs[i].limit(toRecoverLen); } - ByteBuffer[] outputs = new ByteBuffer[parityBlkNum]; - m = 0; - // targetIndices is subset of indices4Decode - for (int i = 0; i < parityBlkNum; i++) { - if (m < targetIndices.length && - (indices4Decode[i] - parityBlkNum) == targetIndices[m]) { - outputs[i] = targetBuffers[m++]; - outputs[i].limit(toRecoverLen); - } else { - outputs[i] = ByteBuffer.allocate(toRecoverLen); - } - } - - decoder.decode(inputs, indices4Decode, outputs); - - for (int i = 0; i < targets.length; i++) { - if (targetsStatus[i]) { - long blockLen = getBlockLen(blockGroup, targetIndices[i]); - long remaining = blockLen - positionInBlock; - if (remaining < 0) { - targetBuffers[i].limit(0); - } else if (remaining < toRecoverLen) { - targetBuffers[i].limit((int)remaining); - } + } + decoder.decode(inputs, erasedIndices, outputs); + + for (int i = 0; i < targets.length; i++) { + if (targetsStatus[i]) { + long blockLen = getBlockLen(blockGroup, targetIndices[i]); + long remaining = blockLen - positionInBlock; + if (remaining < 0) { + targetBuffers[i].limit(0); + } else if (remaining < toRecoverLen) { + targetBuffers[i].limit((int)remaining); } } } } - /** - * Schedule read from a new source, we first try un-initial source, - * then try un-used source in this round and bypass failed source. + /** + * Schedule a read from some new source DN if some DN is corrupted + * or slow, this is called from the read iteration. + * Initially we may only have minRequiredSources number of + * StripedReader. + * If the position is at the end of target block, don't need to do + * real read, and return the array index of source DN, otherwise -1. + * + * @param used the used source DNs in this iteration. + * @return the array index of source DN if don't need to do real read. */ - private void scheduleNewRead(BitSet used) { + private int scheduleNewRead(BitSet used) { StripedReader reader = null; + // step1: initially we may only have minRequiredSources + // number of StripedReader, and there may be some source DNs we never + // read before, so will try to create StripedReader for one new source DN + // and try to read from it. If found, go to step 3. 
int m = stripedReaders.size(); - while (m < sources.length && reader == null) { - reader = new StripedReader(liveIndices[m]); - BlockReader blockReader = newBlockReader( - getBlock(blockGroup, liveIndices[m]), positionInBlock, sources[m]); - stripedReaders.add(reader); - if (blockReader != null) { - assert blockReader.getDataChecksum().equals(checksum); - reader.blockReader = blockReader; - reader.buffer = ByteBuffer.allocate(bufferSize); + while (reader == null && m < sources.length) { + reader = addStripedReader(m, positionInBlock); + if (getReadLength(liveIndices[m]) > 0) { + if (reader.blockReader == null) { + reader = null; + m++; + } } else { - m++; - reader = null; + used.set(m); + return m; } } + // step2: if there is no new source DN we can use, try to find a source + // DN we ever read from but because some reason, e.g., slow, it + // is not in the success DN list at the begin of this iteration, so + // we have not tried it in this iteration. Now we have a chance to + // revisit it again. for (int i = 0; reader == null && i < stripedReaders.size(); i++) { - StripedReader r = stripedReaders.get(i); - if (r.blockReader != null && !used.get(i)) { - closeBlockReader(r.blockReader); - r.blockReader = newBlockReader( - getBlock(blockGroup, liveIndices[i]), positionInBlock, - sources[i]); - if (r.blockReader != null) { - m = i; - reader = r; + if (!used.get(i)) { + StripedReader r = stripedReaders.get(i); + if (getReadLength(liveIndices[i]) > 0) { + closeBlockReader(r.blockReader); + r.blockReader = newBlockReader( + getBlock(blockGroup, liveIndices[i]), positionInBlock, + sources[i]); + if (r.blockReader != null) { + m = i; + reader = r; + } + } else { + used.set(i); + r.buffer.position(0); + return i; } } } + // step3: schedule if find a correct source DN and need to do real read. if (reader != null) { Callable readCallable = readFromBlock( reader.blockReader, reader.buffer); @@ -648,6 +709,8 @@ private void scheduleNewRead(BitSet used) { futures.put(f, m); used.set(m); } + + return -1; } // cancel all reads. @@ -708,7 +771,10 @@ private InetSocketAddress getSocketAddress4Transfer(DatanodeInfo dnInfo) { } private BlockReader newBlockReader(final ExtendedBlock block, - long startOffset, DatanodeInfo dnInfo) { + long offsetInBlock, DatanodeInfo dnInfo) { + if (offsetInBlock >= block.getNumBytes()) { + return null; + } try { InetSocketAddress dnAddr = getSocketAddress4Transfer(dnInfo); Token blockToken = datanode.getBlockAccessToken( @@ -720,7 +786,8 @@ private BlockReader newBlockReader(final ExtendedBlock block, * requires config for domain-socket in UNIX or legacy config in Windows. 
*/ return RemoteBlockReader2.newBlockReader( - "dummy", block, blockToken, startOffset, block.getNumBytes(), true, + "dummy", block, blockToken, offsetInBlock, + block.getNumBytes() - offsetInBlock, true, "", newConnectedPeer(block, dnAddr, blockToken, dnInfo), dnInfo, null, cachingStrategy); } catch (IOException e) { @@ -808,6 +875,12 @@ private void clearBuffers() { } } + if (zeroStripeBuffers != null) { + for (int i = 0; i < zeroStripeBuffers.length; i++) { + zeroStripeBuffers[i].clear(); + } + } + for (int i = 0; i < targetBuffers.length; i++) { if (targetBuffers[i] != null) { cleanBuffer(targetBuffers[i]); @@ -903,7 +976,7 @@ private int initTargetStreams(boolean[] targetsStatus) { } private static class StripedReader { - private final short index; + private final short index; // internal block index private BlockReader blockReader; private ByteBuffer buffer; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java index 9e44761d20..9285fd73fe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java @@ -100,29 +100,69 @@ public void testRecoverOneParityBlock() throws Exception { } @Test(timeout = 120000) - public void testRecoverThreeParityBlocks() throws Exception { + public void testRecoverOneParityBlock1() throws Exception { + int fileLen = cellSize + cellSize/10; + assertFileBlocksRecovery("/testRecoverOneParityBlock1", fileLen, 0, 1); + } + + @Test(timeout = 120000) + public void testRecoverOneParityBlock2() throws Exception { + int fileLen = 1; + assertFileBlocksRecovery("/testRecoverOneParityBlock2", fileLen, 0, 1); + } + + @Test(timeout = 120000) + public void testRecoverOneParityBlock3() throws Exception { int fileLen = 3 * blockSize + blockSize/10; + assertFileBlocksRecovery("/testRecoverOneParityBlock3", fileLen, 0, 1); + } + + @Test(timeout = 120000) + public void testRecoverThreeParityBlocks() throws Exception { + int fileLen = 10 * blockSize + blockSize/10; assertFileBlocksRecovery("/testRecoverThreeParityBlocks", fileLen, 0, 3); } @Test(timeout = 120000) public void testRecoverThreeDataBlocks() throws Exception { - int fileLen = 3 * blockSize + blockSize/10; + int fileLen = 10 * blockSize + blockSize/10; assertFileBlocksRecovery("/testRecoverThreeDataBlocks", fileLen, 1, 3); } + @Test(timeout = 120000) + public void testRecoverThreeDataBlocks1() throws Exception { + int fileLen = 3 * blockSize + blockSize/10; + assertFileBlocksRecovery("/testRecoverThreeDataBlocks1", fileLen, 1, 3); + } + @Test(timeout = 120000) public void testRecoverOneDataBlock() throws Exception { - ////TODO: TODO: wait for HADOOP-11847 - //int fileLen = 10 * blockSize + blockSize/10; - //assertFileBlocksRecovery("/testRecoverOneDataBlock", fileLen, 1, 1); + int fileLen = 10 * blockSize + blockSize/10; + assertFileBlocksRecovery("/testRecoverOneDataBlock", fileLen, 1, 1); + } + + @Test(timeout = 120000) + public void testRecoverOneDataBlock1() throws Exception { + int fileLen = cellSize + cellSize/10; + assertFileBlocksRecovery("/testRecoverOneDataBlock1", fileLen, 1, 1); + } + + @Test(timeout = 120000) + public void testRecoverOneDataBlock2() throws Exception { + int fileLen = 1; + assertFileBlocksRecovery("/testRecoverOneDataBlock2", fileLen, 1, 1); } @Test(timeout = 120000) public void 
testRecoverAnyBlocks() throws Exception { - ////TODO: TODO: wait for HADOOP-11847 - //int fileLen = 3 * blockSize + blockSize/10; - //assertFileBlocksRecovery("/testRecoverAnyBlocks", fileLen, 2, 2); + int fileLen = 3 * blockSize + blockSize/10; + assertFileBlocksRecovery("/testRecoverAnyBlocks", fileLen, 2, 2); + } + + @Test(timeout = 120000) + public void testRecoverAnyBlocks1() throws Exception { + int fileLen = 10 * blockSize + blockSize/10; + assertFileBlocksRecovery("/testRecoverAnyBlocks1", fileLen, 2, 3); } /** @@ -203,6 +243,9 @@ private void assertFileBlocksRecovery(String fileName, int fileLen, replicaContents[i] = readReplica(replicas[i]); } + int cellsNum = (fileLen - 1) / cellSize + 1; + int groupSize = Math.min(cellsNum, dataBlkNum) + parityBlkNum; + try { DatanodeID[] dnIDs = new DatanodeID[toRecoverBlockNum]; for (int i = 0; i < toRecoverBlockNum; i++) { @@ -216,7 +259,6 @@ private void assertFileBlocksRecovery(String fileName, int fileLen, dnIDs[i] = dn.getDatanodeId(); } setDataNodesDead(dnIDs); - // Check the locatedBlocks of the file again locatedBlocks = getLocatedBlocks(file); @@ -232,7 +274,7 @@ private void assertFileBlocksRecovery(String fileName, int fileLen, } } - waitForRecoveryFinished(file); + waitForRecoveryFinished(file, groupSize); targetDNs = sortTargetsByReplicas(blocks, targetDNs); @@ -319,7 +361,8 @@ private byte[] readReplica(File replica) throws IOException { } } - private LocatedBlocks waitForRecoveryFinished(Path file) throws Exception { + private LocatedBlocks waitForRecoveryFinished(Path file, int groupSize) + throws Exception { final int ATTEMPTS = 60; for (int i = 0; i < ATTEMPTS; i++) { LocatedBlocks locatedBlocks = getLocatedBlocks(file);
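Note on the decode path above: the reconstruction code always hands the RS decoder an input array ordered parity-first, and StripedBlockUtil.convertIndex4Decode translates a block-group index (data blocks 0..dataBlkNum-1, parity blocks after them) into that layout. The short, self-contained Java sketch below illustrates that mapping and the minRequiredSources arithmetic used by ReconstructAndTransferBlock. The body of convertIndex4Decode shown here is an assumption inferred from the inline conversion this patch removes from recoverTargets(); the 6+3 schema, the 64 KB cell size, and the DecodeIndexSketch class name are illustrative, not taken from the patch.

import java.nio.ByteBuffer;

public class DecodeIndexSketch {

  // Assumed equivalent of StripedBlockUtil.convertIndex4Decode: the RS decoder
  // expects its inputs ordered parity-first, so a data-block index i maps to
  // i + parityBlkNum and a parity-block index maps to i - dataBlkNum.
  static int convertIndex4Decode(int index, int dataBlkNum, int parityBlkNum) {
    return index < dataBlkNum ? index + parityBlkNum : index - dataBlkNum;
  }

  public static void main(String[] args) {
    final int dataBlkNum = 6, parityBlkNum = 3; // assumed RS-6-3 schema
    final int cellSize = 64 * 1024;             // matches the new 64K striped-read buffer default

    // minRequiredSources: a block group shorter than one full data stripe needs
    // fewer than dataBlkNum live sources; the remaining stripes are all zero,
    // which is what zeroStripeBuffers/zeroStripeIndices stand in for.
    long blockGroupBytes = 1;                   // e.g. the 1-byte file in testRecoverOneDataBlock2
    int cellsNum = (int) ((blockGroupBytes - 1) / cellSize + 1);
    int minRequiredSources = Math.min(cellsNum, dataBlkNum);
    System.out.println("minRequiredSources = " + minRequiredSources);     // 1

    // Block-group indices in decode order: data block 2 and the first parity
    // block (group index dataBlkNum) land at positions 5 and 0 respectively.
    System.out.println(convertIndex4Decode(2, dataBlkNum, parityBlkNum)); // 5
    System.out.println(convertIndex4Decode(dataBlkNum, dataBlkNum, parityBlkNum)); // 0

    // The erased positions handed to RawErasureDecoder.decode(inputs,
    // erasedIndices, outputs) are expressed in the same decode order, one
    // entry per live target buffer, as getErasedIndices() builds them above.
    ByteBuffer[] inputs = new ByteBuffer[dataBlkNum + parityBlkNum];
    int[] erasedIndices = { convertIndex4Decode(0, dataBlkNum, parityBlkNum) };
    ByteBuffer[] outputs = { ByteBuffer.allocate(cellSize) };
    // decoder.decode(inputs, erasedIndices, outputs);  // as invoked in recoverTargets()
  }
}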