HADOOP-11847 Enhance raw coder allowing to read least required inputs in decoding. Contributed by Kai Zheng
This commit is contained in:
parent
5a391e1d25
commit
4ad484883f
@ -59,3 +59,6 @@
|
||||
|
||||
HADOOP-12029. Remove chunkSize from ECSchema as its not required for coders
|
||||
(vinayakumarb)
|
||||
|
||||
HADOOP-11847. Enhance raw coder allowing to read least required inputs in decoding.
|
||||
(Kai Zheng)
|
@ -60,12 +60,13 @@ public void release() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Ensure output buffer filled with ZERO bytes fully in chunkSize.
|
||||
* @param buffer a buffer ready to write chunk size bytes
|
||||
* Ensure a buffer filled with ZERO bytes from current readable/writable
|
||||
* position.
|
||||
* @param buffer a buffer ready to read / write certain size bytes
|
||||
* @return the buffer itself, with ZERO bytes written, the position and limit
|
||||
* are not changed after the call
|
||||
*/
|
||||
protected ByteBuffer resetOutputBuffer(ByteBuffer buffer) {
|
||||
protected ByteBuffer resetBuffer(ByteBuffer buffer) {
|
||||
int pos = buffer.position();
|
||||
for (int i = pos; i < buffer.limit(); ++i) {
|
||||
buffer.put((byte) 0);
|
||||
@ -77,7 +78,7 @@ protected ByteBuffer resetOutputBuffer(ByteBuffer buffer) {
|
||||
|
||||
/**
|
||||
* Ensure the buffer (either input or output) ready to read or write with ZERO
|
||||
* bytes fully in chunkSize.
|
||||
* bytes fully in specified length of len.
|
||||
* @param buffer bytes array buffer
|
||||
* @return the buffer itself
|
||||
*/
|
||||
@ -92,11 +93,16 @@ protected byte[] resetBuffer(byte[] buffer, int offset, int len) {
|
||||
/**
|
||||
* Check and ensure the buffers are of the length specified by dataLen.
|
||||
* @param buffers
|
||||
* @param allowNull
|
||||
* @param dataLen
|
||||
*/
|
||||
protected void ensureLength(ByteBuffer[] buffers, int dataLen) {
|
||||
protected void ensureLength(ByteBuffer[] buffers,
|
||||
boolean allowNull, int dataLen) {
|
||||
for (int i = 0; i < buffers.length; ++i) {
|
||||
if (buffers[i].remaining() != dataLen) {
|
||||
if (buffers[i] == null && !allowNull) {
|
||||
throw new HadoopIllegalArgumentException(
|
||||
"Invalid buffer found, not allowing null");
|
||||
} else if (buffers[i] != null && buffers[i].remaining() != dataLen) {
|
||||
throw new HadoopIllegalArgumentException(
|
||||
"Invalid buffer, not of length " + dataLen);
|
||||
}
|
||||
@ -106,11 +112,16 @@ protected void ensureLength(ByteBuffer[] buffers, int dataLen) {
|
||||
/**
|
||||
* Check and ensure the buffers are of the length specified by dataLen.
|
||||
* @param buffers
|
||||
* @param allowNull
|
||||
* @param dataLen
|
||||
*/
|
||||
protected void ensureLength(byte[][] buffers, int dataLen) {
|
||||
protected void ensureLength(byte[][] buffers,
|
||||
boolean allowNull, int dataLen) {
|
||||
for (int i = 0; i < buffers.length; ++i) {
|
||||
if (buffers[i].length != dataLen) {
|
||||
if (buffers[i] == null && !allowNull) {
|
||||
throw new HadoopIllegalArgumentException(
|
||||
"Invalid buffer found, not allowing null");
|
||||
} else if (buffers[i] != null && buffers[i].length != dataLen) {
|
||||
throw new HadoopIllegalArgumentException(
|
||||
"Invalid buffer not of length " + dataLen);
|
||||
}
|
||||
|
@ -21,6 +21,7 @@
|
||||
import org.apache.hadoop.io.erasurecode.ECChunk;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
|
||||
/**
|
||||
* An abstract raw erasure decoder that's to be inherited by new decoders.
|
||||
@ -38,14 +39,16 @@ public AbstractRawErasureDecoder(int numDataUnits, int numParityUnits) {
|
||||
public void decode(ByteBuffer[] inputs, int[] erasedIndexes,
|
||||
ByteBuffer[] outputs) {
|
||||
checkParameters(inputs, erasedIndexes, outputs);
|
||||
int dataLen = inputs[0].remaining();
|
||||
|
||||
ByteBuffer validInput = findFirstValidInput(inputs);
|
||||
int dataLen = validInput.remaining();
|
||||
if (dataLen == 0) {
|
||||
return;
|
||||
}
|
||||
ensureLength(inputs, dataLen);
|
||||
ensureLength(outputs, dataLen);
|
||||
ensureLength(inputs, true, dataLen);
|
||||
ensureLength(outputs, false, dataLen);
|
||||
|
||||
boolean usingDirectBuffer = inputs[0].isDirect();
|
||||
boolean usingDirectBuffer = validInput.isDirect();
|
||||
if (usingDirectBuffer) {
|
||||
doDecode(inputs, erasedIndexes, outputs);
|
||||
return;
|
||||
@ -59,9 +62,11 @@ public void decode(ByteBuffer[] inputs, int[] erasedIndexes,
|
||||
ByteBuffer buffer;
|
||||
for (int i = 0; i < inputs.length; ++i) {
|
||||
buffer = inputs[i];
|
||||
if (buffer != null) {
|
||||
inputOffsets[i] = buffer.position();
|
||||
newInputs[i] = buffer.array();
|
||||
}
|
||||
}
|
||||
|
||||
for (int i = 0; i < outputs.length; ++i) {
|
||||
buffer = outputs[i];
|
||||
@ -74,7 +79,10 @@ public void decode(ByteBuffer[] inputs, int[] erasedIndexes,
|
||||
|
||||
for (int i = 0; i < inputs.length; ++i) {
|
||||
buffer = inputs[i];
|
||||
buffer.position(inputOffsets[i] + dataLen); // dataLen bytes consumed
|
||||
if (buffer != null) {
|
||||
// dataLen bytes consumed
|
||||
buffer.position(inputOffsets[i] + dataLen);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -90,12 +98,14 @@ protected abstract void doDecode(ByteBuffer[] inputs, int[] erasedIndexes,
|
||||
@Override
|
||||
public void decode(byte[][] inputs, int[] erasedIndexes, byte[][] outputs) {
|
||||
checkParameters(inputs, erasedIndexes, outputs);
|
||||
int dataLen = inputs[0].length;
|
||||
|
||||
byte[] validInput = findFirstValidInput(inputs);
|
||||
int dataLen = validInput.length;
|
||||
if (dataLen == 0) {
|
||||
return;
|
||||
}
|
||||
ensureLength(inputs, dataLen);
|
||||
ensureLength(outputs, dataLen);
|
||||
ensureLength(inputs, true, dataLen);
|
||||
ensureLength(outputs, false, dataLen);
|
||||
|
||||
int[] inputOffsets = new int[inputs.length]; // ALL ZERO
|
||||
int[] outputOffsets = new int[outputs.length]; // ALL ZERO
|
||||
@ -148,5 +158,50 @@ protected void checkParameters(Object[] inputs, int[] erasedIndexes,
|
||||
throw new HadoopIllegalArgumentException(
|
||||
"Too many erased, not recoverable");
|
||||
}
|
||||
|
||||
int validInputs = 0;
|
||||
for (int i = 0; i < inputs.length; ++i) {
|
||||
if (inputs[i] != null) {
|
||||
validInputs += 1;
|
||||
}
|
||||
}
|
||||
|
||||
if (validInputs < getNumDataUnits()) {
|
||||
throw new HadoopIllegalArgumentException(
|
||||
"No enough valid inputs are provided, not recoverable");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get indexes into inputs array for items marked as null, either erased or
|
||||
* not to read.
|
||||
* @return indexes into inputs array
|
||||
*/
|
||||
protected int[] getErasedOrNotToReadIndexes(Object[] inputs) {
|
||||
int[] invalidIndexes = new int[inputs.length];
|
||||
int idx = 0;
|
||||
for (int i = 0; i < inputs.length; i++) {
|
||||
if (inputs[i] == null) {
|
||||
invalidIndexes[idx++] = i;
|
||||
}
|
||||
}
|
||||
|
||||
return Arrays.copyOf(invalidIndexes, idx);
|
||||
}
|
||||
|
||||
/**
|
||||
* Find the valid input from all the inputs.
|
||||
* @param inputs
|
||||
* @return the first valid input
|
||||
*/
|
||||
protected static <T> T findFirstValidInput(T[] inputs) {
|
||||
for (int i = 0; i < inputs.length; i++) {
|
||||
if (inputs[i] != null) {
|
||||
return inputs[i];
|
||||
}
|
||||
}
|
||||
|
||||
throw new HadoopIllegalArgumentException(
|
||||
"Invalid inputs are found, all being null");
|
||||
}
|
||||
}
|
||||
|
@ -41,8 +41,8 @@ public void encode(ByteBuffer[] inputs, ByteBuffer[] outputs) {
|
||||
if (dataLen == 0) {
|
||||
return;
|
||||
}
|
||||
ensureLength(inputs, dataLen);
|
||||
ensureLength(outputs, dataLen);
|
||||
ensureLength(inputs, false, dataLen);
|
||||
ensureLength(outputs, false, dataLen);
|
||||
|
||||
boolean usingDirectBuffer = inputs[0].isDirect();
|
||||
if (usingDirectBuffer) {
|
||||
@ -90,8 +90,8 @@ public void encode(byte[][] inputs, byte[][] outputs) {
|
||||
if (dataLen == 0) {
|
||||
return;
|
||||
}
|
||||
ensureLength(inputs, dataLen);
|
||||
ensureLength(outputs, dataLen);
|
||||
ensureLength(inputs, false, dataLen);
|
||||
ensureLength(outputs, false, dataLen);
|
||||
|
||||
int[] inputOffsets = new int[inputs.length]; // ALL ZERO
|
||||
int[] outputOffsets = new int[outputs.length]; // ALL ZERO
|
||||
|
@ -17,6 +17,7 @@
|
||||
*/
|
||||
package org.apache.hadoop.io.erasurecode.rawcoder;
|
||||
|
||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||
import org.apache.hadoop.io.erasurecode.rawcoder.util.RSUtil;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
@ -25,35 +26,64 @@
|
||||
* A raw erasure decoder in RS code scheme in pure Java in case native one
|
||||
* isn't available in some environment. Please always use native implementations
|
||||
* when possible.
|
||||
*
|
||||
* Currently this implementation will compute and decode not to read units
|
||||
* unnecessarily due to the underlying implementation limit in GF. This will be
|
||||
* addressed in HADOOP-11871.
|
||||
*/
|
||||
public class RSRawDecoder extends AbstractRawErasureDecoder {
|
||||
// To describe and calculate the needed Vandermonde matrix
|
||||
private int[] errSignature;
|
||||
private int[] primitivePower;
|
||||
|
||||
/**
|
||||
* We need a set of reusable buffers either for the bytes array
|
||||
* decoding version or direct buffer decoding version. Normally not both.
|
||||
*
|
||||
* For output, in addition to the valid buffers from the caller
|
||||
* passed from above, we need to provide extra buffers for the internal
|
||||
* decoding implementation. For output, the caller should provide no more
|
||||
* than numParityUnits but at least one buffers. And the left buffers will be
|
||||
* borrowed from either bytesArrayBuffers, for the bytes array version.
|
||||
*
|
||||
*/
|
||||
// Reused buffers for decoding with bytes arrays
|
||||
private byte[][] bytesArrayBuffers = new byte[getNumParityUnits()][];
|
||||
private byte[][] adjustedByteArrayOutputsParameter =
|
||||
new byte[getNumParityUnits()][];
|
||||
private int[] adjustedOutputOffsets = new int[getNumParityUnits()];
|
||||
|
||||
// Reused buffers for decoding with direct ByteBuffers
|
||||
private ByteBuffer[] directBuffers = new ByteBuffer[getNumParityUnits()];
|
||||
private ByteBuffer[] adjustedDirectBufferOutputsParameter =
|
||||
new ByteBuffer[getNumParityUnits()];
|
||||
|
||||
public RSRawDecoder(int numDataUnits, int numParityUnits) {
|
||||
super(numDataUnits, numParityUnits);
|
||||
assert (getNumDataUnits() + getNumParityUnits() < RSUtil.GF.getFieldSize());
|
||||
if (numDataUnits + numParityUnits >= RSUtil.GF.getFieldSize()) {
|
||||
throw new HadoopIllegalArgumentException(
|
||||
"Invalid numDataUnits and numParityUnits");
|
||||
}
|
||||
|
||||
this.errSignature = new int[numParityUnits];
|
||||
this.primitivePower = RSUtil.getPrimitivePower(numDataUnits,
|
||||
numParityUnits);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doDecode(ByteBuffer[] inputs, int[] erasedIndexes,
|
||||
private void doDecodeImpl(ByteBuffer[] inputs, int[] erasedIndexes,
|
||||
ByteBuffer[] outputs) {
|
||||
ByteBuffer valid = findFirstValidInput(inputs);
|
||||
int dataLen = valid.remaining();
|
||||
for (int i = 0; i < erasedIndexes.length; i++) {
|
||||
errSignature[i] = primitivePower[erasedIndexes[i]];
|
||||
RSUtil.GF.substitute(inputs, outputs[i], primitivePower[i]);
|
||||
RSUtil.GF.substitute(inputs, dataLen, outputs[i], primitivePower[i]);
|
||||
}
|
||||
|
||||
RSUtil.GF.solveVandermondeSystem(errSignature,
|
||||
outputs, erasedIndexes.length);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doDecode(byte[][] inputs, int[] inputOffsets,
|
||||
private void doDecodeImpl(byte[][] inputs, int[] inputOffsets,
|
||||
int dataLen, int[] erasedIndexes,
|
||||
byte[][] outputs, int[] outputOffsets) {
|
||||
for (int i = 0; i < erasedIndexes.length; i++) {
|
||||
@ -65,4 +95,122 @@ protected void doDecode(byte[][] inputs, int[] inputOffsets,
|
||||
RSUtil.GF.solveVandermondeSystem(errSignature, outputs, outputOffsets,
|
||||
erasedIndexes.length, dataLen);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doDecode(byte[][] inputs, int[] inputOffsets,
|
||||
int dataLen, int[] erasedIndexes,
|
||||
byte[][] outputs, int[] outputOffsets) {
|
||||
/**
|
||||
* As passed parameters are friendly to callers but not to the underlying
|
||||
* implementations, so we have to adjust them before calling doDecodeImpl.
|
||||
*/
|
||||
|
||||
int[] erasedOrNotToReadIndexes = getErasedOrNotToReadIndexes(inputs);
|
||||
|
||||
// Prepare for adjustedOutputsParameter
|
||||
|
||||
// First reset the positions needed this time
|
||||
for (int i = 0; i < erasedOrNotToReadIndexes.length; i++) {
|
||||
adjustedByteArrayOutputsParameter[i] = null;
|
||||
adjustedOutputOffsets[i] = 0;
|
||||
}
|
||||
// Use the caller passed buffers in erasedIndexes positions
|
||||
for (int outputIdx = 0, i = 0; i < erasedIndexes.length; i++) {
|
||||
boolean found = false;
|
||||
for (int j = 0; j < erasedOrNotToReadIndexes.length; j++) {
|
||||
// If this index is one requested by the caller via erasedIndexes, then
|
||||
// we use the passed output buffer to avoid copying data thereafter.
|
||||
if (erasedIndexes[i] == erasedOrNotToReadIndexes[j]) {
|
||||
found = true;
|
||||
adjustedByteArrayOutputsParameter[j] = resetBuffer(
|
||||
outputs[outputIdx], outputOffsets[outputIdx], dataLen);
|
||||
adjustedOutputOffsets[j] = outputOffsets[outputIdx];
|
||||
outputIdx++;
|
||||
}
|
||||
}
|
||||
if (!found) {
|
||||
throw new HadoopIllegalArgumentException(
|
||||
"Inputs not fully corresponding to erasedIndexes in null places");
|
||||
}
|
||||
}
|
||||
// Use shared buffers for other positions (not set yet)
|
||||
for (int bufferIdx = 0, i = 0; i < erasedOrNotToReadIndexes.length; i++) {
|
||||
if (adjustedByteArrayOutputsParameter[i] == null) {
|
||||
adjustedByteArrayOutputsParameter[i] = resetBuffer(
|
||||
checkGetBytesArrayBuffer(bufferIdx, dataLen), 0, dataLen);
|
||||
adjustedOutputOffsets[i] = 0; // Always 0 for such temp output
|
||||
bufferIdx++;
|
||||
}
|
||||
}
|
||||
|
||||
doDecodeImpl(inputs, inputOffsets, dataLen, erasedOrNotToReadIndexes,
|
||||
adjustedByteArrayOutputsParameter, adjustedOutputOffsets);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doDecode(ByteBuffer[] inputs, int[] erasedIndexes,
|
||||
ByteBuffer[] outputs) {
|
||||
ByteBuffer validInput = findFirstValidInput(inputs);
|
||||
int dataLen = validInput.remaining();
|
||||
|
||||
/**
|
||||
* As passed parameters are friendly to callers but not to the underlying
|
||||
* implementations, so we have to adjust them before calling doDecodeImpl.
|
||||
*/
|
||||
|
||||
int[] erasedOrNotToReadIndexes = getErasedOrNotToReadIndexes(inputs);
|
||||
|
||||
// Prepare for adjustedDirectBufferOutputsParameter
|
||||
|
||||
// First reset the positions needed this time
|
||||
for (int i = 0; i < erasedOrNotToReadIndexes.length; i++) {
|
||||
adjustedDirectBufferOutputsParameter[i] = null;
|
||||
}
|
||||
// Use the caller passed buffers in erasedIndexes positions
|
||||
for (int outputIdx = 0, i = 0; i < erasedIndexes.length; i++) {
|
||||
boolean found = false;
|
||||
for (int j = 0; j < erasedOrNotToReadIndexes.length; j++) {
|
||||
// If this index is one requested by the caller via erasedIndexes, then
|
||||
// we use the passed output buffer to avoid copying data thereafter.
|
||||
if (erasedIndexes[i] == erasedOrNotToReadIndexes[j]) {
|
||||
found = true;
|
||||
adjustedDirectBufferOutputsParameter[j] =
|
||||
resetBuffer(outputs[outputIdx++]);
|
||||
}
|
||||
}
|
||||
if (!found) {
|
||||
throw new HadoopIllegalArgumentException(
|
||||
"Inputs not fully corresponding to erasedIndexes in null places");
|
||||
}
|
||||
}
|
||||
// Use shared buffers for other positions (not set yet)
|
||||
for (int bufferIdx = 0, i = 0; i < erasedOrNotToReadIndexes.length; i++) {
|
||||
if (adjustedDirectBufferOutputsParameter[i] == null) {
|
||||
ByteBuffer buffer = checkGetDirectBuffer(bufferIdx, dataLen);
|
||||
buffer.position(0);
|
||||
buffer.limit(dataLen);
|
||||
adjustedDirectBufferOutputsParameter[i] = resetBuffer(buffer);
|
||||
bufferIdx++;
|
||||
}
|
||||
}
|
||||
|
||||
doDecodeImpl(inputs, erasedOrNotToReadIndexes,
|
||||
adjustedDirectBufferOutputsParameter);
|
||||
}
|
||||
|
||||
private byte[] checkGetBytesArrayBuffer(int idx, int bufferLen) {
|
||||
if (bytesArrayBuffers[idx] == null ||
|
||||
bytesArrayBuffers[idx].length < bufferLen) {
|
||||
bytesArrayBuffers[idx] = new byte[bufferLen];
|
||||
}
|
||||
return bytesArrayBuffers[idx];
|
||||
}
|
||||
|
||||
private ByteBuffer checkGetDirectBuffer(int idx, int bufferLen) {
|
||||
if (directBuffers[idx] == null ||
|
||||
directBuffers[idx].capacity() < bufferLen) {
|
||||
directBuffers[idx] = ByteBuffer.allocateDirect(bufferLen);
|
||||
}
|
||||
return directBuffers[idx];
|
||||
}
|
||||
}
|
||||
|
@ -32,6 +32,22 @@ public interface RawErasureDecoder extends RawErasureCoder {
|
||||
|
||||
/**
|
||||
* Decode with inputs and erasedIndexes, generates outputs.
|
||||
* How to prepare for inputs:
|
||||
* 1. Create an array containing parity units + data units;
|
||||
* 2. Set null in the array locations specified via erasedIndexes to indicate
|
||||
* they're erased and no data are to read from;
|
||||
* 3. Set null in the array locations for extra redundant items, as they're
|
||||
* not necessary to read when decoding. For example in RS-6-3, if only 1
|
||||
* unit is really erased, then we have 2 extra items as redundant. They can
|
||||
* be set as null to indicate no data will be used from them.
|
||||
*
|
||||
* For an example using RS (6, 3), assuming sources (d0, d1, d2, d3, d4, d5)
|
||||
* and parities (p0, p1, p2), d2 being erased. We can and may want to use only
|
||||
* 6 units like (d1, d3, d4, d5, p0, p2) to recover d2. We will have:
|
||||
* inputs = [p0, null(p1), p2, null(d0), d1, null(d2), d3, d4, d5]
|
||||
* erasedIndexes = [5] // index of d2 into inputs array
|
||||
* outputs = [a-writable-buffer]
|
||||
*
|
||||
* @param inputs inputs to read data from
|
||||
* @param erasedIndexes indexes of erased units in the inputs array
|
||||
* @param outputs outputs to write into for data generated according to
|
||||
@ -41,7 +57,7 @@ public void decode(ByteBuffer[] inputs, int[] erasedIndexes,
|
||||
ByteBuffer[] outputs);
|
||||
|
||||
/**
|
||||
* Decode with inputs and erasedIndexes, generates outputs.
|
||||
* Decode with inputs and erasedIndexes, generates outputs. More see above.
|
||||
* @param inputs inputs to read data from
|
||||
* @param erasedIndexes indexes of erased units in the inputs array
|
||||
* @param outputs outputs to write into for data generated according to
|
||||
@ -50,7 +66,7 @@ public void decode(ByteBuffer[] inputs, int[] erasedIndexes,
|
||||
public void decode(byte[][] inputs, int[] erasedIndexes, byte[][] outputs);
|
||||
|
||||
/**
|
||||
* Decode with inputs and erasedIndexes, generates outputs.
|
||||
* Decode with inputs and erasedIndexes, generates outputs. More see above.
|
||||
* @param inputs inputs to read data from
|
||||
* @param erasedIndexes indexes of erased units in the inputs array
|
||||
* @param outputs outputs to write into for data generated according to
|
||||
|
@ -36,7 +36,7 @@ public XORRawDecoder(int numDataUnits, int numParityUnits) {
|
||||
protected void doDecode(ByteBuffer[] inputs, int[] erasedIndexes,
|
||||
ByteBuffer[] outputs) {
|
||||
ByteBuffer output = outputs[0];
|
||||
resetOutputBuffer(output);
|
||||
resetBuffer(output);
|
||||
|
||||
int erasedIdx = erasedIndexes[0];
|
||||
|
||||
|
@ -34,7 +34,7 @@ public XORRawEncoder(int numDataUnits, int numParityUnits) {
|
||||
|
||||
protected void doEncode(ByteBuffer[] inputs, ByteBuffer[] outputs) {
|
||||
ByteBuffer output = outputs[0];
|
||||
resetOutputBuffer(output);
|
||||
resetBuffer(output);
|
||||
|
||||
// Get the first buffer's data.
|
||||
int iIdx, oIdx;
|
||||
|
@ -423,7 +423,7 @@ public void substitute(byte[][] p, int[] offsets,
|
||||
byte[] pi = p[i];
|
||||
for (iIdx = offsets[i], oIdx = offset;
|
||||
iIdx < offsets[i] + len; iIdx++, oIdx++) {
|
||||
int pij = pi[iIdx] & 0x000000FF;
|
||||
int pij = pi != null ? pi[iIdx] & 0x000000FF : 0;
|
||||
q[oIdx] = (byte) (q[oIdx] ^ mulTable[pij][y]);
|
||||
}
|
||||
y = mulTable[x][y];
|
||||
@ -438,13 +438,15 @@ public void substitute(byte[][] p, int[] offsets,
|
||||
* @param q store the return result
|
||||
* @param x input field
|
||||
*/
|
||||
public void substitute(ByteBuffer[] p, ByteBuffer q, int x) {
|
||||
public void substitute(ByteBuffer[] p, int len, ByteBuffer q, int x) {
|
||||
int y = 1, iIdx, oIdx;
|
||||
for (int i = 0; i < p.length; i++) {
|
||||
ByteBuffer pi = p[i];
|
||||
for (iIdx = pi.position(), oIdx = q.position();
|
||||
iIdx < pi.limit(); iIdx++, oIdx++) {
|
||||
int pij = pi.get(iIdx) & 0x000000FF;
|
||||
int pos = pi != null ? pi.position() : 0;
|
||||
int limit = pi != null ? pi.limit() : len;
|
||||
for (oIdx = q.position(), iIdx = pos;
|
||||
iIdx < limit; iIdx++, oIdx++) {
|
||||
int pij = pi != null ? pi.get(iIdx) & 0x000000FF : 0;
|
||||
q.put(oIdx, (byte) (q.get(oIdx) ^ mulTable[pij][y]));
|
||||
}
|
||||
y = mulTable[x][y];
|
||||
|
@ -35,7 +35,7 @@ public abstract class TestCoderBase {
|
||||
private Configuration conf;
|
||||
protected int numDataUnits;
|
||||
protected int numParityUnits;
|
||||
protected int baseChunkSize = 16 * 1024;
|
||||
protected int baseChunkSize = 513;
|
||||
private int chunkSize = baseChunkSize;
|
||||
|
||||
private byte[] zeroChunkBytes;
|
||||
@ -186,8 +186,9 @@ protected ECChunk[] prepareInputChunksForDecoding(ECChunk[] dataChunks,
|
||||
}
|
||||
|
||||
/**
|
||||
* Erase chunks to test the recovering of them. Before erasure clone them
|
||||
* first so could return them.
|
||||
* Erase some data chunks to test the recovering of them. As they're erased,
|
||||
* we don't need to read them and will not have the buffers at all, so just
|
||||
* set them as null.
|
||||
* @param dataChunks
|
||||
* @param parityChunks
|
||||
* @return clone of erased chunks
|
||||
@ -198,50 +199,30 @@ protected ECChunk[] backupAndEraseChunks(ECChunk[] dataChunks,
|
||||
erasedDataIndexes.length];
|
||||
|
||||
int idx = 0;
|
||||
ECChunk chunk;
|
||||
|
||||
for (int i = 0; i < erasedParityIndexes.length; i++) {
|
||||
chunk = parityChunks[erasedParityIndexes[i]];
|
||||
toEraseChunks[idx ++] = cloneChunkWithData(chunk);
|
||||
eraseDataFromChunk(chunk);
|
||||
toEraseChunks[idx ++] = parityChunks[erasedParityIndexes[i]];
|
||||
parityChunks[erasedParityIndexes[i]] = null;
|
||||
}
|
||||
|
||||
for (int i = 0; i < erasedDataIndexes.length; i++) {
|
||||
chunk = dataChunks[erasedDataIndexes[i]];
|
||||
toEraseChunks[idx ++] = cloneChunkWithData(chunk);
|
||||
eraseDataFromChunk(chunk);
|
||||
toEraseChunks[idx ++] = dataChunks[erasedDataIndexes[i]];
|
||||
dataChunks[erasedDataIndexes[i]] = null;
|
||||
}
|
||||
|
||||
return toEraseChunks;
|
||||
}
|
||||
|
||||
/**
|
||||
* Erase data from the specified chunks, putting ZERO bytes to the buffers.
|
||||
* Erase data from the specified chunks, just setting them as null.
|
||||
* @param chunks
|
||||
*/
|
||||
protected void eraseDataFromChunks(ECChunk[] chunks) {
|
||||
for (int i = 0; i < chunks.length; i++) {
|
||||
eraseDataFromChunk(chunks[i]);
|
||||
chunks[i] = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Erase data from the specified chunk, putting ZERO bytes to the buffer.
|
||||
* @param chunk with a buffer ready to read at the current position
|
||||
*/
|
||||
protected void eraseDataFromChunk(ECChunk chunk) {
|
||||
ByteBuffer chunkBuffer = chunk.getBuffer();
|
||||
// Erase the data at the position, and restore the buffer ready for reading
|
||||
// same many bytes but all ZERO.
|
||||
int pos = chunkBuffer.position();
|
||||
int len = chunkBuffer.remaining();
|
||||
chunkBuffer.put(zeroChunkBytes, 0, len);
|
||||
// Back to readable again after data erased
|
||||
chunkBuffer.flip();
|
||||
chunkBuffer.position(pos);
|
||||
chunkBuffer.limit(pos + len);
|
||||
}
|
||||
|
||||
/**
|
||||
* Clone chunks along with copying the associated data. It respects how the
|
||||
* chunk buffer is allocated, direct or non-direct. It avoids affecting the
|
||||
|
@ -232,7 +232,6 @@ protected TestBlock[] backupAndEraseBlocks(TestBlock[] dataBlocks,
|
||||
TestBlock[] parityBlocks) {
|
||||
TestBlock[] toEraseBlocks = new TestBlock[erasedDataIndexes.length +
|
||||
erasedParityIndexes.length];
|
||||
|
||||
int idx = 0;
|
||||
TestBlock block;
|
||||
|
||||
|
@ -32,89 +32,86 @@ public void setup() {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCodingNoDirectBuffer_10x4_erasing_d0_p0() {
|
||||
public void testCoding_6x3_erasing_all_d() {
|
||||
prepare(null, 6, 3, new int[]{0, 1, 2}, new int[0], true);
|
||||
testCodingDoMixAndTwice();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCoding_6x3_erasing_d0_d2() {
|
||||
prepare(null, 6, 3, new int[] {0, 2}, new int[]{});
|
||||
testCodingDoMixAndTwice();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCoding_6x3_erasing_d0() {
|
||||
prepare(null, 6, 3, new int[]{0}, new int[0]);
|
||||
testCodingDoMixAndTwice();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCoding_6x3_erasing_d2() {
|
||||
prepare(null, 6, 3, new int[]{2}, new int[]{});
|
||||
testCodingDoMixAndTwice();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCoding_6x3_erasing_d0_p0() {
|
||||
prepare(null, 6, 3, new int[]{0}, new int[]{0});
|
||||
testCodingDoMixAndTwice();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCoding_6x3_erasing_all_p() {
|
||||
prepare(null, 6, 3, new int[0], new int[]{0, 1, 2});
|
||||
testCodingDoMixAndTwice();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCoding_6x3_erasing_p0() {
|
||||
prepare(null, 6, 3, new int[0], new int[]{0});
|
||||
testCodingDoMixAndTwice();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCoding_6x3_erasing_p2() {
|
||||
prepare(null, 6, 3, new int[0], new int[]{2});
|
||||
testCodingDoMixAndTwice();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCoding_6x3_erasure_p0_p2() {
|
||||
prepare(null, 6, 3, new int[0], new int[]{0, 2});
|
||||
testCodingDoMixAndTwice();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCoding_6x3_erasing_d0_p0_p1() {
|
||||
prepare(null, 6, 3, new int[]{0}, new int[]{0, 1});
|
||||
testCodingDoMixAndTwice();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCoding_6x3_erasing_d0_d2_p2() {
|
||||
prepare(null, 6, 3, new int[]{0, 2}, new int[]{2});
|
||||
testCodingDoMixAndTwice();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCodingNegative_6x3_erasing_d2_d4() {
|
||||
prepare(null, 6, 3, new int[]{2, 4}, new int[0]);
|
||||
testCodingDoMixAndTwice();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCodingNegative_6x3_erasing_too_many() {
|
||||
prepare(null, 6, 3, new int[]{2, 4}, new int[]{0, 1});
|
||||
testCodingWithErasingTooMany();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCoding_10x4_erasing_d0_p0() {
|
||||
prepare(null, 10, 4, new int[] {0}, new int[] {0});
|
||||
/**
|
||||
* Doing twice to test if the coders can be repeatedly reused. This matters
|
||||
* as the underlying coding buffers are shared, which may have bugs.
|
||||
*/
|
||||
testCoding(false);
|
||||
testCoding(false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCodingDirectBuffer_10x4_erasing_p1() {
|
||||
prepare(null, 10, 4, new int[0], new int[] {1});
|
||||
testCoding(true);
|
||||
testCoding(true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCodingDirectBuffer_10x4_erasing_d2() {
|
||||
prepare(null, 10, 4, new int[] {2}, new int[] {});
|
||||
testCoding(true);
|
||||
testCoding(true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCodingDirectBuffer_10x4_erasing_d0_p0() {
|
||||
prepare(null, 10, 4, new int[] {0}, new int[] {0});
|
||||
testCoding(true);
|
||||
testCoding(true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCodingBothBuffers_10x4_erasing_d0_p0() {
|
||||
prepare(null, 10, 4, new int[] {0}, new int[] {0});
|
||||
|
||||
/**
|
||||
* Doing in mixed buffer usage model to test if the coders can be repeatedly
|
||||
* reused with different buffer usage model. This matters as the underlying
|
||||
* coding buffers are shared, which may have bugs.
|
||||
*/
|
||||
testCoding(true);
|
||||
testCoding(false);
|
||||
testCoding(true);
|
||||
testCoding(false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCodingDirectBuffer_10x4_erasure_of_d2_d4_p0() {
|
||||
prepare(null, 10, 4, new int[]{2, 4}, new int[]{0});
|
||||
testCoding(true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCodingDirectBuffer_usingFixedData_10x4_erasure_of_d2_d4_p0() {
|
||||
prepare(null, 10, 4, new int[] {2, 4}, new int[] {0}, true);
|
||||
testCoding(true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCodingDirectBuffer_10x4_erasing_d0_d1_p0_p1() {
|
||||
prepare(null, 10, 4, new int[] {0, 1}, new int[] {0, 1});
|
||||
testCoding(true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCodingNoDirectBuffer_3x3_erasing_d0_p0() {
|
||||
prepare(null, 3, 3, new int[] {0}, new int[] {0});
|
||||
testCoding(false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCodingDirectBuffer_3x3_erasing_d0_p0() {
|
||||
prepare(null, 3, 3, new int[] {0}, new int[] {0});
|
||||
testCoding(true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCodingNegative_10x4_erasing_d2_d4() {
|
||||
prepare(null, 10, 4, new int[]{2, 4}, new int[0]);
|
||||
|
||||
testCodingWithBadInput(true);
|
||||
testCodingWithBadOutput(false);
|
||||
testCodingWithBadInput(true);
|
||||
testCodingWithBadOutput(false);
|
||||
testCodingDoMixAndTwice();
|
||||
}
|
||||
}
|
||||
|
@ -20,6 +20,7 @@
|
||||
import org.apache.hadoop.io.erasurecode.ECChunk;
|
||||
import org.apache.hadoop.io.erasurecode.TestCoderBase;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.lang.reflect.Constructor;
|
||||
|
||||
@ -32,6 +33,25 @@ public abstract class TestRawCoderBase extends TestCoderBase {
|
||||
private RawErasureEncoder encoder;
|
||||
private RawErasureDecoder decoder;
|
||||
|
||||
/**
|
||||
* Doing twice to test if the coders can be repeatedly reused. This matters
|
||||
* as the underlying coding buffers are shared, which may have bugs.
|
||||
*/
|
||||
protected void testCodingDoMixAndTwice() {
|
||||
testCodingDoMixed();
|
||||
testCodingDoMixed();
|
||||
}
|
||||
|
||||
/**
|
||||
* Doing in mixed buffer usage model to test if the coders can be repeatedly
|
||||
* reused with different buffer usage model. This matters as the underlying
|
||||
* coding buffers are shared, which may have bugs.
|
||||
*/
|
||||
protected void testCodingDoMixed() {
|
||||
testCoding(true);
|
||||
testCoding(false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Generating source data, encoding, recovering and then verifying.
|
||||
* RawErasureCoder mainly uses ECChunk to pass input and output data buffers,
|
||||
@ -85,6 +105,23 @@ protected void testCodingWithBadOutput(boolean usingDirectBuffer) {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCodingWithErasingTooMany() {
|
||||
try {
|
||||
testCoding(true);
|
||||
Assert.fail("Decoding test erasing too many should fail");
|
||||
} catch (Exception e) {
|
||||
// Expected
|
||||
}
|
||||
|
||||
try {
|
||||
testCoding(false);
|
||||
Assert.fail("Decoding test erasing too many should fail");
|
||||
} catch (Exception e) {
|
||||
// Expected
|
||||
}
|
||||
}
|
||||
|
||||
private void performTestCoding(int chunkSize,
|
||||
boolean useBadInput, boolean useBadOutput) {
|
||||
setChunkSize(chunkSize);
|
||||
@ -110,6 +147,9 @@ private void performTestCoding(int chunkSize,
|
||||
ECChunk[] inputChunks = prepareInputChunksForDecoding(
|
||||
clonedDataChunks, parityChunks);
|
||||
|
||||
// Remove unnecessary chunks, allowing only least required chunks to be read.
|
||||
ensureOnlyLeastRequiredChunks(inputChunks);
|
||||
|
||||
ECChunk[] recoveredChunks = prepareOutputChunksForDecoding();
|
||||
if (useBadOutput) {
|
||||
corruptSomeChunk(recoveredChunks);
|
||||
@ -131,6 +171,20 @@ private void prepareCoders() {
|
||||
}
|
||||
}
|
||||
|
||||
private void ensureOnlyLeastRequiredChunks(ECChunk[] inputChunks) {
|
||||
int leastRequiredNum = numDataUnits;
|
||||
int erasedNum = erasedDataIndexes.length + erasedParityIndexes.length;
|
||||
int goodNum = inputChunks.length - erasedNum;
|
||||
int redundantNum = goodNum - leastRequiredNum;
|
||||
|
||||
for (int i = 0; i < inputChunks.length && redundantNum > 0; i++) {
|
||||
if (inputChunks[i] != null) {
|
||||
inputChunks[i] = null; // Setting it null, not needing it actually
|
||||
redundantNum--;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create the raw erasure encoder to test
|
||||
* @return
|
||||
|
@ -29,58 +29,35 @@ public class TestXORRawCoder extends TestRawCoderBase {
|
||||
public void setup() {
|
||||
this.encoderClass = XORRawEncoder.class;
|
||||
this.decoderClass = XORRawDecoder.class;
|
||||
|
||||
this.numDataUnits = 10;
|
||||
this.numParityUnits = 1;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCodingNoDirectBuffer_erasing_d0() {
|
||||
public void testCoding_10x1_erasing_d0() {
|
||||
prepare(null, 10, 1, new int[] {0}, new int[0]);
|
||||
|
||||
/**
|
||||
* Doing twice to test if the coders can be repeatedly reused. This matters
|
||||
* as the underlying coding buffers are shared, which may have bugs.
|
||||
*/
|
||||
testCoding(false);
|
||||
testCoding(false);
|
||||
testCodingDoMixAndTwice();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCodingDirectBuffer_erasing_p0() {
|
||||
public void testCoding_10x1_erasing_p0() {
|
||||
prepare(null, 10, 1, new int[0], new int[] {0});
|
||||
|
||||
testCoding(true);
|
||||
testCoding(true);
|
||||
testCodingDoMixAndTwice();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCodingDirectBuffer_erasing_d0() {
|
||||
prepare(null, 10, 1, new int[] {0}, new int[0]);
|
||||
|
||||
testCoding(true);
|
||||
testCoding(true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCodingBothBuffers_erasing_d5() {
|
||||
public void testCoding_10x1_erasing_d5() {
|
||||
prepare(null, 10, 1, new int[]{5}, new int[0]);
|
||||
|
||||
/**
|
||||
* Doing in mixed buffer usage model to test if the coders can be repeatedly
|
||||
* reused with different buffer usage model. This matters as the underlying
|
||||
* coding buffers are shared, which may have bugs.
|
||||
*/
|
||||
testCoding(true);
|
||||
testCoding(false);
|
||||
testCoding(true);
|
||||
testCoding(false);
|
||||
testCodingDoMixAndTwice();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCodingNegative_erasing_d5() {
|
||||
prepare(null, 10, 1, new int[]{5}, new int[0]);
|
||||
public void testCodingNegative_10x1_erasing_too_many() {
|
||||
prepare(null, 10, 1, new int[]{2}, new int[]{0});
|
||||
testCodingWithErasingTooMany();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCodingNegative_10x1_erasing_d5() {
|
||||
prepare(null, 10, 1, new int[]{5}, new int[0]);
|
||||
testCodingWithBadInput(true);
|
||||
testCodingWithBadOutput(false);
|
||||
testCodingWithBadInput(true);
|
||||
|
Loading…
Reference in New Issue
Block a user