diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index ccc3cb0e27..01d8cf032a 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -262,3 +262,6 @@
 
     HDFS-8479. Erasure coding: fix striping related logic in FSDirWriteFileOp to
     sync with HDFS-8421. (Zhe Zhang via jing9)
+
+    HDFS-8481. Erasure coding: remove workarounds in client side stripped blocks
+    recovering. (zhz)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index 3b7eb58814..b4aa033545 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -31,6 +31,7 @@
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.divideByteRangeIntoStripes;
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.initDecodeInputs;
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.finalizeDecodeInputs;
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.decodeAndFillBuffer;
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getNextCompletedStripedRead;
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getStartOffsetsForInternalBlocks;
@@ -41,6 +42,8 @@
 import org.apache.hadoop.io.erasurecode.ECSchema;
+import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.htrace.Span;
 import org.apache.htrace.Trace;
@@ -117,6 +120,8 @@ boolean include(long pos) {
   /** the buffer for a complete stripe */
   private ByteBuffer curStripeBuf;
   private final ECSchema schema;
+  private final RawErasureDecoder decoder;
+
   /**
    * indicate the start/end offset of the current buffered stripe in the
    * block group
@@ -139,6 +144,7 @@ boolean include(long pos) {
     curStripeRange = new StripeRange(0, 0);
     readingService =
         new ExecutorCompletionService<>(dfsClient.getStripedReadsThreadPool());
+    decoder = new RSRawDecoder(dataBlkNum, parityBlkNum);
     if (DFSClient.LOG.isDebugEnabled()) {
       DFSClient.LOG.debug("Creating an striped input stream for file " + src);
     }
@@ -591,8 +597,9 @@ private void fetchOneStripe(LocatedStripedBlock blockGroup,
     }
     if (alignedStripe.missingChunksNum > 0) {
-      decodeAndFillBuffer(decodeInputs, buf, alignedStripe,
-          dataBlkNum, parityBlkNum);
+      finalizeDecodeInputs(decodeInputs, alignedStripe);
+      decodeAndFillBuffer(decodeInputs, buf, alignedStripe, parityBlkNum,
+          decoder);
     }
   }
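
Note: the client-side change above replaces the per-call RSRawDecoder that
decodeAndFillBuffer used to allocate with a single RawErasureDecoder field,
created once in the stream's constructor and reused for every stripe that
needs recovery; finalizeDecodeInputs runs only after all pending reads for the
stripe have completed. In isolation, the decode calling convention looks like
the following minimal sketch (a (6, 3) RS schema is assumed, and the stripe
and chunk bookkeeping of DFSStripedInputStream is elided):

    import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
    import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;

    class DecoderReuseSketch {
      private static final int DATA_BLK_NUM = 6;
      private static final int PARITY_BLK_NUM = 3;
      // Created once, alongside the stream, instead of once per decode call.
      private final RawErasureDecoder decoder =
          new RSRawDecoder(DATA_BLK_NUM, PARITY_BLK_NUM);

      byte[][] recover(byte[][] decodeInputs, int[] erasedIndexes, int span) {
        // decodeInputs holds dataBlkNum + parityBlkNum buffers; entries at
        // erasedIndexes are null, which tells the raw decoder they are erased.
        byte[][] decodeOutputs = new byte[erasedIndexes.length][span];
        decoder.decode(decodeInputs, erasedIndexes, decodeOutputs);
        return decodeOutputs; // decodeOutputs[k] recovers block erasedIndexes[k]
      }
    }
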
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
index 8f63236aa4..e75209fcc2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
@@ -32,6 +32,7 @@
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
 
 import java.util.*;
 import java.io.IOException;
@@ -246,19 +247,36 @@ public static long spaceConsumedByStripedBlock(long numDataBlkBytes,
 
   /**
    * Initialize the decoding input buffers based on the chunk states in an
-   * AlignedStripe
+   * {@link AlignedStripe}. For each chunk that was not initially requested,
+   * schedule a new fetch request with the decoding input buffer as transfer
+   * destination.
    */
   public static byte[][] initDecodeInputs(AlignedStripe alignedStripe,
       int dataBlkNum, int parityBlkNum) {
     byte[][] decodeInputs =
         new byte[dataBlkNum + parityBlkNum][(int) alignedStripe.getSpanInBlock()];
     for (int i = 0; i < alignedStripe.chunks.length; i++) {
-      StripingChunk chunk = alignedStripe.chunks[i];
-      if (chunk == null) {
+      if (alignedStripe.chunks[i] == null) {
         alignedStripe.chunks[i] = new StripingChunk(decodeInputs[i]);
         alignedStripe.chunks[i].offsetsInBuf.add(0);
         alignedStripe.chunks[i].lengthsInBuf.add((int) alignedStripe.getSpanInBlock());
-      } else if (chunk.state == StripingChunk.FETCHED) {
+      }
+    }
+    return decodeInputs;
+  }
+
+  /**
+   * Some fetched {@link StripingChunk} might be stored in original application
+   * buffer instead of prepared decode input buffers. Some others are beyond
+   * the range of the internal blocks and should correspond to all zero bytes.
+   * When all pending requests have returned, this method should be called to
+   * finalize decode input buffers.
+   */
+  public static void finalizeDecodeInputs(final byte[][] decodeInputs,
+      AlignedStripe alignedStripe) {
+    for (int i = 0; i < alignedStripe.chunks.length; i++) {
+      StripingChunk chunk = alignedStripe.chunks[i];
+      if (chunk.state == StripingChunk.FETCHED) {
         int posInBuf = 0;
         for (int j = 0; j < chunk.offsetsInBuf.size(); j++) {
           System.arraycopy(chunk.buf, chunk.offsetsInBuf.get(j),
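
Note: after this hunk, decode input preparation is split in two. initDecodeInputs
allocates the buffers and registers fetch targets up front, while
finalizeDecodeInputs waits until every read has returned and then settles each
slot: FETCHED chunks are copied in, ALLZERO chunks are zero-filled, and (in the
next hunk) everything else is nulled so the decoder sees an erasure. A condensed
sketch of how the caller composes the utilities, mirroring the fetchOneStripe
hunk above (error handling and the actual fetch loop are elided):

    byte[][] decodeInputs =
        initDecodeInputs(alignedStripe, dataBlkNum, parityBlkNum);
    // ... issue reads; chunks end up FETCHED, ALLZERO, or MISSING ...
    if (alignedStripe.missingChunksNum > 0) {
      finalizeDecodeInputs(decodeInputs, alignedStripe); // copy / zero / null
      decodeAndFillBuffer(decodeInputs, buf, alignedStripe, parityBlkNum,
          decoder);
    }
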
@@ -267,39 +285,41 @@ public static byte[][] initDecodeInputs(AlignedStripe alignedStripe,
         }
       } else if (chunk.state == StripingChunk.ALLZERO) {
         Arrays.fill(decodeInputs[i], (byte)0);
+      } else {
+        decodeInputs[i] = null;
       }
     }
-    return decodeInputs;
   }
-
   /**
-   * Decode based on the given input buffers and schema
+   * Decode based on the given input buffers and schema.
    */
-  public static void decodeAndFillBuffer(final byte[][] decodeInputs, byte[] buf,
-      AlignedStripe alignedStripe, int dataBlkNum, int parityBlkNum) {
+  public static void decodeAndFillBuffer(final byte[][] decodeInputs,
+      byte[] buf, AlignedStripe alignedStripe, int parityBlkNum,
+      RawErasureDecoder decoder) {
+    // Step 1: prepare indices and output buffers for missing data units
     int[] decodeIndices = new int[parityBlkNum];
     int pos = 0;
     for (int i = 0; i < alignedStripe.chunks.length; i++) {
-      if (alignedStripe.chunks[i].state != StripingChunk.FETCHED &&
-          alignedStripe.chunks[i].state != StripingChunk.ALLZERO) {
+      if (alignedStripe.chunks[i].state == StripingChunk.MISSING){
         decodeIndices[pos++] = i;
       }
     }
+    decodeIndices = Arrays.copyOf(decodeIndices, pos);
+    byte[][] decodeOutputs =
+        new byte[decodeIndices.length][(int) alignedStripe.getSpanInBlock()];
 
-    byte[][] outputs = new byte[parityBlkNum][(int) alignedStripe.getSpanInBlock()];
-    RSRawDecoder rsRawDecoder = new RSRawDecoder(dataBlkNum, parityBlkNum);
-    rsRawDecoder.decode(decodeInputs, decodeIndices, outputs);
+    // Step 2: decode into prepared output buffers
+    decoder.decode(decodeInputs, decodeIndices, decodeOutputs);
 
-    for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
-      StripingChunk chunk = alignedStripe.chunks[i];
+    // Step 3: fill original application buffer with decoded data
+    for (int i = 0; i < decodeIndices.length; i++) {
+      int missingBlkIdx = decodeIndices[i];
+      StripingChunk chunk = alignedStripe.chunks[missingBlkIdx];
       if (chunk.state == StripingChunk.MISSING) {
         int srcPos = 0;
         for (int j = 0; j < chunk.offsetsInBuf.size(); j++) {
-          //TODO: workaround (filling fixed bytes), to remove after HADOOP-11938
-//          System.arraycopy(outputs[i], srcPos, buf, chunk.offsetsInBuf.get(j),
-//              chunk.lengthsInBuf.get(j));
-          Arrays.fill(buf, chunk.offsetsInBuf.get(j),
-              chunk.offsetsInBuf.get(j) + chunk.lengthsInBuf.get(j), (byte)7);
+          System.arraycopy(decodeOutputs[i], srcPos, buf, chunk.offsetsInBuf.get(j),
+              chunk.lengthsInBuf.get(j));
           srcPos += chunk.lengthsInBuf.get(j);
         }
       }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
index a71441f021..ce563254ae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
@@ -221,13 +221,13 @@ public void testPreadWithDNFailure() throws Exception {
         decodeInputs[DATA_BLK_NUM][k] = SimulatedFSDataset.simulatedByte(
             new Block(bg.getBlock().getBlockId() + DATA_BLK_NUM), posInBlk);
       }
-//      RSRawDecoder rsRawDecoder = new RSRawDecoder();
-//      rsRawDecoder.initialize(DATA_BLK_NUM, PARITY_BLK_NUM, CELLSIZE);
-//      rsRawDecoder.decode(decodeInputs, missingBlkIdx, decodeOutputs);
+      for (int m : missingBlkIdx) {
+        decodeInputs[m] = null;
+      }
+      RSRawDecoder rsRawDecoder = new RSRawDecoder(DATA_BLK_NUM, PARITY_BLK_NUM);
+      rsRawDecoder.decode(decodeInputs, missingBlkIdx, decodeOutputs);
       int posInBuf = i * CELLSIZE * DATA_BLK_NUM + failedDNIdx * CELLSIZE;
-//      System.arraycopy(decodeOutputs[0], 0, expected, posInBuf, CELLSIZE);
-      //TODO: workaround (filling fixed bytes), to remove after HADOOP-11938
-      Arrays.fill(expected, posInBuf, posInBuf + CELLSIZE, (byte)7);
+      System.arraycopy(decodeOutputs[0], 0, expected, posInBuf, CELLSIZE);
     }
     int delta = 10;
     int done = 0;
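
Note: two details of the rewritten decodeAndFillBuffer are worth calling out.
First, decodeIndices is trimmed with Arrays.copyOf to the actual number of
MISSING chunks and decodeOutputs is sized to match, so decodeOutputs[k] always
corresponds to decodeIndices[k]; the old code sized the outputs by parityBlkNum
regardless. Second, the test above now exercises the real decoder instead of
the (byte)7 placeholder left over from the HADOOP-11938 workaround. A minimal
sketch of the erasure convention the test relies on (the constants mirror the
test's; filling of the surviving cells is elided):

    int[] missingBlkIdx = {failedDNIdx};   // erased positions, data or parity
    byte[][] decodeInputs = new byte[DATA_BLK_NUM + PARITY_BLK_NUM][CELLSIZE];
    // ... fill the surviving data and parity cells ...
    for (int m : missingBlkIdx) {
      decodeInputs[m] = null;              // null marks the erasure
    }
    byte[][] decodeOutputs = new byte[missingBlkIdx.length][CELLSIZE];
    new RSRawDecoder(DATA_BLK_NUM, PARITY_BLK_NUM)
        .decode(decodeInputs, missingBlkIdx, decodeOutputs);
    // decodeOutputs[0] now holds the reconstructed cell for failedDNIdx.
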
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
index 70802fb59a..b0436a68c5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
@@ -382,15 +382,9 @@ public void testWritePreadWithDNFailure() throws IOException {
     Assert.assertEquals("The length of file should be the same to write size",
         length - startOffsetInFile, readLen);
 
-    RSRawDecoder rsRawDecoder = new RSRawDecoder(dataBlocks, parityBlocks);
     byte[] expected = new byte[readLen];
     for (int i = startOffsetInFile; i < length; i++) {
-      //TODO: workaround (filling fixed bytes), to remove after HADOOP-11938
-      if ((i / cellSize) % dataBlocks == failedDNIdx) {
-        expected[i - startOffsetInFile] = (byte)7;
-      } else {
-        expected[i - startOffsetInFile] = getByte(i);
-      }
+      expected[i - startOffsetInFile] = getByte(i);
     }
     for (int i = startOffsetInFile; i < length; i++) {
       Assert.assertEquals("Byte at " + i + " should be the same",
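
Note: with real recovery in place, TestWriteReadStripedFile drops its special
case for cells served by the failed DataNode: every byte of the pread result is
expected to equal getByte(i), including the decoded positions. For reference, a
sketch of the deterministic pattern generator the test relies on (the helper
body shown here is an assumption; the actual test helper may differ):

    static byte getByte(long pos) {
      final int mod = 29;  // assumed modulus for the repeating byte pattern
      return (byte) (pos % mod + 1);
    }
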