HADOOP-11938. Enhance ByteBuffer version encode/decode API of raw erasure coder. Contributed by Kai Zheng.

This commit is contained in:
Zhe Zhang 2015-05-18 10:14:54 -07:00 committed by Zhe Zhang
parent b64f6745a4
commit 343c0e76fc
16 changed files with 538 additions and 215 deletions

View File

@ -51,3 +51,6 @@
HADOOP-11566. Add tests and fix for erasure coders to recover erased parity
units. (Kai Zheng via Zhe Zhang)
HADOOP-11938. Enhance ByteBuffer version encode/decode API of raw erasure
coder. (Kai Zheng via Zhe Zhang)

View File

@ -72,34 +72,15 @@ public static ByteBuffer[] toBuffers(ECChunk[] chunks) {
}
/**
* Convert an array of this chunks to an array of byte array.
* Note the chunk buffers are not affected.
* @param chunks
* @return an array of byte array
* Convert to a bytes array, just for test usage.
* @return bytes array
*/
public static byte[][] toArrays(ECChunk[] chunks) {
byte[][] bytesArr = new byte[chunks.length][];
ByteBuffer buffer;
ECChunk chunk;
for (int i = 0; i < chunks.length; i++) {
chunk = chunks[i];
if (chunk == null) {
bytesArr[i] = null;
continue;
}
buffer = chunk.getBuffer();
if (buffer.hasArray()) {
bytesArr[i] = buffer.array();
} else {
bytesArr[i] = new byte[buffer.remaining()];
// Avoid affecting the original one
buffer.mark();
buffer.get(bytesArr[i]);
buffer.reset();
}
}
public byte[] toBytesArray() {
byte[] bytesArr = new byte[chunkBuffer.remaining()];
// Avoid affecting the original one
chunkBuffer.mark();
chunkBuffer.get(bytesArr);
chunkBuffer.reset();
return bytesArr;
}

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.io.erasurecode.rawcoder;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configured;
import java.nio.ByteBuffer;
@ -30,9 +31,6 @@
public abstract class AbstractRawErasureCoder
extends Configured implements RawErasureCoder {
// Hope to reset coding buffers a little faster using it
private byte[] zeroChunkBytes;
private int numDataUnits;
private int numParityUnits;
private int chunkSize;
@ -43,8 +41,6 @@ public void initialize(int numDataUnits, int numParityUnits,
this.numDataUnits = numDataUnits;
this.numParityUnits = numParityUnits;
this.chunkSize = chunkSize;
zeroChunkBytes = new byte[chunkSize]; // With ZERO by default
}
@Override
@ -73,42 +69,17 @@ public void release() {
}
/**
* Convert an array of heap ByteBuffers to an array of byte array.
* @param buffers
* @return an array of byte array
* Ensure output buffer filled with ZERO bytes fully in chunkSize.
* @param buffer a buffer ready to write chunk size bytes
* @return the buffer itself, with ZERO bytes written, the position and limit
* are not changed after the call
*/
protected static byte[][] toArrays(ByteBuffer[] buffers) {
byte[][] bytesArr = new byte[buffers.length][];
ByteBuffer buffer;
for (int i = 0; i < buffers.length; i++) {
buffer = buffers[i];
if (buffer == null) {
bytesArr[i] = null;
continue;
}
if (buffer.hasArray()) {
bytesArr[i] = buffer.array();
} else {
throw new IllegalArgumentException("Invalid ByteBuffer passed, " +
"expecting heap buffer");
}
protected ByteBuffer resetOutputBuffer(ByteBuffer buffer) {
int pos = buffer.position();
for (int i = pos; i < buffer.limit(); ++i) {
buffer.put((byte) 0);
}
return bytesArr;
}
/**
* Ensure the buffer (either input or output) ready to read or write with ZERO
* bytes fully in chunkSize.
* @param buffer
* @return the buffer itself
*/
protected ByteBuffer resetBuffer(ByteBuffer buffer) {
buffer.clear();
buffer.put(zeroChunkBytes);
buffer.position(0);
buffer.position(pos);
return buffer;
}
@ -119,9 +90,39 @@ protected ByteBuffer resetBuffer(ByteBuffer buffer) {
* @param buffer bytes array buffer
* @return the buffer itself
*/
protected byte[] resetBuffer(byte[] buffer) {
System.arraycopy(zeroChunkBytes, 0, buffer, 0, buffer.length);
protected byte[] resetBuffer(byte[] buffer, int offset, int len) {
for (int i = offset; i < len; ++i) {
buffer[i] = (byte) 0;
}
return buffer;
}
/**
* Check and ensure the buffers are of the length specified by dataLen.
* @param buffers
* @param dataLen
*/
protected void ensureLength(ByteBuffer[] buffers, int dataLen) {
for (int i = 0; i < buffers.length; ++i) {
if (buffers[i].remaining() != dataLen) {
throw new HadoopIllegalArgumentException(
"Invalid buffer, not of length " + dataLen);
}
}
}
/**
* Check and ensure the buffers are of the length specified by dataLen.
* @param buffers
* @param dataLen
*/
protected void ensureLength(byte[][] buffers, int dataLen) {
for (int i = 0; i < buffers.length; ++i) {
if (buffers[i].length != dataLen) {
throw new HadoopIllegalArgumentException(
"Invalid buffer not of length " + dataLen);
}
}
}
}

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.io.erasurecode.rawcoder;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.io.erasurecode.ECChunk;
import java.nio.ByteBuffer;
@ -33,14 +34,43 @@ public abstract class AbstractRawErasureDecoder extends AbstractRawErasureCoder
public void decode(ByteBuffer[] inputs, int[] erasedIndexes,
ByteBuffer[] outputs) {
checkParameters(inputs, erasedIndexes, outputs);
int dataLen = inputs[0].remaining();
if (dataLen == 0) {
return;
}
ensureLength(inputs, dataLen);
ensureLength(outputs, dataLen);
boolean hasArray = inputs[0].hasArray();
if (hasArray) {
byte[][] newInputs = toArrays(inputs);
byte[][] newOutputs = toArrays(outputs);
doDecode(newInputs, erasedIndexes, newOutputs);
} else {
boolean usingDirectBuffer = inputs[0].isDirect();
if (usingDirectBuffer) {
doDecode(inputs, erasedIndexes, outputs);
return;
}
int[] inputOffsets = new int[inputs.length];
int[] outputOffsets = new int[outputs.length];
byte[][] newInputs = new byte[inputs.length][];
byte[][] newOutputs = new byte[outputs.length][];
ByteBuffer buffer;
for (int i = 0; i < inputs.length; ++i) {
buffer = inputs[i];
inputOffsets[i] = buffer.position();
newInputs[i] = buffer.array();
}
for (int i = 0; i < outputs.length; ++i) {
buffer = outputs[i];
outputOffsets[i] = buffer.position();
newOutputs[i] = buffer.array();
}
doDecode(newInputs, inputOffsets, dataLen,
erasedIndexes, newOutputs, outputOffsets);
for (int i = 0; i < inputs.length; ++i) {
buffer = inputs[i];
buffer.position(inputOffsets[i] + dataLen); // dataLen bytes consumed
}
}
@ -56,18 +86,33 @@ protected abstract void doDecode(ByteBuffer[] inputs, int[] erasedIndexes,
@Override
public void decode(byte[][] inputs, int[] erasedIndexes, byte[][] outputs) {
checkParameters(inputs, erasedIndexes, outputs);
int dataLen = inputs[0].length;
if (dataLen == 0) {
return;
}
ensureLength(inputs, dataLen);
ensureLength(outputs, dataLen);
doDecode(inputs, erasedIndexes, outputs);
int[] inputOffsets = new int[inputs.length]; // ALL ZERO
int[] outputOffsets = new int[outputs.length]; // ALL ZERO
doDecode(inputs, inputOffsets, dataLen, erasedIndexes, outputs,
outputOffsets);
}
/**
* Perform the real decoding using bytes array
* Perform the real decoding using bytes array, supporting offsets and
* lengths.
* @param inputs
* @param inputOffsets
* @param dataLen
* @param erasedIndexes
* @param outputs
* @param outputOffsets
*/
protected abstract void doDecode(byte[][] inputs, int[] erasedIndexes,
byte[][] outputs);
protected abstract void doDecode(byte[][] inputs, int[] inputOffsets,
int dataLen, int[] erasedIndexes,
byte[][] outputs, int[] outputOffsets);
@Override
public void decode(ECChunk[] inputs, int[] erasedIndexes,
@ -91,12 +136,12 @@ protected void checkParameters(Object[] inputs, int[] erasedIndexes,
}
if (erasedIndexes.length != outputs.length) {
throw new IllegalArgumentException(
throw new HadoopIllegalArgumentException(
"erasedIndexes and outputs mismatch in length");
}
if (erasedIndexes.length > getNumParityUnits()) {
throw new IllegalArgumentException(
throw new HadoopIllegalArgumentException(
"Too many erased, not recoverable");
}
}

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.io.erasurecode.rawcoder;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.io.erasurecode.ECChunk;
import java.nio.ByteBuffer;
@ -32,14 +33,42 @@ public abstract class AbstractRawErasureEncoder extends AbstractRawErasureCoder
@Override
public void encode(ByteBuffer[] inputs, ByteBuffer[] outputs) {
checkParameters(inputs, outputs);
int dataLen = inputs[0].remaining();
if (dataLen == 0) {
return;
}
ensureLength(inputs, dataLen);
ensureLength(outputs, dataLen);
boolean hasArray = inputs[0].hasArray();
if (hasArray) {
byte[][] newInputs = toArrays(inputs);
byte[][] newOutputs = toArrays(outputs);
doEncode(newInputs, newOutputs);
} else {
boolean usingDirectBuffer = inputs[0].isDirect();
if (usingDirectBuffer) {
doEncode(inputs, outputs);
return;
}
int[] inputOffsets = new int[inputs.length];
int[] outputOffsets = new int[outputs.length];
byte[][] newInputs = new byte[inputs.length][];
byte[][] newOutputs = new byte[outputs.length][];
ByteBuffer buffer;
for (int i = 0; i < inputs.length; ++i) {
buffer = inputs[i];
inputOffsets[i] = buffer.position();
newInputs[i] = buffer.array();
}
for (int i = 0; i < outputs.length; ++i) {
buffer = outputs[i];
outputOffsets[i] = buffer.position();
newOutputs[i] = buffer.array();
}
doEncode(newInputs, inputOffsets, dataLen, newOutputs, outputOffsets);
for (int i = 0; i < inputs.length; ++i) {
buffer = inputs[i];
buffer.position(buffer.position() + dataLen); // dataLen bytes consumed
}
}
@ -53,16 +82,31 @@ public void encode(ByteBuffer[] inputs, ByteBuffer[] outputs) {
@Override
public void encode(byte[][] inputs, byte[][] outputs) {
checkParameters(inputs, outputs);
int dataLen = inputs[0].length;
if (dataLen == 0) {
return;
}
ensureLength(inputs, dataLen);
ensureLength(outputs, dataLen);
doEncode(inputs, outputs);
int[] inputOffsets = new int[inputs.length]; // ALL ZERO
int[] outputOffsets = new int[outputs.length]; // ALL ZERO
doEncode(inputs, inputOffsets, dataLen, outputs, outputOffsets);
}
/**
* Perform the real encoding work using bytes array
* Perform the real encoding work using bytes array, supporting offsets
* and lengths.
* @param inputs
* @param inputOffsets
* @param dataLen
* @param outputs
* @param outputOffsets
*/
protected abstract void doEncode(byte[][] inputs, byte[][] outputs);
protected abstract void doEncode(byte[][] inputs, int[] inputOffsets,
int dataLen, byte[][] outputs,
int[] outputOffsets);
@Override
public void encode(ECChunk[] inputs, ECChunk[] outputs) {
@ -78,10 +122,10 @@ public void encode(ECChunk[] inputs, ECChunk[] outputs) {
*/
protected void checkParameters(Object[] inputs, Object[] outputs) {
if (inputs.length != getNumDataUnits()) {
throw new IllegalArgumentException("Invalid inputs length");
throw new HadoopIllegalArgumentException("Invalid inputs length");
}
if (outputs.length != getNumParityUnits()) {
throw new IllegalArgumentException("Invalid outputs length");
throw new HadoopIllegalArgumentException("Invalid outputs length");
}
}
}

View File

@ -36,9 +36,9 @@ public void initialize(int numDataUnits, int numParityUnits, int chunkSize) {
super.initialize(numDataUnits, numParityUnits, chunkSize);
assert (getNumDataUnits() + getNumParityUnits() < RSUtil.GF.getFieldSize());
this.errSignature = new int[getNumParityUnits()];
this.primitivePower = RSUtil.getPrimitivePower(getNumDataUnits(),
getNumParityUnits());
this.errSignature = new int[numParityUnits];
this.primitivePower = RSUtil.getPrimitivePower(numDataUnits,
numParityUnits);
}
@Override
@ -49,21 +49,21 @@ protected void doDecode(ByteBuffer[] inputs, int[] erasedIndexes,
RSUtil.GF.substitute(inputs, outputs[i], primitivePower[i]);
}
int dataLen = inputs[0].remaining();
RSUtil.GF.solveVandermondeSystem(errSignature, outputs,
erasedIndexes.length, dataLen);
RSUtil.GF.solveVandermondeSystem(errSignature,
outputs, erasedIndexes.length);
}
@Override
protected void doDecode(byte[][] inputs, int[] erasedIndexes,
byte[][] outputs) {
protected void doDecode(byte[][] inputs, int[] inputOffsets,
int dataLen, int[] erasedIndexes,
byte[][] outputs, int[] outputOffsets) {
for (int i = 0; i < erasedIndexes.length; i++) {
errSignature[i] = primitivePower[erasedIndexes[i]];
RSUtil.GF.substitute(inputs, outputs[i], primitivePower[i]);
RSUtil.GF.substitute(inputs, inputOffsets, dataLen, outputs[i],
outputOffsets[i], primitivePower[i]);
}
int dataLen = inputs[0].length;
RSUtil.GF.solveVandermondeSystem(errSignature, outputs,
RSUtil.GF.solveVandermondeSystem(errSignature, outputs, outputOffsets,
erasedIndexes.length, dataLen);
}
}

View File

@ -34,12 +34,12 @@ public void initialize(int numDataUnits, int numParityUnits, int chunkSize) {
super.initialize(numDataUnits, numParityUnits, chunkSize);
assert (getNumDataUnits() + getNumParityUnits() < RSUtil.GF.getFieldSize());
int[] primitivePower = RSUtil.getPrimitivePower(getNumDataUnits(),
getNumParityUnits());
int[] primitivePower = RSUtil.getPrimitivePower(numDataUnits,
numParityUnits);
// compute generating polynomial
int[] gen = {1};
int[] poly = new int[2];
for (int i = 0; i < getNumParityUnits(); i++) {
for (int i = 0; i < numParityUnits; i++) {
poly[0] = primitivePower[i];
poly[1] = 1;
gen = RSUtil.GF.multiply(gen, poly);
@ -50,29 +50,30 @@ public void initialize(int numDataUnits, int numParityUnits, int chunkSize) {
@Override
protected void doEncode(ByteBuffer[] inputs, ByteBuffer[] outputs) {
ByteBuffer[] data = new ByteBuffer[getNumDataUnits() + getNumParityUnits()];
for (int i = 0; i < getNumParityUnits(); i++) {
data[i] = outputs[i];
}
for (int i = 0; i < getNumDataUnits(); i++) {
data[i + getNumParityUnits()] = inputs[i];
}
// parity units + data units
ByteBuffer[] all = new ByteBuffer[outputs.length + inputs.length];
System.arraycopy(outputs, 0, all, 0, outputs.length);
System.arraycopy(inputs, 0, all, outputs.length, inputs.length);
// Compute the remainder
RSUtil.GF.remainder(data, generatingPolynomial);
RSUtil.GF.remainder(all, generatingPolynomial);
}
@Override
protected void doEncode(byte[][] inputs, byte[][] outputs) {
byte[][] data = new byte[getNumDataUnits() + getNumParityUnits()][];
for (int i = 0; i < getNumParityUnits(); i++) {
data[i] = outputs[i];
}
for (int i = 0; i < getNumDataUnits(); i++) {
data[i + getNumParityUnits()] = inputs[i];
}
protected void doEncode(byte[][] inputs, int[] inputOffsets,
int dataLen, byte[][] outputs,
int[] outputOffsets) {
// parity units + data units
byte[][] all = new byte[outputs.length + inputs.length][];
System.arraycopy(outputs, 0, all, 0, outputs.length);
System.arraycopy(inputs, 0, all, outputs.length, inputs.length);
int[] offsets = new int[inputOffsets.length + outputOffsets.length];
System.arraycopy(outputOffsets, 0, offsets, 0, outputOffsets.length);
System.arraycopy(inputOffsets, 0, offsets,
outputOffsets.length, inputOffsets.length);
// Compute the remainder
RSUtil.GF.remainder(data, generatingPolynomial);
RSUtil.GF.remainder(all, offsets, dataLen, generatingPolynomial);
}
}

View File

@ -21,47 +21,57 @@
/**
* A raw decoder in XOR code scheme in pure Java, adapted from HDFS-RAID.
*
* XOR code is an important primitive code scheme in erasure coding and often
* used in advanced codes, like HitchHiker and LRC, though itself is rarely
* deployed independently.
*/
public class XORRawDecoder extends AbstractRawErasureDecoder {
@Override
protected void doDecode(ByteBuffer[] inputs, int[] erasedIndexes,
ByteBuffer[] outputs) {
resetBuffer(outputs[0]);
ByteBuffer output = outputs[0];
resetOutputBuffer(output);
int bufSize = getChunkSize();
int erasedIdx = erasedIndexes[0];
// Process the inputs.
int iIdx, oIdx;
for (int i = 0; i < inputs.length; i++) {
// Skip the erased location.
if (i == erasedIdx) {
continue;
}
for (int j = 0; j < bufSize; j++) {
outputs[0].put(j, (byte) (outputs[0].get(j) ^ inputs[i].get(j)));
for (iIdx = inputs[i].position(), oIdx = output.position();
iIdx < inputs[i].limit();
iIdx++, oIdx++) {
output.put(oIdx, (byte) (output.get(oIdx) ^ inputs[i].get(iIdx)));
}
}
}
@Override
protected void doDecode(byte[][] inputs,
int[] erasedIndexes, byte[][] outputs) {
resetBuffer(outputs[0]);
protected void doDecode(byte[][] inputs, int[] inputOffsets, int dataLen,
int[] erasedIndexes, byte[][] outputs,
int[] outputOffsets) {
byte[] output = outputs[0];
resetBuffer(output, outputOffsets[0], dataLen);
int bufSize = getChunkSize();
int erasedIdx = erasedIndexes[0];
// Process the inputs.
int iIdx, oIdx;
for (int i = 0; i < inputs.length; i++) {
// Skip the erased location.
if (i == erasedIdx) {
continue;
}
for (int j = 0; j < bufSize; j++) {
outputs[0][j] ^= inputs[i][j];
for (iIdx = inputOffsets[i], oIdx = outputOffsets[0];
iIdx < inputOffsets[i] + dataLen; iIdx++, oIdx++) {
output[oIdx] ^= inputs[i][iIdx];
}
}
}

View File

@ -21,43 +21,53 @@
/**
* A raw encoder in XOR code scheme in pure Java, adapted from HDFS-RAID.
*
* XOR code is an important primitive code scheme in erasure coding and often
* used in advanced codes, like HitchHiker and LRC, though itself is rarely
* deployed independently.
*/
public class XORRawEncoder extends AbstractRawErasureEncoder {
@Override
protected void doEncode(ByteBuffer[] inputs, ByteBuffer[] outputs) {
resetBuffer(outputs[0]);
ByteBuffer output = outputs[0];
resetOutputBuffer(output);
int bufSize = getChunkSize();
// Get the first buffer's data.
for (int j = 0; j < bufSize; j++) {
outputs[0].put(j, inputs[0].get(j));
int iIdx, oIdx;
for (iIdx = inputs[0].position(), oIdx = output.position();
iIdx < inputs[0].limit(); iIdx++, oIdx++) {
output.put(oIdx, inputs[0].get(iIdx));
}
// XOR with everything else.
for (int i = 1; i < inputs.length; i++) {
for (int j = 0; j < bufSize; j++) {
outputs[0].put(j, (byte) (outputs[0].get(j) ^ inputs[i].get(j)));
for (iIdx = inputs[i].position(), oIdx = output.position();
iIdx < inputs[i].limit();
iIdx++, oIdx++) {
output.put(oIdx, (byte) (output.get(oIdx) ^ inputs[i].get(iIdx)));
}
}
}
@Override
protected void doEncode(byte[][] inputs, byte[][] outputs) {
resetBuffer(outputs[0]);
protected void doEncode(byte[][] inputs, int[] inputOffsets, int dataLen,
byte[][] outputs, int[] outputOffsets) {
byte[] output = outputs[0];
resetBuffer(output, outputOffsets[0], dataLen);
int bufSize = getChunkSize();
// Get the first buffer's data.
for (int j = 0; j < bufSize; j++) {
outputs[0][j] = inputs[0][j];
int iIdx, oIdx;
for (iIdx = inputOffsets[0], oIdx = outputOffsets[0];
iIdx < inputOffsets[0] + dataLen; iIdx++, oIdx++) {
output[oIdx] = inputs[0][iIdx];
}
// XOR with everything else.
for (int i = 1; i < inputs.length; i++) {
for (int j = 0; j < bufSize; j++) {
outputs[0][j] ^= inputs[i][j];
for (iIdx = inputOffsets[i], oIdx = outputOffsets[0];
iIdx < inputOffsets[i] + dataLen; iIdx++, oIdx++) {
output[oIdx] ^= inputs[i][iIdx];
}
}
}
}

View File

@ -235,26 +235,30 @@ public void solveVandermondeSystem(int[] x, int[] y, int len) {
/**
* A "bulk" version to the solving of Vandermonde System
*/
public void solveVandermondeSystem(int[] x, byte[][] y,
public void solveVandermondeSystem(int[] x, byte[][] y, int[] outputOffsets,
int len, int dataLen) {
int idx1, idx2;
for (int i = 0; i < len - 1; i++) {
for (int j = len - 1; j > i; j--) {
for (int k = 0; k < dataLen; k++) {
y[j][k] = (byte) (y[j][k] ^ mulTable[x[i]][y[j - 1][k] &
for (idx2 = outputOffsets[j-1], idx1 = outputOffsets[j];
idx1 < outputOffsets[j] + dataLen; idx1++, idx2++) {
y[j][idx1] = (byte) (y[j][idx1] ^ mulTable[x[i]][y[j - 1][idx2] &
0x000000FF]);
}
}
}
for (int i = len - 1; i >= 0; i--) {
for (int j = i + 1; j < len; j++) {
for (int k = 0; k < dataLen; k++) {
y[j][k] = (byte) (divTable[y[j][k] & 0x000000FF][x[j] ^
for (idx1 = outputOffsets[j];
idx1 < outputOffsets[j] + dataLen; idx1++) {
y[j][idx1] = (byte) (divTable[y[j][idx1] & 0x000000FF][x[j] ^
x[j - i - 1]]);
}
}
for (int j = i; j < len - 1; j++) {
for (int k = 0; k < dataLen; k++) {
y[j][k] = (byte) (y[j][k] ^ y[j + 1][k]);
for (idx2 = outputOffsets[j+1], idx1 = outputOffsets[j];
idx1 < outputOffsets[j] + dataLen; idx1++, idx2++) {
y[j][idx1] = (byte) (y[j][idx1] ^ y[j + 1][idx2]);
}
}
}
@ -263,26 +267,34 @@ public void solveVandermondeSystem(int[] x, byte[][] y,
/**
* A "bulk" version of the solveVandermondeSystem, using ByteBuffer.
*/
public void solveVandermondeSystem(int[] x, ByteBuffer[] y,
int len, int dataLen) {
public void solveVandermondeSystem(int[] x, ByteBuffer[] y, int len) {
ByteBuffer p;
int idx1, idx2;
for (int i = 0; i < len - 1; i++) {
for (int j = len - 1; j > i; j--) {
for (int k = 0; k < dataLen; k++) {
y[j].put(k, (byte) (y[j].get(k) ^ mulTable[x[i]][y[j - 1].get(k) &
p = y[j];
for (idx1 = p.position(), idx2 = y[j-1].position();
idx1 < p.limit(); idx1++, idx2++) {
p.put(idx1, (byte) (p.get(idx1) ^ mulTable[x[i]][y[j-1].get(idx2) &
0x000000FF]));
}
}
}
for (int i = len - 1; i >= 0; i--) {
for (int j = i + 1; j < len; j++) {
for (int k = 0; k < dataLen; k++) {
y[j].put(k, (byte) (divTable[y[j].get(k) & 0x000000FF][x[j] ^
x[j - i - 1]]));
p = y[j];
for (idx1 = p.position(); idx1 < p.limit(); idx1++) {
p.put(idx1, (byte) (divTable[p.get(idx1) &
0x000000FF][x[j] ^ x[j - i - 1]]));
}
}
for (int j = i; j < len - 1; j++) {
for (int k = 0; k < dataLen; k++) {
y[j].put(k, (byte) (y[j].get(k) ^ y[j + 1].get(k)));
p = y[j];
for (idx1 = p.position(), idx2 = y[j+1].position();
idx1 < p.limit(); idx1++, idx2++) {
p.put(idx1, (byte) (p.get(idx1) ^ y[j+1].get(idx2)));
}
}
}
@ -393,6 +405,31 @@ public void substitute(byte[][] p, byte[] q, int x) {
}
}
/**
* A "bulk" version of the substitute.
* Tends to be 2X faster than the "int" substitute in a loop.
*
* @param p input polynomial
* @param offsets
* @param len
* @param q store the return result
* @param offset
* @param x input field
*/
public void substitute(byte[][] p, int[] offsets,
int len, byte[] q, int offset, int x) {
int y = 1, iIdx, oIdx;
for (int i = 0; i < p.length; i++) {
byte[] pi = p[i];
for (iIdx = offsets[i], oIdx = offset;
iIdx < offsets[i] + len; iIdx++, oIdx++) {
int pij = pi[iIdx] & 0x000000FF;
q[oIdx] = (byte) (q[oIdx] ^ mulTable[pij][y]);
}
y = mulTable[x][y];
}
}
/**
* A "bulk" version of the substitute, using ByteBuffer.
* Tends to be 2X faster than the "int" substitute in a loop.
@ -402,13 +439,13 @@ public void substitute(byte[][] p, byte[] q, int x) {
* @param x input field
*/
public void substitute(ByteBuffer[] p, ByteBuffer q, int x) {
int y = 1;
int y = 1, iIdx, oIdx;
for (int i = 0; i < p.length; i++) {
ByteBuffer pi = p[i];
int len = pi.remaining();
for (int j = 0; j < len; j++) {
int pij = pi.get(j) & 0x000000FF;
q.put(j, (byte) (q.get(j) ^ mulTable[pij][y]));
for (iIdx = pi.position(), oIdx = q.position();
iIdx < pi.limit(); iIdx++, oIdx++) {
int pij = pi.get(iIdx) & 0x000000FF;
q.put(oIdx, (byte) (q.get(oIdx) ^ mulTable[pij][y]));
}
y = mulTable[x][y];
}
@ -431,18 +468,43 @@ public void remainder(byte[][] dividend, int[] divisor) {
}
}
/**
* The "bulk" version of the remainder.
* Warning: This function will modify the "dividend" inputs.
*/
public void remainder(byte[][] dividend, int[] offsets,
int len, int[] divisor) {
int idx1, idx2;
for (int i = dividend.length - divisor.length; i >= 0; i--) {
for (int j = 0; j < divisor.length; j++) {
for (idx2 = offsets[j + i], idx1 = offsets[i + divisor.length - 1];
idx1 < offsets[i + divisor.length - 1] + len;
idx1++, idx2++) {
int ratio = divTable[dividend[i + divisor.length - 1][idx1] &
0x00FF][divisor[divisor.length - 1]];
dividend[j + i][idx2] = (byte) ((dividend[j + i][idx2] & 0x00FF) ^
mulTable[ratio][divisor[j]]);
}
}
}
}
/**
* The "bulk" version of the remainder, using ByteBuffer.
* Warning: This function will modify the "dividend" inputs.
*/
public void remainder(ByteBuffer[] dividend, int[] divisor) {
int idx1, idx2;
ByteBuffer b1, b2;
for (int i = dividend.length - divisor.length; i >= 0; i--) {
int width = dividend[i].remaining();
for (int j = 0; j < divisor.length; j++) {
for (int k = 0; k < width; k++) {
int ratio = divTable[dividend[i + divisor.length - 1].get(k) &
b1 = dividend[i + divisor.length - 1];
b2 = dividend[j + i];
for (idx1 = b1.position(), idx2 = b2.position();
idx1 < b1.limit(); idx1++, idx2++) {
int ratio = divTable[b1.get(idx1) &
0x00FF][divisor[divisor.length - 1]];
dividend[j + i].put(k, (byte) ((dividend[j + i].get(k) & 0x00FF) ^
b2.put(idx2, (byte) ((b2.get(idx2) & 0x00FF) ^
mulTable[ratio][divisor[j]]));
}
}

View File

@ -35,7 +35,12 @@ public abstract class TestCoderBase {
private Configuration conf;
protected int numDataUnits;
protected int numParityUnits;
protected int chunkSize = 16 * 1024;
protected int baseChunkSize = 16 * 1024;
private int chunkSize = baseChunkSize;
private byte[] zeroChunkBytes;
private boolean startBufferWithZero = true;
// Indexes of erased data units.
protected int[] erasedDataIndexes = new int[] {0};
@ -47,6 +52,15 @@ public abstract class TestCoderBase {
// may go to different coding implementations.
protected boolean usingDirectBuffer = true;
protected int getChunkSize() {
return chunkSize;
}
protected void setChunkSize(int chunkSize) {
this.chunkSize = chunkSize;
this.zeroChunkBytes = new byte[chunkSize]; // With ZERO by default
}
/**
* Prepare before running the case.
* @param numDataUnits
@ -80,8 +94,8 @@ protected Configuration getConf() {
*/
protected void compareAndVerify(ECChunk[] erasedChunks,
ECChunk[] recoveredChunks) {
byte[][] erased = ECChunk.toArrays(erasedChunks);
byte[][] recovered = ECChunk.toArrays(recoveredChunks);
byte[][] erased = toArrays(erasedChunks);
byte[][] recovered = toArrays(recoveredChunks);
boolean result = Arrays.deepEquals(erased, recovered);
assertTrue("Decoding and comparing failed.", result);
}
@ -171,16 +185,19 @@ protected void eraseDataFromChunks(ECChunk[] chunks) {
/**
* Erase data from the specified chunk, putting ZERO bytes to the buffer.
* @param chunk
* @param chunk with a buffer ready to read at the current position
*/
protected void eraseDataFromChunk(ECChunk chunk) {
ByteBuffer chunkBuffer = chunk.getBuffer();
// erase the data
chunkBuffer.position(0);
for (int i = 0; i < chunkSize; i++) {
chunkBuffer.put((byte) 0);
}
// Erase the data at the position, and restore the buffer ready for reading
// same many bytes but all ZERO.
int pos = chunkBuffer.position();
int len = chunkBuffer.remaining();
chunkBuffer.put(zeroChunkBytes, 0, len);
// Back to readable again after data erased
chunkBuffer.flip();
chunkBuffer.position(pos);
chunkBuffer.limit(pos + len);
}
/**
@ -190,7 +207,7 @@ protected void eraseDataFromChunk(ECChunk chunk) {
* @param chunks
* @return
*/
protected static ECChunk[] cloneChunksWithData(ECChunk[] chunks) {
protected ECChunk[] cloneChunksWithData(ECChunk[] chunks) {
ECChunk[] results = new ECChunk[chunks.length];
for (int i = 0; i < chunks.length; i++) {
results[i] = cloneChunkWithData(chunks[i]);
@ -206,22 +223,19 @@ protected static ECChunk[] cloneChunksWithData(ECChunk[] chunks) {
* @param chunk
* @return a new chunk
*/
protected static ECChunk cloneChunkWithData(ECChunk chunk) {
protected ECChunk cloneChunkWithData(ECChunk chunk) {
ByteBuffer srcBuffer = chunk.getBuffer();
ByteBuffer destBuffer;
byte[] bytesArr = new byte[srcBuffer.remaining()];
srcBuffer.mark();
srcBuffer.get(bytesArr);
srcBuffer.get(bytesArr, 0, bytesArr.length);
srcBuffer.reset();
if (srcBuffer.hasArray()) {
destBuffer = ByteBuffer.wrap(bytesArr);
} else {
destBuffer = ByteBuffer.allocateDirect(srcBuffer.remaining());
destBuffer.put(bytesArr);
destBuffer.flip();
}
ByteBuffer destBuffer = allocateOutputBuffer(bytesArr.length);
int pos = destBuffer.position();
destBuffer.put(bytesArr);
destBuffer.flip();
destBuffer.position(pos);
return new ECChunk(destBuffer);
}
@ -231,18 +245,30 @@ protected static ECChunk cloneChunkWithData(ECChunk chunk) {
* @return
*/
protected ECChunk allocateOutputChunk() {
ByteBuffer buffer = allocateOutputBuffer();
ByteBuffer buffer = allocateOutputBuffer(chunkSize);
return new ECChunk(buffer);
}
/**
* Allocate a buffer for output or writing.
* @return
* Allocate a buffer for output or writing. It can prepare for two kinds of
* data buffers: one with position as 0, the other with position > 0
* @return a buffer ready to write chunkSize bytes from current position
*/
protected ByteBuffer allocateOutputBuffer() {
protected ByteBuffer allocateOutputBuffer(int bufferLen) {
/**
* When startBufferWithZero, will prepare a buffer as:---------------
* otherwise, the buffer will be like: ___TO--BE--WRITTEN___,
* and in the beginning, dummy data are prefixed, to simulate a buffer of
* position > 0.
*/
int startOffset = startBufferWithZero ? 0 : 11; // 11 is arbitrary
int allocLen = startOffset + bufferLen + startOffset;
ByteBuffer buffer = usingDirectBuffer ?
ByteBuffer.allocateDirect(chunkSize) : ByteBuffer.allocate(chunkSize);
ByteBuffer.allocateDirect(allocLen) : ByteBuffer.allocate(allocLen);
buffer.limit(startOffset + bufferLen);
fillDummyData(buffer, startOffset);
startBufferWithZero = ! startBufferWithZero;
return buffer;
}
@ -265,15 +291,34 @@ protected ECChunk[] prepareDataChunksForEncoding() {
* @return
*/
protected ECChunk generateDataChunk() {
ByteBuffer buffer = allocateOutputBuffer();
for (int i = 0; i < chunkSize; i++) {
buffer.put((byte) RAND.nextInt(256));
}
ByteBuffer buffer = allocateOutputBuffer(chunkSize);
int pos = buffer.position();
buffer.put(generateData(chunkSize));
buffer.flip();
buffer.position(pos);
return new ECChunk(buffer);
}
/**
* Fill len of dummy data in the buffer at the current position.
* @param buffer
* @param len
*/
protected void fillDummyData(ByteBuffer buffer, int len) {
byte[] dummy = new byte[len];
RAND.nextBytes(dummy);
buffer.put(dummy);
}
protected byte[] generateData(int len) {
byte[] buffer = new byte[len];
for (int i = 0; i < buffer.length; i++) {
buffer[i] = (byte) RAND.nextInt(256);
}
return buffer;
}
/**
* Prepare parity chunks for encoding, each chunk for each parity unit.
* @return
@ -303,4 +348,32 @@ protected ECChunk[] prepareOutputChunksForDecoding() {
return chunks;
}
/**
* Convert an array of this chunks to an array of byte array.
* Note the chunk buffers are not affected.
* @param chunks
* @return an array of byte array
*/
protected byte[][] toArrays(ECChunk[] chunks) {
byte[][] bytesArr = new byte[chunks.length][];
for (int i = 0; i < chunks.length; i++) {
bytesArr[i] = chunks[i].toBytesArray();
}
return bytesArr;
}
/**
* Make some chunk messy or not correct any more
* @param chunks
*/
protected void corruptSomeChunk(ECChunk[] chunks) {
int idx = new Random().nextInt(chunks.length);
ByteBuffer buffer = chunks[idx].getBuffer();
if (buffer.hasRemaining()) {
buffer.position(buffer.position() + 1);
}
}
}

View File

@ -59,6 +59,19 @@ protected void testCoding(boolean usingDirectBuffer) {
this.usingDirectBuffer = usingDirectBuffer;
prepareCoders();
/**
* The following runs will use 3 different chunkSize for inputs and outputs,
* to verify the same encoder/decoder can process variable width of data.
*/
performTestCoding(baseChunkSize);
performTestCoding(baseChunkSize - 17);
performTestCoding(baseChunkSize + 16);
}
private void performTestCoding(int chunkSize) {
setChunkSize(chunkSize);
// Generate data and encode
ECBlockGroup blockGroup = prepareBlockGroupForEncoding();
// Backup all the source chunks for later recovering because some coders
@ -138,7 +151,7 @@ private ErasureCoder createEncoder() {
throw new RuntimeException("Failed to create encoder", e);
}
encoder.initialize(numDataUnits, numParityUnits, chunkSize);
encoder.initialize(numDataUnits, numParityUnits, getChunkSize());
encoder.setConf(getConf());
return encoder;
}
@ -165,7 +178,7 @@ private ErasureCoder createDecoder() {
throw new RuntimeException("Failed to create decoder", e);
}
decoder.initialize(numDataUnits, numParityUnits, chunkSize);
decoder.initialize(numDataUnits, numParityUnits, getChunkSize());
decoder.setConf(getConf());
return decoder;
}
@ -249,7 +262,7 @@ protected TestBlock allocateOutputBlock() {
* @param blocks
* @return
*/
protected static TestBlock[] cloneBlocksWithData(TestBlock[] blocks) {
protected TestBlock[] cloneBlocksWithData(TestBlock[] blocks) {
TestBlock[] results = new TestBlock[blocks.length];
for (int i = 0; i < blocks.length; ++i) {
results[i] = cloneBlockWithData(blocks[i]);
@ -263,7 +276,7 @@ protected static TestBlock[] cloneBlocksWithData(TestBlock[] blocks) {
* @param block
* @return a new block
*/
protected static TestBlock cloneBlockWithData(TestBlock block) {
protected TestBlock cloneBlockWithData(TestBlock block) {
ECChunk[] newChunks = cloneChunksWithData(block.chunks);
return new TestBlock(newChunks);

View File

@ -44,7 +44,7 @@ public void testCodingNoDirectBuffer_10x4_erasing_d0_p0() {
@Test
public void testCodingDirectBuffer_10x4_erasing_p1() {
prepare(null, 10, 4, new int[] {}, new int[] {1});
prepare(null, 10, 4, new int[0], new int[] {1});
testCoding(true);
testCoding(true);
}
@ -101,4 +101,14 @@ public void testCodingDirectBuffer_3x3_erasing_d0_p0() {
prepare(null, 3, 3, new int[] {0}, new int[] {0});
testCoding(true);
}
@Test
public void testCodingNegative_10x4_erasing_d2_d4() {
prepare(null, 10, 4, new int[]{2, 4}, new int[0]);
testCodingWithBadInput(true);
testCodingWithBadOutput(false);
testCodingWithBadInput(true);
testCodingWithBadOutput(false);
}
}

View File

@ -39,13 +39,11 @@ public abstract class TestRSRawCoderBase extends TestRawCoderBase {
}
@Override
protected ECChunk generateDataChunk() {
ByteBuffer buffer = allocateOutputBuffer();
for (int i = 0; i < chunkSize; i++) {
buffer.put((byte) RAND.nextInt(symbolMax));
protected byte[] generateData(int len) {
byte[] buffer = new byte[len];
for (int i = 0; i < len; i++) {
buffer[i] = (byte) RAND.nextInt(symbolMax);
}
buffer.flip();
return new ECChunk(buffer);
return buffer;
}
}

View File

@ -19,6 +19,7 @@
import org.apache.hadoop.io.erasurecode.ECChunk;
import org.apache.hadoop.io.erasurecode.TestCoderBase;
import org.junit.Assert;
/**
* Raw coder test base with utilities.
@ -41,8 +42,57 @@ protected void testCoding(boolean usingDirectBuffer) {
this.usingDirectBuffer = usingDirectBuffer;
prepareCoders();
/**
* The following runs will use 3 different chunkSize for inputs and outputs,
* to verify the same encoder/decoder can process variable width of data.
*/
performTestCoding(baseChunkSize, false, false);
performTestCoding(baseChunkSize - 17, false, false);
performTestCoding(baseChunkSize + 16, false, false);
}
/**
* Similar to above, but perform negative cases using bad input for encoding.
* @param usingDirectBuffer
*/
protected void testCodingWithBadInput(boolean usingDirectBuffer) {
this.usingDirectBuffer = usingDirectBuffer;
prepareCoders();
try {
performTestCoding(baseChunkSize, true, false);
Assert.fail("Encoding test with bad input should fail");
} catch (Exception e) {
// Expected
}
}
/**
* Similar to above, but perform negative cases using bad output for decoding.
* @param usingDirectBuffer
*/
protected void testCodingWithBadOutput(boolean usingDirectBuffer) {
this.usingDirectBuffer = usingDirectBuffer;
prepareCoders();
try {
performTestCoding(baseChunkSize, false, true);
Assert.fail("Decoding test with bad output should fail");
} catch (Exception e) {
// Expected
}
}
private void performTestCoding(int chunkSize,
boolean useBadInput, boolean useBadOutput) {
setChunkSize(chunkSize);
// Generate data and encode
ECChunk[] dataChunks = prepareDataChunksForEncoding();
if (useBadInput) {
corruptSomeChunk(dataChunks);
}
ECChunk[] parityChunks = prepareParityChunksForEncoding();
// Backup all the source chunks for later recovering because some coders
@ -59,6 +109,9 @@ protected void testCoding(boolean usingDirectBuffer) {
clonedDataChunks, parityChunks);
ECChunk[] recoveredChunks = prepareOutputChunksForDecoding();
if (useBadOutput) {
corruptSomeChunk(recoveredChunks);
}
decoder.decode(inputChunks, getErasedIndexesForDecoding(), recoveredChunks);
@ -88,7 +141,7 @@ protected RawErasureEncoder createEncoder() {
throw new RuntimeException("Failed to create encoder", e);
}
encoder.initialize(numDataUnits, numParityUnits, chunkSize);
encoder.initialize(numDataUnits, numParityUnits, getChunkSize());
encoder.setConf(getConf());
return encoder;
}
@ -105,7 +158,7 @@ protected RawErasureDecoder createDecoder() {
throw new RuntimeException("Failed to create decoder", e);
}
decoder.initialize(numDataUnits, numParityUnits, chunkSize);
decoder.initialize(numDataUnits, numParityUnits, getChunkSize());
decoder.setConf(getConf());
return decoder;
}

View File

@ -49,6 +49,15 @@ public void testCodingNoDirectBuffer_erasing_d0() {
@Test
public void testCodingDirectBuffer_erasing_p0() {
prepare(null, 10, 1, new int[0], new int[] {0});
testCoding(true);
testCoding(true);
}
@Test
public void testCodingDirectBuffer_erasing_d0() {
prepare(null, 10, 1, new int[] {0}, new int[0]);
testCoding(true);
testCoding(true);
}
@ -67,4 +76,14 @@ public void testCodingBothBuffers_erasing_d5() {
testCoding(true);
testCoding(false);
}
@Test
public void testCodingNegative_erasing_d5() {
prepare(null, 10, 1, new int[]{5}, new int[0]);
testCodingWithBadInput(true);
testCodingWithBadOutput(false);
testCodingWithBadInput(true);
testCodingWithBadOutput(false);
}
}