From 343c0e76fcd95ac739ca7cd6742c9d617e19fc37 Mon Sep 17 00:00:00 2001 From: Zhe Zhang Date: Mon, 18 May 2015 10:14:54 -0700 Subject: [PATCH] HADOOP-11938. Enhance ByteBuffer version encode/decode API of raw erasure coder. Contributed by Kai Zheng. --- .../hadoop-common/CHANGES-HDFS-EC-7285.txt | 3 + .../apache/hadoop/io/erasurecode/ECChunk.java | 35 ++--- .../rawcoder/AbstractRawErasureCoder.java | 83 +++++------ .../rawcoder/AbstractRawErasureDecoder.java | 69 +++++++-- .../rawcoder/AbstractRawErasureEncoder.java | 66 +++++++-- .../io/erasurecode/rawcoder/RSRawDecoder.java | 22 +-- .../io/erasurecode/rawcoder/RSRawEncoder.java | 41 +++--- .../erasurecode/rawcoder/XORRawDecoder.java | 30 ++-- .../erasurecode/rawcoder/XORRawEncoder.java | 40 ++++-- .../rawcoder/util/GaloisField.java | 112 +++++++++++---- .../hadoop/io/erasurecode/TestCoderBase.java | 131 ++++++++++++++---- .../coder/TestErasureCoderBase.java | 21 ++- .../erasurecode/rawcoder/TestRSRawCoder.java | 12 +- .../rawcoder/TestRSRawCoderBase.java | 12 +- .../rawcoder/TestRawCoderBase.java | 57 +++++++- .../erasurecode/rawcoder/TestXORRawCoder.java | 19 +++ 16 files changed, 538 insertions(+), 215 deletions(-) diff --git a/hadoop-common-project/hadoop-common/CHANGES-HDFS-EC-7285.txt b/hadoop-common-project/hadoop-common/CHANGES-HDFS-EC-7285.txt index 34dfc9e5ca..c799b4fed3 100644 --- a/hadoop-common-project/hadoop-common/CHANGES-HDFS-EC-7285.txt +++ b/hadoop-common-project/hadoop-common/CHANGES-HDFS-EC-7285.txt @@ -51,3 +51,6 @@ HADOOP-11566. Add tests and fix for erasure coders to recover erased parity units. (Kai Zheng via Zhe Zhang) + + HADOOP-11938. Enhance ByteBuffer version encode/decode API of raw erasure + coder. (Kai Zheng via Zhe Zhang) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECChunk.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECChunk.java index 69a8343ebe..310c738265 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECChunk.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECChunk.java @@ -72,34 +72,15 @@ public static ByteBuffer[] toBuffers(ECChunk[] chunks) { } /** - * Convert an array of this chunks to an array of byte array. - * Note the chunk buffers are not affected. - * @param chunks - * @return an array of byte array + * Convert to a bytes array, just for test usage. 
+ * @return bytes array */ - public static byte[][] toArrays(ECChunk[] chunks) { - byte[][] bytesArr = new byte[chunks.length][]; - - ByteBuffer buffer; - ECChunk chunk; - for (int i = 0; i < chunks.length; i++) { - chunk = chunks[i]; - if (chunk == null) { - bytesArr[i] = null; - continue; - } - - buffer = chunk.getBuffer(); - if (buffer.hasArray()) { - bytesArr[i] = buffer.array(); - } else { - bytesArr[i] = new byte[buffer.remaining()]; - // Avoid affecting the original one - buffer.mark(); - buffer.get(bytesArr[i]); - buffer.reset(); - } - } + public byte[] toBytesArray() { + byte[] bytesArr = new byte[chunkBuffer.remaining()]; + // Avoid affecting the original one + chunkBuffer.mark(); + chunkBuffer.get(bytesArr); + chunkBuffer.reset(); return bytesArr; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureCoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureCoder.java index 2400313b42..52689620a1 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureCoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureCoder.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.io.erasurecode.rawcoder; +import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.conf.Configured; import java.nio.ByteBuffer; @@ -30,9 +31,6 @@ public abstract class AbstractRawErasureCoder extends Configured implements RawErasureCoder { - // Hope to reset coding buffers a little faster using it - private byte[] zeroChunkBytes; - private int numDataUnits; private int numParityUnits; private int chunkSize; @@ -43,8 +41,6 @@ public void initialize(int numDataUnits, int numParityUnits, this.numDataUnits = numDataUnits; this.numParityUnits = numParityUnits; this.chunkSize = chunkSize; - - zeroChunkBytes = new byte[chunkSize]; // With ZERO by default } @Override @@ -73,42 +69,17 @@ public void release() { } /** - * Convert an array of heap ByteBuffers to an array of byte array. - * @param buffers - * @return an array of byte array + * Ensure output buffer filled with ZERO bytes fully in chunkSize. + * @param buffer a buffer ready to write chunk size bytes + * @return the buffer itself, with ZERO bytes written, the position and limit + * are not changed after the call */ - protected static byte[][] toArrays(ByteBuffer[] buffers) { - byte[][] bytesArr = new byte[buffers.length][]; - - ByteBuffer buffer; - for (int i = 0; i < buffers.length; i++) { - buffer = buffers[i]; - if (buffer == null) { - bytesArr[i] = null; - continue; - } - - if (buffer.hasArray()) { - bytesArr[i] = buffer.array(); - } else { - throw new IllegalArgumentException("Invalid ByteBuffer passed, " + - "expecting heap buffer"); - } + protected ByteBuffer resetOutputBuffer(ByteBuffer buffer) { + int pos = buffer.position(); + for (int i = pos; i < buffer.limit(); ++i) { + buffer.put((byte) 0); } - - return bytesArr; - } - - /** - * Ensure the buffer (either input or output) ready to read or write with ZERO - * bytes fully in chunkSize. 
- * @param buffer - * @return the buffer itself - */ - protected ByteBuffer resetBuffer(ByteBuffer buffer) { - buffer.clear(); - buffer.put(zeroChunkBytes); - buffer.position(0); + buffer.position(pos); return buffer; } @@ -119,9 +90,39 @@ protected ByteBuffer resetBuffer(ByteBuffer buffer) { * @param buffer bytes array buffer * @return the buffer itself */ - protected byte[] resetBuffer(byte[] buffer) { - System.arraycopy(zeroChunkBytes, 0, buffer, 0, buffer.length); + protected byte[] resetBuffer(byte[] buffer, int offset, int len) { + for (int i = offset; i < len; ++i) { + buffer[i] = (byte) 0; + } return buffer; } + + /** + * Check and ensure the buffers are of the length specified by dataLen. + * @param buffers + * @param dataLen + */ + protected void ensureLength(ByteBuffer[] buffers, int dataLen) { + for (int i = 0; i < buffers.length; ++i) { + if (buffers[i].remaining() != dataLen) { + throw new HadoopIllegalArgumentException( + "Invalid buffer, not of length " + dataLen); + } + } + } + + /** + * Check and ensure the buffers are of the length specified by dataLen. + * @param buffers + * @param dataLen + */ + protected void ensureLength(byte[][] buffers, int dataLen) { + for (int i = 0; i < buffers.length; ++i) { + if (buffers[i].length != dataLen) { + throw new HadoopIllegalArgumentException( + "Invalid buffer not of length " + dataLen); + } + } + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureDecoder.java index b247543fd3..31f4fb858c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureDecoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureDecoder.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.io.erasurecode.rawcoder; +import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.io.erasurecode.ECChunk; import java.nio.ByteBuffer; @@ -33,14 +34,43 @@ public abstract class AbstractRawErasureDecoder extends AbstractRawErasureCoder public void decode(ByteBuffer[] inputs, int[] erasedIndexes, ByteBuffer[] outputs) { checkParameters(inputs, erasedIndexes, outputs); + int dataLen = inputs[0].remaining(); + if (dataLen == 0) { + return; + } + ensureLength(inputs, dataLen); + ensureLength(outputs, dataLen); - boolean hasArray = inputs[0].hasArray(); - if (hasArray) { - byte[][] newInputs = toArrays(inputs); - byte[][] newOutputs = toArrays(outputs); - doDecode(newInputs, erasedIndexes, newOutputs); - } else { + boolean usingDirectBuffer = inputs[0].isDirect(); + if (usingDirectBuffer) { doDecode(inputs, erasedIndexes, outputs); + return; + } + + int[] inputOffsets = new int[inputs.length]; + int[] outputOffsets = new int[outputs.length]; + byte[][] newInputs = new byte[inputs.length][]; + byte[][] newOutputs = new byte[outputs.length][]; + + ByteBuffer buffer; + for (int i = 0; i < inputs.length; ++i) { + buffer = inputs[i]; + inputOffsets[i] = buffer.position(); + newInputs[i] = buffer.array(); + } + + for (int i = 0; i < outputs.length; ++i) { + buffer = outputs[i]; + outputOffsets[i] = buffer.position(); + newOutputs[i] = buffer.array(); + } + + doDecode(newInputs, inputOffsets, dataLen, + erasedIndexes, newOutputs, outputOffsets); + + for (int i = 0; i < inputs.length; ++i) { + buffer = inputs[i]; + 
buffer.position(inputOffsets[i] + dataLen); // dataLen bytes consumed } } @@ -56,18 +86,33 @@ protected abstract void doDecode(ByteBuffer[] inputs, int[] erasedIndexes, @Override public void decode(byte[][] inputs, int[] erasedIndexes, byte[][] outputs) { checkParameters(inputs, erasedIndexes, outputs); + int dataLen = inputs[0].length; + if (dataLen == 0) { + return; + } + ensureLength(inputs, dataLen); + ensureLength(outputs, dataLen); - doDecode(inputs, erasedIndexes, outputs); + int[] inputOffsets = new int[inputs.length]; // ALL ZERO + int[] outputOffsets = new int[outputs.length]; // ALL ZERO + + doDecode(inputs, inputOffsets, dataLen, erasedIndexes, outputs, + outputOffsets); } /** - * Perform the real decoding using bytes array + * Perform the real decoding using bytes array, supporting offsets and + * lengths. * @param inputs + * @param inputOffsets + * @param dataLen * @param erasedIndexes * @param outputs + * @param outputOffsets */ - protected abstract void doDecode(byte[][] inputs, int[] erasedIndexes, - byte[][] outputs); + protected abstract void doDecode(byte[][] inputs, int[] inputOffsets, + int dataLen, int[] erasedIndexes, + byte[][] outputs, int[] outputOffsets); @Override public void decode(ECChunk[] inputs, int[] erasedIndexes, @@ -91,12 +136,12 @@ protected void checkParameters(Object[] inputs, int[] erasedIndexes, } if (erasedIndexes.length != outputs.length) { - throw new IllegalArgumentException( + throw new HadoopIllegalArgumentException( "erasedIndexes and outputs mismatch in length"); } if (erasedIndexes.length > getNumParityUnits()) { - throw new IllegalArgumentException( + throw new HadoopIllegalArgumentException( "Too many erased, not recoverable"); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureEncoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureEncoder.java index 06e88bf287..0ae54c5531 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureEncoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureEncoder.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.io.erasurecode.rawcoder; +import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.io.erasurecode.ECChunk; import java.nio.ByteBuffer; @@ -32,14 +33,42 @@ public abstract class AbstractRawErasureEncoder extends AbstractRawErasureCoder @Override public void encode(ByteBuffer[] inputs, ByteBuffer[] outputs) { checkParameters(inputs, outputs); + int dataLen = inputs[0].remaining(); + if (dataLen == 0) { + return; + } + ensureLength(inputs, dataLen); + ensureLength(outputs, dataLen); - boolean hasArray = inputs[0].hasArray(); - if (hasArray) { - byte[][] newInputs = toArrays(inputs); - byte[][] newOutputs = toArrays(outputs); - doEncode(newInputs, newOutputs); - } else { + boolean usingDirectBuffer = inputs[0].isDirect(); + if (usingDirectBuffer) { doEncode(inputs, outputs); + return; + } + + int[] inputOffsets = new int[inputs.length]; + int[] outputOffsets = new int[outputs.length]; + byte[][] newInputs = new byte[inputs.length][]; + byte[][] newOutputs = new byte[outputs.length][]; + + ByteBuffer buffer; + for (int i = 0; i < inputs.length; ++i) { + buffer = inputs[i]; + inputOffsets[i] = buffer.position(); + newInputs[i] = buffer.array(); + } + + for (int i = 0; i < outputs.length; ++i) { + 
buffer = outputs[i]; + outputOffsets[i] = buffer.position(); + newOutputs[i] = buffer.array(); + } + + doEncode(newInputs, inputOffsets, dataLen, newOutputs, outputOffsets); + + for (int i = 0; i < inputs.length; ++i) { + buffer = inputs[i]; + buffer.position(buffer.position() + dataLen); // dataLen bytes consumed } } @@ -53,16 +82,31 @@ public void encode(ByteBuffer[] inputs, ByteBuffer[] outputs) { @Override public void encode(byte[][] inputs, byte[][] outputs) { checkParameters(inputs, outputs); + int dataLen = inputs[0].length; + if (dataLen == 0) { + return; + } + ensureLength(inputs, dataLen); + ensureLength(outputs, dataLen); - doEncode(inputs, outputs); + int[] inputOffsets = new int[inputs.length]; // ALL ZERO + int[] outputOffsets = new int[outputs.length]; // ALL ZERO + + doEncode(inputs, inputOffsets, dataLen, outputs, outputOffsets); } /** - * Perform the real encoding work using bytes array + * Perform the real encoding work using bytes array, supporting offsets + * and lengths. * @param inputs + * @param inputOffsets + * @param dataLen * @param outputs + * @param outputOffsets */ - protected abstract void doEncode(byte[][] inputs, byte[][] outputs); + protected abstract void doEncode(byte[][] inputs, int[] inputOffsets, + int dataLen, byte[][] outputs, + int[] outputOffsets); @Override public void encode(ECChunk[] inputs, ECChunk[] outputs) { @@ -78,10 +122,10 @@ public void encode(ECChunk[] inputs, ECChunk[] outputs) { */ protected void checkParameters(Object[] inputs, Object[] outputs) { if (inputs.length != getNumDataUnits()) { - throw new IllegalArgumentException("Invalid inputs length"); + throw new HadoopIllegalArgumentException("Invalid inputs length"); } if (outputs.length != getNumParityUnits()) { - throw new IllegalArgumentException("Invalid outputs length"); + throw new HadoopIllegalArgumentException("Invalid outputs length"); } } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawDecoder.java index 24fa637426..ff1162f494 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawDecoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawDecoder.java @@ -36,9 +36,9 @@ public void initialize(int numDataUnits, int numParityUnits, int chunkSize) { super.initialize(numDataUnits, numParityUnits, chunkSize); assert (getNumDataUnits() + getNumParityUnits() < RSUtil.GF.getFieldSize()); - this.errSignature = new int[getNumParityUnits()]; - this.primitivePower = RSUtil.getPrimitivePower(getNumDataUnits(), - getNumParityUnits()); + this.errSignature = new int[numParityUnits]; + this.primitivePower = RSUtil.getPrimitivePower(numDataUnits, + numParityUnits); } @Override @@ -49,21 +49,21 @@ protected void doDecode(ByteBuffer[] inputs, int[] erasedIndexes, RSUtil.GF.substitute(inputs, outputs[i], primitivePower[i]); } - int dataLen = inputs[0].remaining(); - RSUtil.GF.solveVandermondeSystem(errSignature, outputs, - erasedIndexes.length, dataLen); + RSUtil.GF.solveVandermondeSystem(errSignature, + outputs, erasedIndexes.length); } @Override - protected void doDecode(byte[][] inputs, int[] erasedIndexes, - byte[][] outputs) { + protected void doDecode(byte[][] inputs, int[] inputOffsets, + int dataLen, int[] erasedIndexes, + byte[][] outputs, int[] outputOffsets) { for (int i = 0; i < 
erasedIndexes.length; i++) { errSignature[i] = primitivePower[erasedIndexes[i]]; - RSUtil.GF.substitute(inputs, outputs[i], primitivePower[i]); + RSUtil.GF.substitute(inputs, inputOffsets, dataLen, outputs[i], + outputOffsets[i], primitivePower[i]); } - int dataLen = inputs[0].length; - RSUtil.GF.solveVandermondeSystem(errSignature, outputs, + RSUtil.GF.solveVandermondeSystem(errSignature, outputs, outputOffsets, erasedIndexes.length, dataLen); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawEncoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawEncoder.java index 7b501ceb45..9136331fbd 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawEncoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawEncoder.java @@ -34,12 +34,12 @@ public void initialize(int numDataUnits, int numParityUnits, int chunkSize) { super.initialize(numDataUnits, numParityUnits, chunkSize); assert (getNumDataUnits() + getNumParityUnits() < RSUtil.GF.getFieldSize()); - int[] primitivePower = RSUtil.getPrimitivePower(getNumDataUnits(), - getNumParityUnits()); + int[] primitivePower = RSUtil.getPrimitivePower(numDataUnits, + numParityUnits); // compute generating polynomial int[] gen = {1}; int[] poly = new int[2]; - for (int i = 0; i < getNumParityUnits(); i++) { + for (int i = 0; i < numParityUnits; i++) { poly[0] = primitivePower[i]; poly[1] = 1; gen = RSUtil.GF.multiply(gen, poly); @@ -50,29 +50,30 @@ public void initialize(int numDataUnits, int numParityUnits, int chunkSize) { @Override protected void doEncode(ByteBuffer[] inputs, ByteBuffer[] outputs) { - ByteBuffer[] data = new ByteBuffer[getNumDataUnits() + getNumParityUnits()]; - for (int i = 0; i < getNumParityUnits(); i++) { - data[i] = outputs[i]; - } - for (int i = 0; i < getNumDataUnits(); i++) { - data[i + getNumParityUnits()] = inputs[i]; - } + // parity units + data units + ByteBuffer[] all = new ByteBuffer[outputs.length + inputs.length]; + System.arraycopy(outputs, 0, all, 0, outputs.length); + System.arraycopy(inputs, 0, all, outputs.length, inputs.length); // Compute the remainder - RSUtil.GF.remainder(data, generatingPolynomial); + RSUtil.GF.remainder(all, generatingPolynomial); } @Override - protected void doEncode(byte[][] inputs, byte[][] outputs) { - byte[][] data = new byte[getNumDataUnits() + getNumParityUnits()][]; - for (int i = 0; i < getNumParityUnits(); i++) { - data[i] = outputs[i]; - } - for (int i = 0; i < getNumDataUnits(); i++) { - data[i + getNumParityUnits()] = inputs[i]; - } + protected void doEncode(byte[][] inputs, int[] inputOffsets, + int dataLen, byte[][] outputs, + int[] outputOffsets) { + // parity units + data units + byte[][] all = new byte[outputs.length + inputs.length][]; + System.arraycopy(outputs, 0, all, 0, outputs.length); + System.arraycopy(inputs, 0, all, outputs.length, inputs.length); + + int[] offsets = new int[inputOffsets.length + outputOffsets.length]; + System.arraycopy(outputOffsets, 0, offsets, 0, outputOffsets.length); + System.arraycopy(inputOffsets, 0, offsets, + outputOffsets.length, inputOffsets.length); // Compute the remainder - RSUtil.GF.remainder(data, generatingPolynomial); + RSUtil.GF.remainder(all, offsets, dataLen, generatingPolynomial); } } diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XORRawDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XORRawDecoder.java index 2ea1b3d708..bf6e894270 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XORRawDecoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XORRawDecoder.java @@ -21,47 +21,57 @@ /** * A raw decoder in XOR code scheme in pure Java, adapted from HDFS-RAID. + * + * XOR code is an important primitive code scheme in erasure coding and often + * used in advanced codes, like HitchHiker and LRC, though itself is rarely + * deployed independently. */ public class XORRawDecoder extends AbstractRawErasureDecoder { @Override protected void doDecode(ByteBuffer[] inputs, int[] erasedIndexes, ByteBuffer[] outputs) { - resetBuffer(outputs[0]); + ByteBuffer output = outputs[0]; + resetOutputBuffer(output); - int bufSize = getChunkSize(); int erasedIdx = erasedIndexes[0]; // Process the inputs. + int iIdx, oIdx; for (int i = 0; i < inputs.length; i++) { // Skip the erased location. if (i == erasedIdx) { continue; } - for (int j = 0; j < bufSize; j++) { - outputs[0].put(j, (byte) (outputs[0].get(j) ^ inputs[i].get(j))); + for (iIdx = inputs[i].position(), oIdx = output.position(); + iIdx < inputs[i].limit(); + iIdx++, oIdx++) { + output.put(oIdx, (byte) (output.get(oIdx) ^ inputs[i].get(iIdx))); } } } @Override - protected void doDecode(byte[][] inputs, - int[] erasedIndexes, byte[][] outputs) { - resetBuffer(outputs[0]); + protected void doDecode(byte[][] inputs, int[] inputOffsets, int dataLen, + int[] erasedIndexes, byte[][] outputs, + int[] outputOffsets) { + byte[] output = outputs[0]; + resetBuffer(output, outputOffsets[0], dataLen); - int bufSize = getChunkSize(); int erasedIdx = erasedIndexes[0]; // Process the inputs. + int iIdx, oIdx; for (int i = 0; i < inputs.length; i++) { // Skip the erased location. if (i == erasedIdx) { continue; } - for (int j = 0; j < bufSize; j++) { - outputs[0][j] ^= inputs[i][j]; + for (iIdx = inputOffsets[i], oIdx = outputOffsets[0]; + iIdx < inputOffsets[i] + dataLen; iIdx++, oIdx++) { + output[oIdx] ^= inputs[i][iIdx]; } } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XORRawEncoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XORRawEncoder.java index 116cb91ea3..feffbbf015 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XORRawEncoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XORRawEncoder.java @@ -21,43 +21,53 @@ /** * A raw encoder in XOR code scheme in pure Java, adapted from HDFS-RAID. + * + * XOR code is an important primitive code scheme in erasure coding and often + * used in advanced codes, like HitchHiker and LRC, though itself is rarely + * deployed independently. */ public class XORRawEncoder extends AbstractRawErasureEncoder { - @Override protected void doEncode(ByteBuffer[] inputs, ByteBuffer[] outputs) { - resetBuffer(outputs[0]); + ByteBuffer output = outputs[0]; + resetOutputBuffer(output); - int bufSize = getChunkSize(); // Get the first buffer's data. 
- for (int j = 0; j < bufSize; j++) { - outputs[0].put(j, inputs[0].get(j)); + int iIdx, oIdx; + for (iIdx = inputs[0].position(), oIdx = output.position(); + iIdx < inputs[0].limit(); iIdx++, oIdx++) { + output.put(oIdx, inputs[0].get(iIdx)); } // XOR with everything else. for (int i = 1; i < inputs.length; i++) { - for (int j = 0; j < bufSize; j++) { - outputs[0].put(j, (byte) (outputs[0].get(j) ^ inputs[i].get(j))); + for (iIdx = inputs[i].position(), oIdx = output.position(); + iIdx < inputs[i].limit(); + iIdx++, oIdx++) { + output.put(oIdx, (byte) (output.get(oIdx) ^ inputs[i].get(iIdx))); } } } @Override - protected void doEncode(byte[][] inputs, byte[][] outputs) { - resetBuffer(outputs[0]); + protected void doEncode(byte[][] inputs, int[] inputOffsets, int dataLen, + byte[][] outputs, int[] outputOffsets) { + byte[] output = outputs[0]; + resetBuffer(output, outputOffsets[0], dataLen); - int bufSize = getChunkSize(); // Get the first buffer's data. - for (int j = 0; j < bufSize; j++) { - outputs[0][j] = inputs[0][j]; + int iIdx, oIdx; + for (iIdx = inputOffsets[0], oIdx = outputOffsets[0]; + iIdx < inputOffsets[0] + dataLen; iIdx++, oIdx++) { + output[oIdx] = inputs[0][iIdx]; } // XOR with everything else. for (int i = 1; i < inputs.length; i++) { - for (int j = 0; j < bufSize; j++) { - outputs[0][j] ^= inputs[i][j]; + for (iIdx = inputOffsets[i], oIdx = outputOffsets[0]; + iIdx < inputOffsets[i] + dataLen; iIdx++, oIdx++) { + output[oIdx] ^= inputs[i][iIdx]; } } } - } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/util/GaloisField.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/util/GaloisField.java index 77544c6223..62b22c9fce 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/util/GaloisField.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/util/GaloisField.java @@ -235,26 +235,30 @@ public void solveVandermondeSystem(int[] x, int[] y, int len) { /** * A "bulk" version to the solving of Vandermonde System */ - public void solveVandermondeSystem(int[] x, byte[][] y, + public void solveVandermondeSystem(int[] x, byte[][] y, int[] outputOffsets, int len, int dataLen) { + int idx1, idx2; for (int i = 0; i < len - 1; i++) { for (int j = len - 1; j > i; j--) { - for (int k = 0; k < dataLen; k++) { - y[j][k] = (byte) (y[j][k] ^ mulTable[x[i]][y[j - 1][k] & + for (idx2 = outputOffsets[j-1], idx1 = outputOffsets[j]; + idx1 < outputOffsets[j] + dataLen; idx1++, idx2++) { + y[j][idx1] = (byte) (y[j][idx1] ^ mulTable[x[i]][y[j - 1][idx2] & 0x000000FF]); } } } for (int i = len - 1; i >= 0; i--) { for (int j = i + 1; j < len; j++) { - for (int k = 0; k < dataLen; k++) { - y[j][k] = (byte) (divTable[y[j][k] & 0x000000FF][x[j] ^ + for (idx1 = outputOffsets[j]; + idx1 < outputOffsets[j] + dataLen; idx1++) { + y[j][idx1] = (byte) (divTable[y[j][idx1] & 0x000000FF][x[j] ^ x[j - i - 1]]); } } for (int j = i; j < len - 1; j++) { - for (int k = 0; k < dataLen; k++) { - y[j][k] = (byte) (y[j][k] ^ y[j + 1][k]); + for (idx2 = outputOffsets[j+1], idx1 = outputOffsets[j]; + idx1 < outputOffsets[j] + dataLen; idx1++, idx2++) { + y[j][idx1] = (byte) (y[j][idx1] ^ y[j + 1][idx2]); } } } @@ -263,26 +267,34 @@ public void solveVandermondeSystem(int[] x, byte[][] y, /** * A "bulk" version of the solveVandermondeSystem, using ByteBuffer. 
*/ - public void solveVandermondeSystem(int[] x, ByteBuffer[] y, - int len, int dataLen) { + public void solveVandermondeSystem(int[] x, ByteBuffer[] y, int len) { + ByteBuffer p; + int idx1, idx2; for (int i = 0; i < len - 1; i++) { for (int j = len - 1; j > i; j--) { - for (int k = 0; k < dataLen; k++) { - y[j].put(k, (byte) (y[j].get(k) ^ mulTable[x[i]][y[j - 1].get(k) & + p = y[j]; + for (idx1 = p.position(), idx2 = y[j-1].position(); + idx1 < p.limit(); idx1++, idx2++) { + p.put(idx1, (byte) (p.get(idx1) ^ mulTable[x[i]][y[j-1].get(idx2) & 0x000000FF])); } } } + for (int i = len - 1; i >= 0; i--) { for (int j = i + 1; j < len; j++) { - for (int k = 0; k < dataLen; k++) { - y[j].put(k, (byte) (divTable[y[j].get(k) & 0x000000FF][x[j] ^ - x[j - i - 1]])); + p = y[j]; + for (idx1 = p.position(); idx1 < p.limit(); idx1++) { + p.put(idx1, (byte) (divTable[p.get(idx1) & + 0x000000FF][x[j] ^ x[j - i - 1]])); } } + for (int j = i; j < len - 1; j++) { - for (int k = 0; k < dataLen; k++) { - y[j].put(k, (byte) (y[j].get(k) ^ y[j + 1].get(k))); + p = y[j]; + for (idx1 = p.position(), idx2 = y[j+1].position(); + idx1 < p.limit(); idx1++, idx2++) { + p.put(idx1, (byte) (p.get(idx1) ^ y[j+1].get(idx2))); } } } @@ -393,6 +405,31 @@ public void substitute(byte[][] p, byte[] q, int x) { } } + /** + * A "bulk" version of the substitute. + * Tends to be 2X faster than the "int" substitute in a loop. + * + * @param p input polynomial + * @param offsets + * @param len + * @param q store the return result + * @param offset + * @param x input field + */ + public void substitute(byte[][] p, int[] offsets, + int len, byte[] q, int offset, int x) { + int y = 1, iIdx, oIdx; + for (int i = 0; i < p.length; i++) { + byte[] pi = p[i]; + for (iIdx = offsets[i], oIdx = offset; + iIdx < offsets[i] + len; iIdx++, oIdx++) { + int pij = pi[iIdx] & 0x000000FF; + q[oIdx] = (byte) (q[oIdx] ^ mulTable[pij][y]); + } + y = mulTable[x][y]; + } + } + /** * A "bulk" version of the substitute, using ByteBuffer. * Tends to be 2X faster than the "int" substitute in a loop. @@ -402,13 +439,13 @@ public void substitute(byte[][] p, byte[] q, int x) { * @param x input field */ public void substitute(ByteBuffer[] p, ByteBuffer q, int x) { - int y = 1; + int y = 1, iIdx, oIdx; for (int i = 0; i < p.length; i++) { ByteBuffer pi = p[i]; - int len = pi.remaining(); - for (int j = 0; j < len; j++) { - int pij = pi.get(j) & 0x000000FF; - q.put(j, (byte) (q.get(j) ^ mulTable[pij][y])); + for (iIdx = pi.position(), oIdx = q.position(); + iIdx < pi.limit(); iIdx++, oIdx++) { + int pij = pi.get(iIdx) & 0x000000FF; + q.put(oIdx, (byte) (q.get(oIdx) ^ mulTable[pij][y])); } y = mulTable[x][y]; } @@ -431,18 +468,43 @@ public void remainder(byte[][] dividend, int[] divisor) { } } + /** + * The "bulk" version of the remainder. + * Warning: This function will modify the "dividend" inputs. + */ + public void remainder(byte[][] dividend, int[] offsets, + int len, int[] divisor) { + int idx1, idx2; + for (int i = dividend.length - divisor.length; i >= 0; i--) { + for (int j = 0; j < divisor.length; j++) { + for (idx2 = offsets[j + i], idx1 = offsets[i + divisor.length - 1]; + idx1 < offsets[i + divisor.length - 1] + len; + idx1++, idx2++) { + int ratio = divTable[dividend[i + divisor.length - 1][idx1] & + 0x00FF][divisor[divisor.length - 1]]; + dividend[j + i][idx2] = (byte) ((dividend[j + i][idx2] & 0x00FF) ^ + mulTable[ratio][divisor[j]]); + } + } + } + } + /** * The "bulk" version of the remainder, using ByteBuffer. 
* Warning: This function will modify the "dividend" inputs. */ public void remainder(ByteBuffer[] dividend, int[] divisor) { + int idx1, idx2; + ByteBuffer b1, b2; for (int i = dividend.length - divisor.length; i >= 0; i--) { - int width = dividend[i].remaining(); for (int j = 0; j < divisor.length; j++) { - for (int k = 0; k < width; k++) { - int ratio = divTable[dividend[i + divisor.length - 1].get(k) & + b1 = dividend[i + divisor.length - 1]; + b2 = dividend[j + i]; + for (idx1 = b1.position(), idx2 = b2.position(); + idx1 < b1.limit(); idx1++, idx2++) { + int ratio = divTable[b1.get(idx1) & 0x00FF][divisor[divisor.length - 1]]; - dividend[j + i].put(k, (byte) ((dividend[j + i].get(k) & 0x00FF) ^ + b2.put(idx2, (byte) ((b2.get(idx2) & 0x00FF) ^ mulTable[ratio][divisor[j]])); } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCoderBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCoderBase.java index 769427d3a0..cc3617cc43 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCoderBase.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCoderBase.java @@ -35,7 +35,12 @@ public abstract class TestCoderBase { private Configuration conf; protected int numDataUnits; protected int numParityUnits; - protected int chunkSize = 16 * 1024; + protected int baseChunkSize = 16 * 1024; + private int chunkSize = baseChunkSize; + + private byte[] zeroChunkBytes; + + private boolean startBufferWithZero = true; // Indexes of erased data units. protected int[] erasedDataIndexes = new int[] {0}; @@ -47,6 +52,15 @@ public abstract class TestCoderBase { // may go to different coding implementations. protected boolean usingDirectBuffer = true; + protected int getChunkSize() { + return chunkSize; + } + + protected void setChunkSize(int chunkSize) { + this.chunkSize = chunkSize; + this.zeroChunkBytes = new byte[chunkSize]; // With ZERO by default + } + /** * Prepare before running the case. * @param numDataUnits @@ -80,8 +94,8 @@ protected Configuration getConf() { */ protected void compareAndVerify(ECChunk[] erasedChunks, ECChunk[] recoveredChunks) { - byte[][] erased = ECChunk.toArrays(erasedChunks); - byte[][] recovered = ECChunk.toArrays(recoveredChunks); + byte[][] erased = toArrays(erasedChunks); + byte[][] recovered = toArrays(recoveredChunks); boolean result = Arrays.deepEquals(erased, recovered); assertTrue("Decoding and comparing failed.", result); } @@ -171,16 +185,19 @@ protected void eraseDataFromChunks(ECChunk[] chunks) { /** * Erase data from the specified chunk, putting ZERO bytes to the buffer. - * @param chunk + * @param chunk with a buffer ready to read at the current position */ protected void eraseDataFromChunk(ECChunk chunk) { ByteBuffer chunkBuffer = chunk.getBuffer(); - // erase the data - chunkBuffer.position(0); - for (int i = 0; i < chunkSize; i++) { - chunkBuffer.put((byte) 0); - } + // Erase the data at the position, and restore the buffer ready for reading + // same many bytes but all ZERO. 
+ int pos = chunkBuffer.position(); + int len = chunkBuffer.remaining(); + chunkBuffer.put(zeroChunkBytes, 0, len); + // Back to readable again after data erased chunkBuffer.flip(); + chunkBuffer.position(pos); + chunkBuffer.limit(pos + len); } /** @@ -190,7 +207,7 @@ protected void eraseDataFromChunk(ECChunk chunk) { * @param chunks * @return */ - protected static ECChunk[] cloneChunksWithData(ECChunk[] chunks) { + protected ECChunk[] cloneChunksWithData(ECChunk[] chunks) { ECChunk[] results = new ECChunk[chunks.length]; for (int i = 0; i < chunks.length; i++) { results[i] = cloneChunkWithData(chunks[i]); @@ -206,22 +223,19 @@ protected static ECChunk[] cloneChunksWithData(ECChunk[] chunks) { * @param chunk * @return a new chunk */ - protected static ECChunk cloneChunkWithData(ECChunk chunk) { + protected ECChunk cloneChunkWithData(ECChunk chunk) { ByteBuffer srcBuffer = chunk.getBuffer(); - ByteBuffer destBuffer; byte[] bytesArr = new byte[srcBuffer.remaining()]; srcBuffer.mark(); - srcBuffer.get(bytesArr); + srcBuffer.get(bytesArr, 0, bytesArr.length); srcBuffer.reset(); - if (srcBuffer.hasArray()) { - destBuffer = ByteBuffer.wrap(bytesArr); - } else { - destBuffer = ByteBuffer.allocateDirect(srcBuffer.remaining()); - destBuffer.put(bytesArr); - destBuffer.flip(); - } + ByteBuffer destBuffer = allocateOutputBuffer(bytesArr.length); + int pos = destBuffer.position(); + destBuffer.put(bytesArr); + destBuffer.flip(); + destBuffer.position(pos); return new ECChunk(destBuffer); } @@ -231,18 +245,30 @@ protected static ECChunk cloneChunkWithData(ECChunk chunk) { * @return */ protected ECChunk allocateOutputChunk() { - ByteBuffer buffer = allocateOutputBuffer(); + ByteBuffer buffer = allocateOutputBuffer(chunkSize); return new ECChunk(buffer); } /** - * Allocate a buffer for output or writing. - * @return + * Allocate a buffer for output or writing. It can prepare for two kinds of + * data buffers: one with position as 0, the other with position > 0 + * @return a buffer ready to write chunkSize bytes from current position */ - protected ByteBuffer allocateOutputBuffer() { + protected ByteBuffer allocateOutputBuffer(int bufferLen) { + /** + * When startBufferWithZero, will prepare a buffer as:--------------- + * otherwise, the buffer will be like: ___TO--BE--WRITTEN___, + * and in the beginning, dummy data are prefixed, to simulate a buffer of + * position > 0. + */ + int startOffset = startBufferWithZero ? 0 : 11; // 11 is arbitrary + int allocLen = startOffset + bufferLen + startOffset; ByteBuffer buffer = usingDirectBuffer ? - ByteBuffer.allocateDirect(chunkSize) : ByteBuffer.allocate(chunkSize); + ByteBuffer.allocateDirect(allocLen) : ByteBuffer.allocate(allocLen); + buffer.limit(startOffset + bufferLen); + fillDummyData(buffer, startOffset); + startBufferWithZero = ! startBufferWithZero; return buffer; } @@ -265,15 +291,34 @@ protected ECChunk[] prepareDataChunksForEncoding() { * @return */ protected ECChunk generateDataChunk() { - ByteBuffer buffer = allocateOutputBuffer(); - for (int i = 0; i < chunkSize; i++) { - buffer.put((byte) RAND.nextInt(256)); - } + ByteBuffer buffer = allocateOutputBuffer(chunkSize); + int pos = buffer.position(); + buffer.put(generateData(chunkSize)); buffer.flip(); + buffer.position(pos); return new ECChunk(buffer); } + /** + * Fill len of dummy data in the buffer at the current position. 
+ * @param buffer + * @param len + */ + protected void fillDummyData(ByteBuffer buffer, int len) { + byte[] dummy = new byte[len]; + RAND.nextBytes(dummy); + buffer.put(dummy); + } + + protected byte[] generateData(int len) { + byte[] buffer = new byte[len]; + for (int i = 0; i < buffer.length; i++) { + buffer[i] = (byte) RAND.nextInt(256); + } + return buffer; + } + /** * Prepare parity chunks for encoding, each chunk for each parity unit. * @return @@ -303,4 +348,32 @@ protected ECChunk[] prepareOutputChunksForDecoding() { return chunks; } + /** + * Convert an array of this chunks to an array of byte array. + * Note the chunk buffers are not affected. + * @param chunks + * @return an array of byte array + */ + protected byte[][] toArrays(ECChunk[] chunks) { + byte[][] bytesArr = new byte[chunks.length][]; + + for (int i = 0; i < chunks.length; i++) { + bytesArr[i] = chunks[i].toBytesArray(); + } + + return bytesArr; + } + + + /** + * Make some chunk messy or not correct any more + * @param chunks + */ + protected void corruptSomeChunk(ECChunk[] chunks) { + int idx = new Random().nextInt(chunks.length); + ByteBuffer buffer = chunks[idx].getBuffer(); + if (buffer.hasRemaining()) { + buffer.position(buffer.position() + 1); + } + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/coder/TestErasureCoderBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/coder/TestErasureCoderBase.java index f30323b590..154ec18699 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/coder/TestErasureCoderBase.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/coder/TestErasureCoderBase.java @@ -59,6 +59,19 @@ protected void testCoding(boolean usingDirectBuffer) { this.usingDirectBuffer = usingDirectBuffer; prepareCoders(); + /** + * The following runs will use 3 different chunkSize for inputs and outputs, + * to verify the same encoder/decoder can process variable width of data. 
+ */ + performTestCoding(baseChunkSize); + performTestCoding(baseChunkSize - 17); + performTestCoding(baseChunkSize + 16); + } + + private void performTestCoding(int chunkSize) { + setChunkSize(chunkSize); + + // Generate data and encode ECBlockGroup blockGroup = prepareBlockGroupForEncoding(); // Backup all the source chunks for later recovering because some coders @@ -138,7 +151,7 @@ private ErasureCoder createEncoder() { throw new RuntimeException("Failed to create encoder", e); } - encoder.initialize(numDataUnits, numParityUnits, chunkSize); + encoder.initialize(numDataUnits, numParityUnits, getChunkSize()); encoder.setConf(getConf()); return encoder; } @@ -165,7 +178,7 @@ private ErasureCoder createDecoder() { throw new RuntimeException("Failed to create decoder", e); } - decoder.initialize(numDataUnits, numParityUnits, chunkSize); + decoder.initialize(numDataUnits, numParityUnits, getChunkSize()); decoder.setConf(getConf()); return decoder; } @@ -249,7 +262,7 @@ protected TestBlock allocateOutputBlock() { * @param blocks * @return */ - protected static TestBlock[] cloneBlocksWithData(TestBlock[] blocks) { + protected TestBlock[] cloneBlocksWithData(TestBlock[] blocks) { TestBlock[] results = new TestBlock[blocks.length]; for (int i = 0; i < blocks.length; ++i) { results[i] = cloneBlockWithData(blocks[i]); @@ -263,7 +276,7 @@ protected static TestBlock[] cloneBlocksWithData(TestBlock[] blocks) { * @param block * @return a new block */ - protected static TestBlock cloneBlockWithData(TestBlock block) { + protected TestBlock cloneBlockWithData(TestBlock block) { ECChunk[] newChunks = cloneChunksWithData(block.chunks); return new TestBlock(newChunks); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRSRawCoder.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRSRawCoder.java index 84bad9231b..02b9eead3c 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRSRawCoder.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRSRawCoder.java @@ -44,7 +44,7 @@ public void testCodingNoDirectBuffer_10x4_erasing_d0_p0() { @Test public void testCodingDirectBuffer_10x4_erasing_p1() { - prepare(null, 10, 4, new int[] {}, new int[] {1}); + prepare(null, 10, 4, new int[0], new int[] {1}); testCoding(true); testCoding(true); } @@ -101,4 +101,14 @@ public void testCodingDirectBuffer_3x3_erasing_d0_p0() { prepare(null, 3, 3, new int[] {0}, new int[] {0}); testCoding(true); } + + @Test + public void testCodingNegative_10x4_erasing_d2_d4() { + prepare(null, 10, 4, new int[]{2, 4}, new int[0]); + + testCodingWithBadInput(true); + testCodingWithBadOutput(false); + testCodingWithBadInput(true); + testCodingWithBadOutput(false); + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRSRawCoderBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRSRawCoderBase.java index f9e8a6baba..c06aded7b9 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRSRawCoderBase.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRSRawCoderBase.java @@ -39,13 +39,11 @@ public abstract class TestRSRawCoderBase extends TestRawCoderBase { } @Override - protected ECChunk generateDataChunk() { - 
ByteBuffer buffer = allocateOutputBuffer(); - for (int i = 0; i < chunkSize; i++) { - buffer.put((byte) RAND.nextInt(symbolMax)); + protected byte[] generateData(int len) { + byte[] buffer = new byte[len]; + for (int i = 0; i < len; i++) { + buffer[i] = (byte) RAND.nextInt(symbolMax); } - buffer.flip(); - - return new ECChunk(buffer); + return buffer; } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java index 8543c4d84f..45823175cf 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java @@ -19,6 +19,7 @@ import org.apache.hadoop.io.erasurecode.ECChunk; import org.apache.hadoop.io.erasurecode.TestCoderBase; +import org.junit.Assert; /** * Raw coder test base with utilities. @@ -41,8 +42,57 @@ protected void testCoding(boolean usingDirectBuffer) { this.usingDirectBuffer = usingDirectBuffer; prepareCoders(); + /** + * The following runs will use 3 different chunkSize for inputs and outputs, + * to verify the same encoder/decoder can process variable width of data. + */ + performTestCoding(baseChunkSize, false, false); + performTestCoding(baseChunkSize - 17, false, false); + performTestCoding(baseChunkSize + 16, false, false); + } + + /** + * Similar to above, but perform negative cases using bad input for encoding. + * @param usingDirectBuffer + */ + protected void testCodingWithBadInput(boolean usingDirectBuffer) { + this.usingDirectBuffer = usingDirectBuffer; + prepareCoders(); + + try { + performTestCoding(baseChunkSize, true, false); + Assert.fail("Encoding test with bad input should fail"); + } catch (Exception e) { + // Expected + } + } + + /** + * Similar to above, but perform negative cases using bad output for decoding. 
+ * @param usingDirectBuffer + */ + protected void testCodingWithBadOutput(boolean usingDirectBuffer) { + this.usingDirectBuffer = usingDirectBuffer; + prepareCoders(); + + try { + performTestCoding(baseChunkSize, false, true); + Assert.fail("Decoding test with bad output should fail"); + } catch (Exception e) { + // Expected + } + } + + private void performTestCoding(int chunkSize, + boolean useBadInput, boolean useBadOutput) { + setChunkSize(chunkSize); + // Generate data and encode ECChunk[] dataChunks = prepareDataChunksForEncoding(); + if (useBadInput) { + corruptSomeChunk(dataChunks); + } + ECChunk[] parityChunks = prepareParityChunksForEncoding(); // Backup all the source chunks for later recovering because some coders @@ -59,6 +109,9 @@ protected void testCoding(boolean usingDirectBuffer) { clonedDataChunks, parityChunks); ECChunk[] recoveredChunks = prepareOutputChunksForDecoding(); + if (useBadOutput) { + corruptSomeChunk(recoveredChunks); + } decoder.decode(inputChunks, getErasedIndexesForDecoding(), recoveredChunks); @@ -88,7 +141,7 @@ protected RawErasureEncoder createEncoder() { throw new RuntimeException("Failed to create encoder", e); } - encoder.initialize(numDataUnits, numParityUnits, chunkSize); + encoder.initialize(numDataUnits, numParityUnits, getChunkSize()); encoder.setConf(getConf()); return encoder; } @@ -105,7 +158,7 @@ protected RawErasureDecoder createDecoder() { throw new RuntimeException("Failed to create decoder", e); } - decoder.initialize(numDataUnits, numParityUnits, chunkSize); + decoder.initialize(numDataUnits, numParityUnits, getChunkSize()); decoder.setConf(getConf()); return decoder; } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestXORRawCoder.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestXORRawCoder.java index b8912a9245..327174ef83 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestXORRawCoder.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestXORRawCoder.java @@ -49,6 +49,15 @@ public void testCodingNoDirectBuffer_erasing_d0() { @Test public void testCodingDirectBuffer_erasing_p0() { prepare(null, 10, 1, new int[0], new int[] {0}); + + testCoding(true); + testCoding(true); + } + + @Test + public void testCodingDirectBuffer_erasing_d0() { + prepare(null, 10, 1, new int[] {0}, new int[0]); + testCoding(true); testCoding(true); } @@ -67,4 +76,14 @@ public void testCodingBothBuffers_erasing_d5() { testCoding(true); testCoding(false); } + + @Test + public void testCodingNegative_erasing_d5() { + prepare(null, 10, 1, new int[]{5}, new int[0]); + + testCodingWithBadInput(true); + testCodingWithBadOutput(false); + testCodingWithBadInput(true); + testCodingWithBadOutput(false); + } }
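
Illustrative usage sketch (not part of the patch): the snippet below exercises the enhanced ByteBuffer encode path that this change introduces, using only names confirmed by the diff above (RSRawEncoder, initialize(numDataUnits, numParityUnits, chunkSize), encode(ByteBuffer[], ByteBuffer[])). The unit counts, chunk size, and class/file name RawEncodeSketch are arbitrary example values, and buffer filling is elided.

import java.nio.ByteBuffer;
import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder;

public class RawEncodeSketch {
  public static void main(String[] args) {
    // Example parameters only; any numDataUnits + numParityUnits < 256 works for RS.
    int numDataUnits = 6, numParityUnits = 3, chunkSize = 1024;

    RSRawEncoder encoder = new RSRawEncoder();
    encoder.initialize(numDataUnits, numParityUnits, chunkSize);

    // Direct buffers take the ByteBuffer coding path; heap buffers would be
    // unwrapped by the abstract encoder into byte[] arrays plus per-buffer
    // offsets taken from each buffer's current position.
    ByteBuffer[] inputs = new ByteBuffer[numDataUnits];
    ByteBuffer[] outputs = new ByteBuffer[numParityUnits];
    for (int i = 0; i < numDataUnits; i++) {
      inputs[i] = ByteBuffer.allocateDirect(chunkSize); // fill with data in real use
    }
    for (int i = 0; i < numParityUnits; i++) {
      outputs[i] = ByteBuffer.allocateDirect(chunkSize);
    }

    // All inputs and outputs must have the same remaining length; after the
    // call each input buffer's position has advanced by that length, i.e.
    // the input bytes are treated as consumed.
    encoder.encode(inputs, outputs);
  }
}

The same pattern applies to decoding via decode(ByteBuffer[] inputs, int[] erasedIndexes, ByteBuffer[] outputs), with output buffers supplied for the erased positions; see the tests in TestRawCoderBase for the exact preparation of input and output chunks.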