From c201cf951d5adefefe7c68e882a0c07962248577 Mon Sep 17 00:00:00 2001 From: yliu Date: Wed, 28 Oct 2015 16:18:23 +0800 Subject: [PATCH] HADOOP-12040. Adjust inputs order for the decode API in raw erasure coder. (Kai Zheng via yliu) --- .../hadoop-common/CHANGES.txt | 3 + .../coder/AbstractErasureDecoder.java | 23 ++++--- .../rawcoder/AbstractRawErasureCoder.java | 6 ++ .../io/erasurecode/rawcoder/RSRawDecoder.java | 69 ++++++++++++++++++- .../rawcoder/RawErasureDecoder.java | 8 +-- .../hadoop/io/erasurecode/TestCoderBase.java | 44 ++++++------ .../coder/TestErasureCoderBase.java | 8 +-- .../erasurecode/coder/TestRSErasureCoder.java | 7 +- .../hadoop/hdfs/DFSStripedInputStream.java | 41 ++++------- .../hadoop/hdfs/util/StripedBlockUtil.java | 37 ++-------- .../erasurecode/ErasureCodingWorker.java | 11 +-- .../hdfs/TestDFSStripedInputStream.java | 13 ++-- 12 files changed, 154 insertions(+), 116 deletions(-) diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 25a3a601e3..4b9a707783 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -922,6 +922,9 @@ Release 2.8.0 - UNRELEASED HADOOP-11685. StorageException complaining " no lease ID" during HBase distributed log splitting (Duo Xu via cnauroth) + HADOOP-12040. Adjust inputs order for the decode API in raw erasure coder. + (Kai Zheng via yliu) + OPTIMIZATIONS HADOOP-11785. Reduce the number of listStatus operation in distcp diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/AbstractErasureDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/AbstractErasureDecoder.java index 6fdae9369e..abada3d585 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/AbstractErasureDecoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/AbstractErasureDecoder.java @@ -59,13 +59,14 @@ public abstract class AbstractErasureDecoder extends AbstractErasureCoder { * @return */ protected ECBlock[] getInputBlocks(ECBlockGroup blockGroup) { - ECBlock[] inputBlocks = new ECBlock[getNumParityUnits() - + getNumDataUnits()]; + ECBlock[] inputBlocks = new ECBlock[getNumDataUnits() + + getNumParityUnits()]; - System.arraycopy(blockGroup.getParityBlocks(), 0, inputBlocks, 0, - getNumParityUnits()); System.arraycopy(blockGroup.getDataBlocks(), 0, inputBlocks, - getNumParityUnits(), getNumDataUnits()); + 0, getNumDataUnits()); + + System.arraycopy(blockGroup.getParityBlocks(), 0, inputBlocks, + getNumDataUnits(), getNumParityUnits()); return inputBlocks; } @@ -80,18 +81,18 @@ public abstract class AbstractErasureDecoder extends AbstractErasureCoder { int idx = 0; - for (int i = 0; i < getNumParityUnits(); i++) { - if (blockGroup.getParityBlocks()[i].isErased()) { - outputBlocks[idx++] = blockGroup.getParityBlocks()[i]; - } - } - for (int i = 0; i < getNumDataUnits(); i++) { if (blockGroup.getDataBlocks()[i].isErased()) { outputBlocks[idx++] = blockGroup.getDataBlocks()[i]; } } + for (int i = 0; i < getNumParityUnits(); i++) { + if (blockGroup.getParityBlocks()[i].isErased()) { + outputBlocks[idx++] = blockGroup.getParityBlocks()[i]; + } + } + return outputBlocks; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureCoder.java 
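To make the new ordering concrete, here is a small illustrative sketch (RS(6, 3) numbers assumed, block names are placeholders) of the input-block layout that the patched getInputBlocks() produces:

```java
// Data-first layout produced by the patched getInputBlocks():
//   index:  0   1   2   3   4   5   6   7   8
//   block:  d0  d1  d2  d3  d4  d5  p0  p1  p2
// Previously the same array was parity-first: p0 p1 p2 d0 ... d5.
String[] inputBlocks = new String[6 + 3];
for (int i = 0; i < 6; i++) {
  inputBlocks[i] = "d" + i;          // data units occupy slots 0..5
}
for (int i = 0; i < 3; i++) {
  inputBlocks[6 + i] = "p" + i;      // parity units occupy slots 6..8
}
```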
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureCoder.java index 0ace085279..35e94925b0 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureCoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureCoder.java @@ -34,10 +34,12 @@ public abstract class AbstractRawErasureCoder private final int numDataUnits; private final int numParityUnits; + private final int numAllUnits; public AbstractRawErasureCoder(int numDataUnits, int numParityUnits) { this.numDataUnits = numDataUnits; this.numParityUnits = numParityUnits; + this.numAllUnits = numDataUnits + numParityUnits; } @Override @@ -50,6 +52,10 @@ public abstract class AbstractRawErasureCoder return numParityUnits; } + protected int getNumAllUnits() { + return numAllUnits; + } + @Override public boolean preferDirectBuffer() { return false; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawDecoder.java index 69ce3e8748..1acaab916b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawDecoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawDecoder.java @@ -72,6 +72,35 @@ public class RSRawDecoder extends AbstractRawErasureDecoder { numParityUnits); } + @Override + public void decode(ByteBuffer[] inputs, int[] erasedIndexes, + ByteBuffer[] outputs) { + // Make copies avoiding affecting original ones; + ByteBuffer[] newInputs = new ByteBuffer[inputs.length]; + int[] newErasedIndexes = new int[erasedIndexes.length]; + ByteBuffer[] newOutputs = new ByteBuffer[outputs.length]; + + // Adjust the order to match with underlying requirements. + adjustOrder(inputs, newInputs, + erasedIndexes, newErasedIndexes, outputs, newOutputs); + + super.decode(newInputs, newErasedIndexes, newOutputs); + } + + @Override + public void decode(byte[][] inputs, int[] erasedIndexes, byte[][] outputs) { + // Make copies avoiding affecting original ones; + byte[][] newInputs = new byte[inputs.length][]; + int[] newErasedIndexes = new int[erasedIndexes.length]; + byte[][] newOutputs = new byte[outputs.length][]; + + // Adjust the order to match with underlying requirements. + adjustOrder(inputs, newInputs, + erasedIndexes, newErasedIndexes, outputs, newOutputs); + + super.decode(newInputs, newErasedIndexes, newOutputs); + } + private void doDecodeImpl(ByteBuffer[] inputs, int[] erasedIndexes, ByteBuffer[] outputs) { ByteBuffer valid = findFirstValidInput(inputs); @@ -95,7 +124,7 @@ public class RSRawDecoder extends AbstractRawErasureDecoder { } RSUtil.GF.solveVandermondeSystem(errSignature, outputs, outputOffsets, - erasedIndexes.length, dataLen); + erasedIndexes.length, dataLen); } @Override @@ -146,7 +175,7 @@ public class RSRawDecoder extends AbstractRawErasureDecoder { } doDecodeImpl(inputs, inputOffsets, dataLen, erasedOrNotToReadIndexes, - adjustedByteArrayOutputsParameter, adjustedOutputOffsets); + adjustedByteArrayOutputsParameter, adjustedOutputOffsets); } @Override @@ -200,6 +229,42 @@ public class RSRawDecoder extends AbstractRawErasureDecoder { adjustedDirectBufferOutputsParameter); } + /* + * Convert data units first order to parity units first order. 
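A minimal sketch of what the new RSRawDecoder decode() overrides do before delegating: only the array references are rearranged, the buffers themselves are shared, and the caller's data-first arrays are left untouched. Names here are illustrative, not part of the patch:

```java
import java.nio.ByteBuffer;

class AdjustOrderSketch {
  // Rearrange a data-first input array into the parity-first layout the
  // legacy internal decode path still expects. No payload is copied.
  static ByteBuffer[] toParityFirst(ByteBuffer[] inputs,
      int numDataUnits, int numParityUnits) {
    ByteBuffer[] newInputs = new ByteBuffer[inputs.length];
    // p0..p(m-1) move to the front ...
    System.arraycopy(inputs, numDataUnits, newInputs, 0, numParityUnits);
    // ... and d0..d(k-1) follow.
    System.arraycopy(inputs, 0, newInputs, numParityUnits, numDataUnits);
    return newInputs;
  }
}
```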
+ */ + private void adjustOrder(T[] inputs, T[] inputs2, + int[] erasedIndexes, int[] erasedIndexes2, + T[] outputs, T[] outputs2) { + // Example: + // d0 d1 d2 d3 d4 d5 : p0 p1 p2 => p0 p1 p2 : d0 d1 d2 d3 d4 d5 + System.arraycopy(inputs, getNumDataUnits(), inputs2, + 0, getNumParityUnits()); + System.arraycopy(inputs, 0, inputs2, + getNumParityUnits(), getNumDataUnits()); + + int numErasedDataUnits = 0, numErasedParityUnits = 0; + int idx = 0; + for (int i = 0; i < erasedIndexes.length; i++) { + if (erasedIndexes[i] >= getNumDataUnits()) { + erasedIndexes2[idx++] = erasedIndexes[i] - getNumDataUnits(); + numErasedParityUnits++; + } + } + for (int i = 0; i < erasedIndexes.length; i++) { + if (erasedIndexes[i] < getNumDataUnits()) { + erasedIndexes2[idx++] = erasedIndexes[i] + getNumParityUnits(); + numErasedDataUnits++; + } + } + + // Copy for data units + System.arraycopy(outputs, numErasedDataUnits, outputs2, + 0, numErasedParityUnits); + // Copy for parity units + System.arraycopy(outputs, 0, outputs2, + numErasedParityUnits, numErasedDataUnits); + } + private byte[] checkGetBytesArrayBuffer(int idx, int bufferLen) { if (bytesArrayBuffers[idx] == null || bytesArrayBuffers[idx].length < bufferLen) { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureDecoder.java index fa747d8f10..7a07b711be 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureDecoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureDecoder.java @@ -35,8 +35,8 @@ public interface RawErasureDecoder extends RawErasureCoder { /** * Decode with inputs and erasedIndexes, generates outputs. * How to prepare for inputs: - * 1. Create an array containing parity units + data units. Please note the - * parity units should be first or before the data units. + * 1. Create an array containing data units + parity units. Please note the + * data units should be first or before the parity units. * 2. Set null in the array locations specified via erasedIndexes to indicate * they're erased and no data are to read from; * 3. Set null in the array locations for extra redundant items, as they're @@ -47,8 +47,8 @@ public interface RawErasureDecoder extends RawErasureCoder { * For an example using RS (6, 3), assuming sources (d0, d1, d2, d3, d4, d5) * and parities (p0, p1, p2), d2 being erased. We can and may want to use only * 6 units like (d1, d3, d4, d5, p0, p2) to recover d2. 
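A worked example of the erased-index remapping in adjustOrder() may help; it assumes RS(6, 3) with d2 and p1 erased (values chosen for illustration, not taken from the patch):

```java
int numDataUnits = 6, numParityUnits = 3;
int[] erasedIndexes = {2, 7};              // d2 and p1, data-first numbering
int[] erasedIndexes2 = new int[erasedIndexes.length];
int idx = 0;
for (int e : erasedIndexes) {              // parity erasures first
  if (e >= numDataUnits) erasedIndexes2[idx++] = e - numDataUnits;
}
for (int e : erasedIndexes) {              // then data erasures
  if (e < numDataUnits) erasedIndexes2[idx++] = e + numParityUnits;
}
// erasedIndexes2 == [1, 5]: p1 is slot 1 and d2 is slot 5 in the
// parity-first numbering used by the legacy internal decode path.
// The outputs array is reordered the same way, so outputs2[i] still
// pairs with erasedIndexes2[i].
```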
We will have: - * inputs = [p0, null(p1), p2, null(d0), d1, null(d2), d3, d4, d5] - * erasedIndexes = [5] // index of d2 into inputs array + * inputs = [null(d0), d1, null(d2), d3, d4, d5, p0, null(p1), p2] + * erasedIndexes = [2] // index of d2 into inputs array * outputs = [a-writable-buffer] * * Note, for both inputs and outputs, no mixing of on-heap buffers and direct diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCoderBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCoderBase.java index 8f277f49f8..0d2b5ad2cb 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCoderBase.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCoderBase.java @@ -40,7 +40,7 @@ public abstract class TestCoderBase { private Configuration conf; protected int numDataUnits; protected int numParityUnits; - protected int baseChunkSize = 513; + protected int baseChunkSize = 1024; private int chunkSize = baseChunkSize; private BufferAllocator allocator; @@ -165,7 +165,9 @@ public abstract class TestCoderBase { byte[][] erased = toArrays(erasedChunks); byte[][] recovered = toArrays(recoveredChunks); boolean result = Arrays.deepEquals(erased, recovered); - assertTrue("Decoding and comparing failed.", result); + if (!result) { + assertTrue("Decoding and comparing failed.", result); + } } /** @@ -175,39 +177,41 @@ public abstract class TestCoderBase { */ protected int[] getErasedIndexesForDecoding() { int[] erasedIndexesForDecoding = - new int[erasedParityIndexes.length + erasedDataIndexes.length]; + new int[erasedDataIndexes.length + erasedParityIndexes.length]; int idx = 0; - for (int i = 0; i < erasedParityIndexes.length; i++) { - erasedIndexesForDecoding[idx ++] = erasedParityIndexes[i]; + for (int i = 0; i < erasedDataIndexes.length; i++) { + erasedIndexesForDecoding[idx ++] = erasedDataIndexes[i]; } - for (int i = 0; i < erasedDataIndexes.length; i++) { - erasedIndexesForDecoding[idx ++] = erasedDataIndexes[i] + numParityUnits; + for (int i = 0; i < erasedParityIndexes.length; i++) { + erasedIndexesForDecoding[idx ++] = erasedParityIndexes[i] + numDataUnits; } return erasedIndexesForDecoding; } /** - * Return input chunks for decoding, which is parityChunks + dataChunks. + * Return input chunks for decoding, which is dataChunks + parityChunks. 
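A caller-side sketch of the documented convention (byte[] variant); `decoder` is assumed to be an RS raw decoder for a (6, 3) schema created elsewhere, and the buffer size is illustrative:

```java
int cell = 1024;                         // illustrative chunk size
byte[][] inputs = new byte[9][];         // RS(6, 3), data-first: d0..d5, p0..p2
// Only the six units actually read are non-null (contents elided here).
inputs[1] = new byte[cell];  // d1
inputs[3] = new byte[cell];  // d3
inputs[4] = new byte[cell];  // d4
inputs[5] = new byte[cell];  // d5
inputs[6] = new byte[cell];  // p0
inputs[8] = new byte[cell];  // p2
int[] erasedIndexes = {2};               // d2, by its position in `inputs`
byte[][] outputs = {new byte[cell]};
decoder.decode(inputs, erasedIndexes, outputs);
// outputs[0] now holds the reconstructed d2.
```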
* @param dataChunks * @param parityChunks * @return */ protected ECChunk[] prepareInputChunksForDecoding(ECChunk[] dataChunks, ECChunk[] parityChunks) { - ECChunk[] inputChunks = new ECChunk[numParityUnits + numDataUnits]; + ECChunk[] inputChunks = new ECChunk[numDataUnits + numParityUnits]; int idx = 0; - for (int i = 0; i < numParityUnits; i++) { - inputChunks[idx ++] = parityChunks[i]; - } + for (int i = 0; i < numDataUnits; i++) { inputChunks[idx ++] = dataChunks[i]; } - + + for (int i = 0; i < numParityUnits; i++) { + inputChunks[idx ++] = parityChunks[i]; + } + return inputChunks; } @@ -221,21 +225,21 @@ public abstract class TestCoderBase { */ protected ECChunk[] backupAndEraseChunks(ECChunk[] dataChunks, ECChunk[] parityChunks) { - ECChunk[] toEraseChunks = new ECChunk[erasedParityIndexes.length + - erasedDataIndexes.length]; + ECChunk[] toEraseChunks = new ECChunk[erasedDataIndexes.length + + erasedParityIndexes.length]; int idx = 0; - for (int i = 0; i < erasedParityIndexes.length; i++) { - toEraseChunks[idx ++] = parityChunks[erasedParityIndexes[i]]; - parityChunks[erasedParityIndexes[i]] = null; - } - for (int i = 0; i < erasedDataIndexes.length; i++) { toEraseChunks[idx ++] = dataChunks[erasedDataIndexes[i]]; dataChunks[erasedDataIndexes[i]] = null; } + for (int i = 0; i < erasedParityIndexes.length; i++) { + toEraseChunks[idx ++] = parityChunks[erasedParityIndexes[i]]; + parityChunks[erasedParityIndexes[i]] = null; + } + return toEraseChunks; } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/coder/TestErasureCoderBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/coder/TestErasureCoderBase.java index 738d28e1c4..0584977ff3 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/coder/TestErasureCoderBase.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/coder/TestErasureCoderBase.java @@ -235,14 +235,14 @@ public abstract class TestErasureCoderBase extends TestCoderBase { int idx = 0; TestBlock block; - for (int i = 0; i < erasedParityIndexes.length; i++) { - block = parityBlocks[erasedParityIndexes[i]]; + for (int i = 0; i < erasedDataIndexes.length; i++) { + block = dataBlocks[erasedDataIndexes[i]]; toEraseBlocks[idx ++] = cloneBlockWithData(block); eraseDataFromBlock(block); } - for (int i = 0; i < erasedDataIndexes.length; i++) { - block = dataBlocks[erasedDataIndexes[i]]; + for (int i = 0; i < erasedParityIndexes.length; i++) { + block = parityBlocks[erasedParityIndexes[i]]; toEraseBlocks[idx ++] = cloneBlockWithData(block); eraseDataFromBlock(block); } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/coder/TestRSErasureCoder.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/coder/TestRSErasureCoder.java index 94f77db4fb..08906c483b 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/coder/TestRSErasureCoder.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/coder/TestRSErasureCoder.java @@ -33,9 +33,6 @@ public class TestRSErasureCoder extends TestErasureCoderBase { this.encoderClass = RSErasureEncoder.class; this.decoderClass = RSErasureDecoder.class; - this.numDataUnits = 10; - this.numParityUnits = 1; - this.numChunksInBlock = 10; } @@ -119,8 +116,8 @@ public class TestRSErasureCoder extends TestErasureCoderBase { } @Test - 
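A small sketch of how the reworked test helpers line up under the new ordering, assuming RS(6, 3) with erasedDataIndexes = {0} and erasedParityIndexes = {0} (the d0/p0 case exercised by the renamed test below):

```java
int numDataUnits = 6, numParityUnits = 3;
int[] erasedDataIndexes = {0};      // d0
int[] erasedParityIndexes = {0};    // p0
// getErasedIndexesForDecoding(): data erasures keep their index,
// parity erasures are shifted past the data units.
int[] erasedIndexesForDecoding = {
    erasedDataIndexes[0],                      // 0 -> d0
    erasedParityIndexes[0] + numDataUnits      // 6 -> p0
};
// backupAndEraseChunks() nulls the same positions and returns the
// backups in the same order, so erased[i] corresponds to recovered[i]
// when compareAndVerify() runs after decoding.
```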
public void testCodingDirectBuffer_3x3_erasing_d0_p0() { - prepare(null, 3, 3, new int[] {0}, new int[] {0}); + public void testCodingDirectBuffer_6x3_erasing_d0_p0() { + prepare(null, 6, 3, new int[] {0}, new int[] {0}); testCoding(true); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java index 5dadd82b6c..7856133b61 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java @@ -825,9 +825,7 @@ public class DFSStripedInputStream extends DFSInputStream { boolean prepareParityChunk(int index) { Preconditions.checkState(index >= dataBlkNum && alignedStripe.chunks[index] == null); - final int decodeIndex = StripedBlockUtil.convertIndex4Decode(index, - dataBlkNum, parityBlkNum); - alignedStripe.chunks[index] = new StripingChunk(decodeInputs[decodeIndex]); + alignedStripe.chunks[index] = new StripingChunk(decodeInputs[index]); alignedStripe.chunks[index].addByteArraySlice(0, (int) alignedStripe.getSpanInBlock()); return true; @@ -835,8 +833,7 @@ public class DFSStripedInputStream extends DFSInputStream { @Override void decode() { - StripedBlockUtil.finalizeDecodeInputs(decodeInputs, dataBlkNum, - parityBlkNum, alignedStripe); + StripedBlockUtil.finalizeDecodeInputs(decodeInputs, alignedStripe); StripedBlockUtil.decodeAndFillBuffer(decodeInputs, alignedStripe, dataBlkNum, parityBlkNum, decoder); } @@ -867,12 +864,9 @@ public class DFSStripedInputStream extends DFSInputStream { int pos = (int) (range.offsetInBlock % cellSize + cellSize * i); cur.position(pos); cur.limit((int) (pos + range.spanInBlock)); - final int decodeIndex = StripedBlockUtil.convertIndex4Decode(i, - dataBlkNum, parityBlkNum); - decodeInputs[decodeIndex] = cur.slice(); + decodeInputs[i] = cur.slice(); if (alignedStripe.chunks[i] == null) { - alignedStripe.chunks[i] = new StripingChunk( - decodeInputs[decodeIndex]); + alignedStripe.chunks[i] = new StripingChunk(decodeInputs[i]); } } } @@ -887,13 +881,12 @@ public class DFSStripedInputStream extends DFSInputStream { // we have failed the block reader before return false; } - final int decodeIndex = StripedBlockUtil.convertIndex4Decode(index, - dataBlkNum, parityBlkNum); + final int parityIndex = index - dataBlkNum; ByteBuffer buf = getParityBuffer().duplicate(); - buf.position(cellSize * decodeIndex); - buf.limit(cellSize * decodeIndex + (int) alignedStripe.range.spanInBlock); - decodeInputs[decodeIndex] = buf.slice(); - alignedStripe.chunks[index] = new StripingChunk(decodeInputs[decodeIndex]); + buf.position(cellSize * parityIndex); + buf.limit(cellSize * parityIndex + (int) alignedStripe.range.spanInBlock); + decodeInputs[index] = buf.slice(); + alignedStripe.chunks[index] = new StripingChunk(decodeInputs[index]); return true; } @@ -902,18 +895,16 @@ public class DFSStripedInputStream extends DFSInputStream { // TODO no copy for data chunks. 
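A sketch of the simplified parity-buffer slicing used above; the parameter values in the comments are illustrative and the class name is hypothetical:

```java
import java.nio.ByteBuffer;

class ParitySliceSketch {
  // Parity cells are packed back to back in one shared buffer, so the
  // slot for block `index` is simply its offset past the data blocks.
  static ByteBuffer parityInput(ByteBuffer parityBuf, int index,
      int dataBlkNum, int cellSize, int spanInBlock) {
    int parityIndex = index - dataBlkNum;      // e.g. index 7 -> p1 -> 1
    ByteBuffer buf = parityBuf.duplicate();
    buf.position(cellSize * parityIndex);
    buf.limit(cellSize * parityIndex + spanInBlock);
    return buf.slice();                        // stored as decodeInputs[index]
  }
}
```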
this depends on HADOOP-12047 final int span = (int) alignedStripe.getSpanInBlock(); for (int i = 0; i < alignedStripe.chunks.length; i++) { - final int decodeIndex = StripedBlockUtil.convertIndex4Decode(i, - dataBlkNum, parityBlkNum); if (alignedStripe.chunks[i] != null && alignedStripe.chunks[i].state == StripingChunk.ALLZERO) { for (int j = 0; j < span; j++) { - decodeInputs[decodeIndex].put((byte) 0); + decodeInputs[i].put((byte) 0); } - decodeInputs[decodeIndex].flip(); + decodeInputs[i].flip(); } else if (alignedStripe.chunks[i] != null && alignedStripe.chunks[i].state == StripingChunk.FETCHED) { - decodeInputs[decodeIndex].position(0); - decodeInputs[decodeIndex].limit(span); + decodeInputs[i].position(0); + decodeInputs[i].limit(span); } } int[] decodeIndices = new int[parityBlkNum]; @@ -921,12 +912,10 @@ public class DFSStripedInputStream extends DFSInputStream { for (int i = 0; i < alignedStripe.chunks.length; i++) { if (alignedStripe.chunks[i] != null && alignedStripe.chunks[i].state == StripingChunk.MISSING) { - int decodeIndex = StripedBlockUtil.convertIndex4Decode(i, - dataBlkNum, parityBlkNum); if (i < dataBlkNum) { - decodeIndices[pos++] = decodeIndex; + decodeIndices[pos++] = i; } else { - decodeInputs[decodeIndex] = null; + decodeInputs[i] = null; } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java index 4977cc0c37..e8653c8a08 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java @@ -270,8 +270,7 @@ public class StripedBlockUtil { // read the full data aligned stripe for (int i = 0; i < dataBlkNum; i++) { if (alignedStripe.chunks[i] == null) { - final int decodeIndex = convertIndex4Decode(i, dataBlkNum, parityBlkNum); - alignedStripe.chunks[i] = new StripingChunk(decodeInputs[decodeIndex]); + alignedStripe.chunks[i] = new StripingChunk(decodeInputs[i]); alignedStripe.chunks[i].addByteArraySlice(0, (int) alignedStripe.getSpanInBlock()); } @@ -287,40 +286,19 @@ public class StripedBlockUtil { * finalize decode input buffers. */ public static void finalizeDecodeInputs(final byte[][] decodeInputs, - int dataBlkNum, int parityBlkNum, AlignedStripe alignedStripe) { + AlignedStripe alignedStripe) { for (int i = 0; i < alignedStripe.chunks.length; i++) { final StripingChunk chunk = alignedStripe.chunks[i]; - final int decodeIndex = convertIndex4Decode(i, dataBlkNum, parityBlkNum); if (chunk != null && chunk.state == StripingChunk.FETCHED) { - chunk.copyTo(decodeInputs[decodeIndex]); + chunk.copyTo(decodeInputs[i]); } else if (chunk != null && chunk.state == StripingChunk.ALLZERO) { - Arrays.fill(decodeInputs[decodeIndex], (byte) 0); + Arrays.fill(decodeInputs[i], (byte) 0); } else { - decodeInputs[decodeIndex] = null; + decodeInputs[i] = null; } } } - /** - * Currently decoding requires parity chunks are before data chunks. - * The indices are opposite to what we store in NN. In future we may - * improve the decoding to make the indices order the same as in NN. 
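For context, the mapping that the removed helper used to apply, versus the natural ordering that makes it unnecessary (the method body is reproduced from the lines deleted above; the surrounding comments are editorial):

```java
// Removed helper (shown for reference): parity-first translation.
static int convertIndex4Decode(int index, int dataBlkNum, int parityBlkNum) {
  return index < dataBlkNum ? index + parityBlkNum : index - dataBlkNum;
}
// For RS(6, 3) this mapped d0..d5 to slots 3..8 and p0..p2 to slots 0..2.
// With data-first decode inputs, a block-group index i is already the
// decode index, so callers use decodeInputs[i] and chunks[i] directly.
```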
- * - * @param index The index to convert - * @param dataBlkNum The number of data blocks - * @param parityBlkNum The number of parity blocks - * @return converted index - */ - public static int convertIndex4Decode(int index, int dataBlkNum, - int parityBlkNum) { - return index < dataBlkNum ? index + parityBlkNum : index - dataBlkNum; - } - - public static int convertDecodeIndexBack(int index, int dataBlkNum, - int parityBlkNum) { - return index < parityBlkNum ? index + dataBlkNum : index - parityBlkNum; - } - /** * Decode based on the given input buffers and erasure coding policy. */ @@ -333,7 +311,7 @@ public class StripedBlockUtil { for (int i = 0; i < dataBlkNum; i++) { if (alignedStripe.chunks[i] != null && alignedStripe.chunks[i].state == StripingChunk.MISSING){ - decodeIndices[pos++] = convertIndex4Decode(i, dataBlkNum, parityBlkNum); + decodeIndices[pos++] = i; } } decodeIndices = Arrays.copyOf(decodeIndices, pos); @@ -345,8 +323,7 @@ public class StripedBlockUtil { // Step 3: fill original application buffer with decoded data for (int i = 0; i < decodeIndices.length; i++) { - int missingBlkIdx = convertDecodeIndexBack(decodeIndices[i], - dataBlkNum, parityBlkNum); + int missingBlkIdx = decodeIndices[i]; StripingChunk chunk = alignedStripe.chunks[missingBlkIdx]; if (chunk.state == StripingChunk.MISSING) { chunk.copyFrom(decodeOutputs[i]); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java index d7140d8f78..64afcd0010 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java @@ -79,8 +79,6 @@ import org.apache.hadoop.util.DataChecksum; import com.google.common.base.Preconditions; -import static org.apache.hadoop.hdfs.util.StripedBlockUtil.convertIndex4Decode; - /** * ErasureCodingWorker handles the erasure coding recovery work commands. 
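A compact sketch of the fill step in decodeAndFillBuffer() after the change; the chunk and decoder wiring is elided and StripingChunk refers to the existing StripedBlockUtil.StripingChunk:

```java
// Illustrative sketch: the erased index is now the chunk index itself,
// so no convertDecodeIndexBack() translation is needed before copying.
static void fillDecoded(byte[][] decodeOutputs, int[] decodeIndices,
    StripingChunk[] chunks) {
  for (int i = 0; i < decodeIndices.length; i++) {
    int missingBlkIdx = decodeIndices[i];      // identity mapping
    if (chunks[missingBlkIdx].state == StripingChunk.MISSING) {
      chunks[missingBlkIdx].copyFrom(decodeOutputs[i]);
    }
  }
}
```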
These * commands would be issued from Namenode as part of Datanode's heart beat @@ -621,8 +619,7 @@ public final class ErasureCodingWorker { int m = 0; for (int i = 0; i < targets.length; i++) { if (targetsStatus[i]) { - result[m++] = convertIndex4Decode(targetIndices[i], - dataBlkNum, parityBlkNum); + result[m++] = targetIndices[i]; } } return Arrays.copyOf(result, m); @@ -636,15 +633,13 @@ public final class ErasureCodingWorker { StripedReader reader = stripedReaders.get(success[i]); ByteBuffer buffer = reader.buffer; paddingBufferToLen(buffer, toRecoverLen); - inputs[convertIndex4Decode(reader.index, dataBlkNum, parityBlkNum)] = - (ByteBuffer)buffer.flip(); + inputs[reader.index] = (ByteBuffer)buffer.flip(); } if (success.length < dataBlkNum) { for (int i = 0; i < zeroStripeBuffers.length; i++) { ByteBuffer buffer = zeroStripeBuffers[i]; paddingBufferToLen(buffer, toRecoverLen); - int index = convertIndex4Decode(zeroStripeIndices[i], dataBlkNum, - parityBlkNum); + int index = zeroStripeIndices[i]; inputs[index] = (ByteBuffer)buffer.flip(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java index 549a744bc7..495f1b5eeb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java @@ -211,32 +211,33 @@ public class TestDFSStripedInputStream { } } + RawErasureDecoder rawDecoder = CodecUtil.createRSRawDecoder(conf, + DATA_BLK_NUM, PARITY_BLK_NUM); + // Update the expected content for decoded data for (int i = 0; i < NUM_STRIPE_PER_BLOCK; i++) { byte[][] decodeInputs = new byte[DATA_BLK_NUM + PARITY_BLK_NUM][CELLSIZE]; - int[] missingBlkIdx = new int[]{failedDNIdx + PARITY_BLK_NUM, 1, 2}; + int[] missingBlkIdx = new int[]{failedDNIdx, 7, 8}; byte[][] decodeOutputs = new byte[PARITY_BLK_NUM][CELLSIZE]; for (int j = 0; j < DATA_BLK_NUM; j++) { int posInBuf = i * CELLSIZE * DATA_BLK_NUM + j * CELLSIZE; if (j != failedDNIdx) { - System.arraycopy(expected, posInBuf, decodeInputs[j + PARITY_BLK_NUM], - 0, CELLSIZE); + System.arraycopy(expected, posInBuf, decodeInputs[j], 0, CELLSIZE); } } for (int k = 0; k < CELLSIZE; k++) { int posInBlk = i * CELLSIZE + k; - decodeInputs[0][k] = SimulatedFSDataset.simulatedByte( + decodeInputs[DATA_BLK_NUM][k] = SimulatedFSDataset.simulatedByte( new Block(bg.getBlock().getBlockId() + DATA_BLK_NUM), posInBlk); } for (int m : missingBlkIdx) { decodeInputs[m] = null; } - RawErasureDecoder rawDecoder = CodecUtil.createRSRawDecoder(conf, - DATA_BLK_NUM, PARITY_BLK_NUM); rawDecoder.decode(decodeInputs, missingBlkIdx, decodeOutputs); int posInBuf = i * CELLSIZE * DATA_BLK_NUM + failedDNIdx * CELLSIZE; System.arraycopy(decodeOutputs[0], 0, expected, posInBuf, CELLSIZE); } + int delta = 10; int done = 0; // read a small delta, shouldn't trigger decode