HDFS-12613. Native EC coder should implement release() as idempotent function. (Lei (Eddy) Xu)

This commit is contained in:
Lei Xu 2017-10-16 19:44:30 -07:00
parent b406d8e375
commit 31ebccc962
34 changed files with 269 additions and 76 deletions

View File

@ -21,6 +21,8 @@
import org.apache.hadoop.io.erasurecode.ECBlock;
import org.apache.hadoop.io.erasurecode.ECChunk;
import java.io.IOException;
/**
* Erasure coding step that's involved in encoding/decoding of a block group.
*/
@ -47,7 +49,8 @@ public interface ErasureCodingStep {
* @param inputChunks
* @param outputChunks
*/
void performCoding(ECChunk[] inputChunks, ECChunk[] outputChunks);
void performCoding(ECChunk[] inputChunks, ECChunk[] outputChunks)
throws IOException;
/**
* Notify erasure coder that all the chunks of input blocks are processed so

View File

@ -22,6 +22,8 @@
import org.apache.hadoop.io.erasurecode.ECChunk;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
import java.io.IOException;
/**
* Erasure decoding step, a wrapper of all the necessary information to perform
* a decoding step involved in the whole process of decoding a block group.
@ -50,7 +52,8 @@ public ErasureDecodingStep(ECBlock[] inputBlocks, int[] erasedIndexes,
}
@Override
public void performCoding(ECChunk[] inputChunks, ECChunk[] outputChunks) {
public void performCoding(ECChunk[] inputChunks, ECChunk[] outputChunks)
throws IOException {
rawDecoder.decode(inputChunks, erasedIndexes, outputChunks);
}

View File

@ -22,6 +22,8 @@
import org.apache.hadoop.io.erasurecode.ECChunk;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
import java.io.IOException;
/**
* Erasure encoding step, a wrapper of all the necessary information to perform
* an encoding step involved in the whole process of encoding a block group.
@ -46,7 +48,8 @@ public ErasureEncodingStep(ECBlock[] inputBlocks, ECBlock[] outputBlocks,
}
@Override
public void performCoding(ECChunk[] inputChunks, ECChunk[] outputChunks) {
public void performCoding(ECChunk[] inputChunks, ECChunk[] outputChunks)
throws IOException {
rawEncoder.encode(inputChunks, outputChunks);
}

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.io.erasurecode.coder;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
@ -64,7 +65,8 @@ public HHXORErasureDecodingStep(ECBlock[] inputBlocks, int[] erasedIndexes,
}
@Override
public void performCoding(ECChunk[] inputChunks, ECChunk[] outputChunks) {
public void performCoding(ECChunk[] inputChunks, ECChunk[] outputChunks)
throws IOException {
if (erasedIndexes.length == 0) {
return;
}
@ -74,7 +76,8 @@ public void performCoding(ECChunk[] inputChunks, ECChunk[] outputChunks) {
performCoding(inputBuffers, outputBuffers);
}
private void performCoding(ByteBuffer[] inputs, ByteBuffer[] outputs) {
private void performCoding(ByteBuffer[] inputs, ByteBuffer[] outputs)
throws IOException {
final int numDataUnits = rsRawDecoder.getNumDataUnits();
final int numParityUnits = rsRawDecoder.getNumParityUnits();
final int numTotalUnits = numDataUnits + numParityUnits;
@ -119,7 +122,7 @@ private void performCoding(ByteBuffer[] inputs, ByteBuffer[] outputs) {
private void doDecodeSingle(ByteBuffer[][] inputs, ByteBuffer[][] outputs,
int erasedLocationToFix, int bufSize,
boolean isDirect) {
boolean isDirect) throws IOException {
final int numDataUnits = rsRawDecoder.getNumDataUnits();
final int numParityUnits = rsRawDecoder.getNumParityUnits();
final int subPacketSize = getSubPacketSize();
@ -261,7 +264,8 @@ private void doDecodeByPiggyBack(byte[][] inputs, int[] inputOffsets,
private void doDecodeMultiAndParity(ByteBuffer[][] inputs,
ByteBuffer[][] outputs,
int[] erasedLocationToFix, int bufSize) {
int[] erasedLocationToFix, int bufSize)
throws IOException {
final int numDataUnits = rsRawDecoder.getNumDataUnits();
final int numParityUnits = rsRawDecoder.getNumParityUnits();
final int numTotalUnits = numDataUnits + numParityUnits;

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.io.erasurecode.coder;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
@ -56,13 +57,15 @@ public HHXORErasureEncodingStep(ECBlock[] inputBlocks, ECBlock[] outputBlocks,
}
@Override
public void performCoding(ECChunk[] inputChunks, ECChunk[] outputChunks) {
public void performCoding(ECChunk[] inputChunks, ECChunk[] outputChunks)
throws IOException {
ByteBuffer[] inputBuffers = ECChunk.toBuffers(inputChunks);
ByteBuffer[] outputBuffers = ECChunk.toBuffers(outputChunks);
performCoding(inputBuffers, outputBuffers);
}
private void performCoding(ByteBuffer[] inputs, ByteBuffer[] outputs) {
private void performCoding(ByteBuffer[] inputs, ByteBuffer[] outputs)
throws IOException {
final int numDataUnits = this.rsRawEncoder.getNumDataUnits();
final int numParityUnits = this.rsRawEncoder.getNumParityUnits();
final int subSPacketSize = getSubPacketSize();
@ -95,7 +98,8 @@ private void performCoding(ByteBuffer[] inputs, ByteBuffer[] outputs) {
doEncode(hhInputs, hhOutputs);
}
private void doEncode(ByteBuffer[][] inputs, ByteBuffer[][] outputs) {
private void doEncode(ByteBuffer[][] inputs, ByteBuffer[][] outputs)
throws IOException {
final int numParityUnits = this.rsRawEncoder.getNumParityUnits();
// calc piggyBacks using first sub-packet

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.io.erasurecode.coder.util;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
@ -64,7 +65,8 @@ public static ByteBuffer[] getPiggyBacksFromInput(ByteBuffer[] inputs,
int[] piggyBackIndex,
int numParityUnits,
int pgIndex,
RawErasureEncoder encoder) {
RawErasureEncoder encoder)
throws IOException {
ByteBuffer[] emptyInput = new ByteBuffer[inputs.length];
ByteBuffer[] tempInput = new ByteBuffer[inputs.length];
int[] inputPositions = new int[inputs.length];

View File

@ -23,6 +23,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
/**
@ -38,7 +39,12 @@ public AbstractNativeRawDecoder(ErasureCoderOptions coderOptions) {
}
@Override
protected void doDecode(ByteBufferDecodingState decodingState) {
protected synchronized void doDecode(ByteBufferDecodingState decodingState)
throws IOException {
if (nativeCoder == 0) {
throw new IOException(String.format("%s closed",
getClass().getSimpleName()));
}
int[] inputOffsets = new int[decodingState.inputs.length];
int[] outputOffsets = new int[decodingState.outputs.length];
@ -63,10 +69,12 @@ protected void doDecode(ByteBufferDecodingState decodingState) {
protected abstract void performDecodeImpl(ByteBuffer[] inputs,
int[] inputOffsets, int dataLen,
int[] erased, ByteBuffer[] outputs,
int[] outputOffsets);
int[] outputOffsets)
throws IOException;
@Override
protected void doDecode(ByteArrayDecodingState decodingState) {
protected void doDecode(ByteArrayDecodingState decodingState)
throws IOException {
PerformanceAdvisory.LOG.debug("convertToByteBufferState is invoked, " +
"not efficiently. Please use direct ByteBuffer inputs/outputs");

View File

@ -23,6 +23,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
/**
@ -38,7 +39,12 @@ public AbstractNativeRawEncoder(ErasureCoderOptions coderOptions) {
}
@Override
protected void doEncode(ByteBufferEncodingState encodingState) {
protected synchronized void doEncode(ByteBufferEncodingState encodingState)
throws IOException {
if (nativeCoder == 0) {
throw new IOException(String.format("%s closed",
getClass().getSimpleName()));
}
int[] inputOffsets = new int[encodingState.inputs.length];
int[] outputOffsets = new int[encodingState.outputs.length];
int dataLen = encodingState.inputs[0].remaining();
@ -60,10 +66,12 @@ protected void doEncode(ByteBufferEncodingState encodingState) {
protected abstract void performEncodeImpl(
ByteBuffer[] inputs, int[] inputOffsets,
int dataLen, ByteBuffer[] outputs, int[] outputOffsets);
int dataLen, ByteBuffer[] outputs, int[] outputOffsets)
throws IOException;
@Override
protected void doEncode(ByteArrayEncodingState encodingState) {
protected void doEncode(ByteArrayEncodingState encodingState)
throws IOException {
PerformanceAdvisory.LOG.debug("convertToByteBufferState is invoked, " +
"not efficiently. Please use direct ByteBuffer inputs/outputs");

View File

@ -21,6 +21,7 @@
import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import java.io.IOException;
import java.nio.ByteBuffer;
/**
@ -39,14 +40,14 @@ public NativeRSRawDecoder(ErasureCoderOptions coderOptions) {
}
@Override
protected void performDecodeImpl(ByteBuffer[] inputs, int[] inputOffsets,
int dataLen, int[] erased,
ByteBuffer[] outputs, int[] outputOffsets) {
protected synchronized void performDecodeImpl(
ByteBuffer[] inputs, int[] inputOffsets, int dataLen, int[] erased,
ByteBuffer[] outputs, int[] outputOffsets) throws IOException {
decodeImpl(inputs, inputOffsets, dataLen, erased, outputs, outputOffsets);
}
@Override
public void release() {
public synchronized void release() {
destroyImpl();
}
@ -59,7 +60,7 @@ public boolean preferDirectBuffer() {
private native void decodeImpl(
ByteBuffer[] inputs, int[] inputOffsets, int dataLen, int[] erased,
ByteBuffer[] outputs, int[] outputOffsets);
ByteBuffer[] outputs, int[] outputOffsets) throws IOException;
private native void destroyImpl();

View File

@ -21,6 +21,7 @@
import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import java.io.IOException;
import java.nio.ByteBuffer;
/**
@ -39,14 +40,14 @@ public NativeRSRawEncoder(ErasureCoderOptions coderOptions) {
}
@Override
protected void performEncodeImpl(
protected synchronized void performEncodeImpl(
ByteBuffer[] inputs, int[] inputOffsets, int dataLen,
ByteBuffer[] outputs, int[] outputOffsets) {
ByteBuffer[] outputs, int[] outputOffsets) throws IOException {
encodeImpl(inputs, inputOffsets, dataLen, outputs, outputOffsets);
}
@Override
public void release() {
public synchronized void release() {
destroyImpl();
}
@ -58,8 +59,8 @@ public boolean preferDirectBuffer() {
private native void initImpl(int numDataUnits, int numParityUnits);
private native void encodeImpl(ByteBuffer[] inputs, int[] inputOffsets,
int dataLen, ByteBuffer[] outputs,
int[] outputOffsets);
int dataLen, ByteBuffer[] outputs,
int[] outputOffsets) throws IOException;
private native void destroyImpl();
}

View File

@ -21,6 +21,7 @@
import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import java.io.IOException;
import java.nio.ByteBuffer;
/**
@ -39,21 +40,26 @@ public NativeXORRawDecoder(ErasureCoderOptions coderOptions) {
}
@Override
protected void performDecodeImpl(ByteBuffer[] inputs, int[] inputOffsets,
int dataLen, int[] erased, ByteBuffer[] outputs, int[] outputOffsets) {
protected synchronized void performDecodeImpl(
ByteBuffer[] inputs, int[] inputOffsets, int dataLen, int[] erased,
ByteBuffer[] outputs, int[] outputOffsets) throws IOException {
decodeImpl(inputs, inputOffsets, dataLen, erased, outputs, outputOffsets);
}
@Override
public void release() {
public synchronized void release() {
destroyImpl();
}
private native void initImpl(int numDataUnits, int numParityUnits);
/**
* Native implementation of decoding.
* @throws IOException if the decoder is closed.
*/
private native void decodeImpl(
ByteBuffer[] inputs, int[] inputOffsets, int dataLen, int[] erased,
ByteBuffer[] outputs, int[] outputOffsets);
ByteBuffer[] outputs, int[] outputOffsets) throws IOException;
private native void destroyImpl();
}

View File

@ -21,6 +21,7 @@
import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import java.io.IOException;
import java.nio.ByteBuffer;
/**
@ -39,14 +40,14 @@ public NativeXORRawEncoder(ErasureCoderOptions coderOptions) {
}
@Override
protected void performEncodeImpl(
protected synchronized void performEncodeImpl(
ByteBuffer[] inputs, int[] inputOffsets, int dataLen,
ByteBuffer[] outputs, int[] outputOffsets) {
ByteBuffer[] outputs, int[] outputOffsets) throws IOException {
encodeImpl(inputs, inputOffsets, dataLen, outputs, outputOffsets);
}
@Override
public void release() {
public synchronized void release() {
destroyImpl();
}
@ -54,7 +55,7 @@ public void release() {
private native void encodeImpl(ByteBuffer[] inputs, int[] inputOffsets,
int dataLen, ByteBuffer[] outputs,
int[] outputOffsets);
int[] outputOffsets) throws IOException;
private native void destroyImpl();
}

View File

@ -22,6 +22,7 @@
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import org.apache.hadoop.io.erasurecode.rawcoder.util.RSUtil;
import java.io.IOException;
import java.nio.ByteBuffer;
/**
@ -53,7 +54,7 @@ public RSLegacyRawDecoder(ErasureCoderOptions coderOptions) {
@Override
public void decode(ByteBuffer[] inputs, int[] erasedIndexes,
ByteBuffer[] outputs) {
ByteBuffer[] outputs) throws IOException {
// Make copies avoiding affecting original ones;
ByteBuffer[] newInputs = new ByteBuffer[inputs.length];
int[] newErasedIndexes = new int[erasedIndexes.length];
@ -67,7 +68,8 @@ public void decode(ByteBuffer[] inputs, int[] erasedIndexes,
}
@Override
public void decode(byte[][] inputs, int[] erasedIndexes, byte[][] outputs) {
public void decode(byte[][] inputs, int[] erasedIndexes, byte[][] outputs)
throws IOException {
// Make copies avoiding affecting original ones;
byte[][] newInputs = new byte[inputs.length][];
int[] newErasedIndexes = new int[erasedIndexes.length];

View File

@ -21,6 +21,7 @@
import org.apache.hadoop.io.erasurecode.ECChunk;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import java.io.IOException;
import java.nio.ByteBuffer;
/**
@ -81,7 +82,7 @@ public RawErasureDecoder(ErasureCoderOptions coderOptions) {
* erasedIndexes, ready for read after the call
*/
public void decode(ByteBuffer[] inputs, int[] erasedIndexes,
ByteBuffer[] outputs) {
ByteBuffer[] outputs) throws IOException {
ByteBufferDecodingState decodingState = new ByteBufferDecodingState(this,
inputs, erasedIndexes, outputs);
@ -117,7 +118,8 @@ public void decode(ByteBuffer[] inputs, int[] erasedIndexes,
* Perform the real decoding using Direct ByteBuffer.
* @param decodingState the decoding state
*/
protected abstract void doDecode(ByteBufferDecodingState decodingState);
protected abstract void doDecode(ByteBufferDecodingState decodingState)
throws IOException;
/**
* Decode with inputs and erasedIndexes, generates outputs. More see above.
@ -126,8 +128,10 @@ public void decode(ByteBuffer[] inputs, int[] erasedIndexes,
* @param erasedIndexes indexes of erased units in the inputs array
* @param outputs output buffers to put decoded data into according to
* erasedIndexes, ready for read after the call
* @throws IOException if the decoder is closed.
*/
public void decode(byte[][] inputs, int[] erasedIndexes, byte[][] outputs) {
public void decode(byte[][] inputs, int[] erasedIndexes, byte[][] outputs)
throws IOException {
ByteArrayDecodingState decodingState = new ByteArrayDecodingState(this,
inputs, erasedIndexes, outputs);
@ -142,8 +146,10 @@ public void decode(byte[][] inputs, int[] erasedIndexes, byte[][] outputs) {
* Perform the real decoding using bytes array, supporting offsets and
* lengths.
* @param decodingState the decoding state
* @throws IOException if the decoder is closed.
*/
protected abstract void doDecode(ByteArrayDecodingState decodingState);
protected abstract void doDecode(ByteArrayDecodingState decodingState)
throws IOException;
/**
* Decode with inputs and erasedIndexes, generates outputs. More see above.
@ -155,9 +161,10 @@ public void decode(byte[][] inputs, int[] erasedIndexes, byte[][] outputs) {
* @param erasedIndexes indexes of erased units in the inputs array
* @param outputs output buffers to put decoded data into according to
* erasedIndexes, ready for read after the call
* @throws IOException if the decoder is closed
*/
public void decode(ECChunk[] inputs, int[] erasedIndexes,
ECChunk[] outputs) {
ECChunk[] outputs) throws IOException {
ByteBuffer[] newInputs = CoderUtil.toBuffers(inputs);
ByteBuffer[] newOutputs = CoderUtil.toBuffers(outputs);
decode(newInputs, erasedIndexes, newOutputs);

View File

@ -21,6 +21,7 @@
import org.apache.hadoop.io.erasurecode.ECChunk;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import java.io.IOException;
import java.nio.ByteBuffer;
/**
@ -62,8 +63,10 @@ public RawErasureEncoder(ErasureCoderOptions coderOptions) {
* be 0 after encoding
* @param outputs output buffers to put the encoded data into, ready to read
* after the call
* @throws IOException if the encoder is closed.
*/
public void encode(ByteBuffer[] inputs, ByteBuffer[] outputs) {
public void encode(ByteBuffer[] inputs, ByteBuffer[] outputs)
throws IOException {
ByteBufferEncodingState bbeState = new ByteBufferEncodingState(
this, inputs, outputs);
@ -99,7 +102,8 @@ public void encode(ByteBuffer[] inputs, ByteBuffer[] outputs) {
* Perform the real encoding work using direct ByteBuffer.
* @param encodingState the encoding state
*/
protected abstract void doEncode(ByteBufferEncodingState encodingState);
protected abstract void doEncode(ByteBufferEncodingState encodingState)
throws IOException;
/**
* Encode with inputs and generates outputs. More see above.
@ -108,7 +112,7 @@ public void encode(ByteBuffer[] inputs, ByteBuffer[] outputs) {
* @param outputs output buffers to put the encoded data into, read to read
* after the call
*/
public void encode(byte[][] inputs, byte[][] outputs) {
public void encode(byte[][] inputs, byte[][] outputs) throws IOException {
ByteArrayEncodingState baeState = new ByteArrayEncodingState(
this, inputs, outputs);
@ -125,7 +129,8 @@ public void encode(byte[][] inputs, byte[][] outputs) {
* and lengths.
* @param encodingState the encoding state
*/
protected abstract void doEncode(ByteArrayEncodingState encodingState);
protected abstract void doEncode(ByteArrayEncodingState encodingState)
throws IOException;
/**
* Encode with inputs and generates outputs. More see above.
@ -133,8 +138,9 @@ public void encode(byte[][] inputs, byte[][] outputs) {
* @param inputs input buffers to read data from
* @param outputs output buffers to put the encoded data into, read to read
* after the call
* @throws IOException if the encoder is closed.
*/
public void encode(ECChunk[] inputs, ECChunk[] outputs) {
public void encode(ECChunk[] inputs, ECChunk[] outputs) throws IOException {
ByteBuffer[] newInputs = ECChunk.toBuffers(inputs);
ByteBuffer[] newOutputs = ECChunk.toBuffers(outputs);
encode(newInputs, newOutputs);

View File

@ -63,8 +63,9 @@ IsalCoder* getCoder(JNIEnv* env, jobject thiz) {
"Field nativeCoder not found");
}
pCoder = (IsalCoder*)(*env)->GetLongField(env, thiz, fid);
pCoder->verbose = (verbose == JNI_TRUE) ? 1 : 0;
if (pCoder != NULL) {
pCoder->verbose = (verbose == JNI_TRUE) ? 1 : 0;
}
return pCoder;
}

View File

@ -48,6 +48,10 @@ JNIEnv *env, jobject thiz, jobjectArray inputs, jintArray inputOffsets,
jint dataLen, jintArray erasedIndexes, jobjectArray outputs,
jintArray outputOffsets) {
RSDecoder* rsDecoder = (RSDecoder*)getCoder(env, thiz);
if (!rsDecoder) {
THROW(env, "java/io/IOException", "NativeRSRawDecoder closed");
return;
}
int numDataUnits = rsDecoder->decoder.coder.numDataUnits;
int numParityUnits = rsDecoder->decoder.coder.numParityUnits;
@ -68,5 +72,8 @@ JNIEXPORT void JNICALL
Java_org_apache_hadoop_io_erasurecode_rawcoder_NativeRSRawDecoder_destroyImpl(
JNIEnv *env, jobject thiz) {
RSDecoder* rsDecoder = (RSDecoder*)getCoder(env, thiz);
free(rsDecoder);
if (rsDecoder) {
free(rsDecoder);
setCoder(env, thiz, NULL);
}
}

View File

@ -47,6 +47,10 @@ Java_org_apache_hadoop_io_erasurecode_rawcoder_NativeRSRawEncoder_encodeImpl(
JNIEnv *env, jobject thiz, jobjectArray inputs, jintArray inputOffsets,
jint dataLen, jobjectArray outputs, jintArray outputOffsets) {
RSEncoder* rsEncoder = (RSEncoder*)getCoder(env, thiz);
if (!rsEncoder) {
THROW(env, "java/io/IOException", "NativeRSRawEncoder closed");
return;
}
int numDataUnits = rsEncoder->encoder.coder.numDataUnits;
int numParityUnits = rsEncoder->encoder.coder.numParityUnits;
@ -62,5 +66,8 @@ JNIEXPORT void JNICALL
Java_org_apache_hadoop_io_erasurecode_rawcoder_NativeRSRawEncoder_destroyImpl(
JNIEnv *env, jobject thiz) {
RSEncoder* rsEncoder = (RSEncoder*)getCoder(env, thiz);
free(rsEncoder);
if (rsEncoder) {
free(rsEncoder);
setCoder(env, thiz, NULL);
}
}

View File

@ -54,6 +54,10 @@ Java_org_apache_hadoop_io_erasurecode_rawcoder_NativeXORRawDecoder_decodeImpl(
XORDecoder* xorDecoder;
xorDecoder = (XORDecoder*)getCoder(env, thiz);
if (!xorDecoder) {
THROW(env, "java/io/IOException", "NativeXORRawDecoder closed");
return;
}
numDataUnits = ((IsalCoder*)xorDecoder)->numDataUnits;
numParityUnits = ((IsalCoder*)xorDecoder)->numParityUnits;
chunkSize = (int)dataLen;
@ -76,5 +80,8 @@ JNIEXPORT void JNICALL
Java_org_apache_hadoop_io_erasurecode_rawcoder_NativeXORRawDecoder_destroyImpl
(JNIEnv *env, jobject thiz){
XORDecoder* xorDecoder = (XORDecoder*)getCoder(env, thiz);
free(xorDecoder);
if (xorDecoder) {
free(xorDecoder);
setCoder(env, thiz, NULL);
}
}

View File

@ -54,6 +54,10 @@ Java_org_apache_hadoop_io_erasurecode_rawcoder_NativeXORRawEncoder_encodeImpl(
XOREncoder* xorEncoder;
xorEncoder = (XOREncoder*)getCoder(env, thiz);
if (!xorEncoder) {
THROW(env, "java/io/IOException", "NativeXORRawEncoder closed");
return;
}
numDataUnits = ((IsalCoder*)xorEncoder)->numDataUnits;
numParityUnits = ((IsalCoder*)xorEncoder)->numParityUnits;
chunkSize = (int)dataLen;
@ -78,5 +82,8 @@ JNIEXPORT void JNICALL
Java_org_apache_hadoop_io_erasurecode_rawcoder_NativeXORRawEncoder_destroyImpl
(JNIEnv *env, jobject thiz) {
XOREncoder* xorEncoder = (XOREncoder*)getCoder(env, thiz);
free(xorEncoder);
if (xorEncoder) {
free(xorEncoder);
setCoder(env, thiz, NULL);
}
}

View File

@ -23,8 +23,11 @@
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import org.apache.hadoop.io.erasurecode.TestCoderBase;
import java.io.IOException;
import java.lang.reflect.Constructor;
import static org.junit.Assert.fail;
/**
* Erasure coder test base with utilities.
*/
@ -85,14 +88,22 @@ private void performTestCoding(int chunkSize, boolean usingSlicedBuffer) {
ErasureCodingStep codingStep;
codingStep = encoder.calculateCoding(blockGroup);
performCodingStep(codingStep);
try {
performCodingStep(codingStep);
} catch (IOException e) {
fail("Should not expect IOException: " + e.getMessage());
}
// Erase specified sources but return copies of them for later comparing
TestBlock[] backupBlocks = backupAndEraseBlocks(clonedDataBlocks, parityBlocks);
// Decode
blockGroup = new ECBlockGroup(clonedDataBlocks, blockGroup.getParityBlocks());
codingStep = decoder.calculateCoding(blockGroup);
performCodingStep(codingStep);
try {
performCodingStep(codingStep);
} catch (IOException e) {
fail("Should not expect IOException: " + e.getMessage());
}
// Compare
compareAndVerify(backupBlocks, codingStep.getOutputBlocks());
@ -102,7 +113,8 @@ private void performTestCoding(int chunkSize, boolean usingSlicedBuffer) {
* This is typically how a coding step should be performed.
* @param codingStep
*/
protected void performCodingStep(ErasureCodingStep codingStep) {
protected void performCodingStep(ErasureCodingStep codingStep)
throws IOException {
// Pretend that we're opening these input blocks and output blocks.
ECBlock[] inputBlocks = codingStep.getInputBlocks();
ECBlock[] outputBlocks = codingStep.getOutputBlocks();

View File

@ -20,6 +20,10 @@
import org.apache.hadoop.io.erasurecode.ECBlock;
import org.apache.hadoop.io.erasurecode.ECChunk;
import java.io.IOException;
import static org.junit.Assert.fail;
/**
* Erasure coder test base with utilities for hitchhiker.
@ -53,7 +57,11 @@ protected void performCodingStep(ErasureCodingStep codingStep) {
}
// Given the input chunks and output chunk buffers, just call it !
codingStep.performCoding(inputChunks, outputChunks);
try {
codingStep.performCoding(inputChunks, outputChunks);
} catch (IOException e) {
fail("Unexpected IOException: " + e.getMessage());
}
}
codingStep.finish();

View File

@ -22,6 +22,7 @@
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import org.apache.hadoop.util.StopWatch;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.text.DecimalFormat;
import java.util.ArrayList;
@ -232,7 +233,7 @@ public static void performBench(String opType, CODER coder,
}
}
private static RawErasureEncoder getRawEncoder(int index) {
private static RawErasureEncoder getRawEncoder(int index) throws IOException {
RawErasureEncoder encoder =
CODER_MAKERS.get(index).createEncoder(BenchData.OPTIONS);
final boolean isDirect = encoder.preferDirectBuffer();
@ -242,7 +243,7 @@ private static RawErasureEncoder getRawEncoder(int index) {
return encoder;
}
private static RawErasureDecoder getRawDecoder(int index) {
private static RawErasureDecoder getRawDecoder(int index) throws IOException {
RawErasureDecoder decoder =
CODER_MAKERS.get(index).createDecoder(BenchData.OPTIONS);
final boolean isDirect = decoder.preferDirectBuffer();
@ -341,11 +342,11 @@ public void prepareDecInput() {
System.arraycopy(inputs, 0, decodeInputs, 0, NUM_DATA_UNITS);
}
public void encode(RawErasureEncoder encoder) {
public void encode(RawErasureEncoder encoder) throws IOException {
encoder.encode(inputs, outputs);
}
public void decode(RawErasureDecoder decoder) {
public void decode(RawErasureDecoder decoder) throws IOException {
decoder.decode(decodeInputs, ERASED_INDEXES, outputs);
}
}

View File

@ -18,9 +18,11 @@
package org.apache.hadoop.io.erasurecode.rawcoder;
import org.apache.hadoop.io.erasurecode.ECChunk;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.nio.ByteBuffer;
/**
@ -59,7 +61,11 @@ protected void testCoding(boolean usingDirectBuffer) {
ECChunk[] dataChunks = prepareDataChunksForEncoding();
markChunks(dataChunks);
ECChunk[] parityChunks = prepareParityChunksForEncoding();
encoder.encode(dataChunks, parityChunks);
try {
encoder.encode(dataChunks, parityChunks);
} catch (IOException e) {
Assert.fail("Unexpected IOException: " + e.getMessage());
}
compareAndVerify(parityChunks, getEmptyChunks(parityChunks.length));
// Decode
@ -69,7 +75,12 @@ protected void testCoding(boolean usingDirectBuffer) {
dataChunks, parityChunks);
ensureOnlyLeastRequiredChunks(inputChunks);
ECChunk[] recoveredChunks = prepareOutputChunksForDecoding();
decoder.decode(inputChunks, getErasedIndexesForDecoding(), recoveredChunks);
try {
decoder.decode(inputChunks, getErasedIndexesForDecoding(),
recoveredChunks);
} catch (IOException e) {
Assert.fail("Unexpected IOException: " + e.getMessage());
}
compareAndVerify(recoveredChunks, getEmptyChunks(recoveredChunks.length));
}

View File

@ -118,4 +118,10 @@ public void testCoding_10x4_erasing_d0_p0() {
prepare(null, 10, 4, new int[] {0}, new int[] {0});
testCodingDoMixAndTwice();
}
@Test
public void testAfterRelease63() throws Exception {
prepare(6, 3, null, null);
testAfterRelease();
}
}

View File

@ -20,6 +20,7 @@
import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
/**
* Test NativeXOR encoding and decoding.
@ -33,4 +34,10 @@ public void setup() {
this.decoderFactoryClass = NativeXORRawErasureCoderFactory.class;
setAllowDump(true);
}
@Test
public void testAfterRelease63() throws Exception {
prepare(6, 3, null, null);
testAfterRelease();
}
}

View File

@ -20,9 +20,12 @@
import org.apache.hadoop.io.erasurecode.ECChunk;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import org.apache.hadoop.io.erasurecode.TestCoderBase;
import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
/**
* Raw coder test base with utilities.
*/
@ -104,6 +107,28 @@ protected void testCodingWithBadOutput(boolean usingDirectBuffer) {
}
}
/**
* Test encode / decode after release(). It should raise IOException.
*
* @throws Exception
*/
void testAfterRelease() throws Exception {
prepareCoders(true);
prepareBufferAllocator(true);
encoder.release();
final ECChunk[] data = prepareDataChunksForEncoding();
final ECChunk[] parity = prepareParityChunksForEncoding();
LambdaTestUtils.intercept(IOException.class, "closed",
() -> encoder.encode(data, parity));
decoder.release();
final ECChunk[] in = prepareInputChunksForDecoding(data, parity);
final ECChunk[] out = prepareOutputChunksForDecoding();
LambdaTestUtils.intercept(IOException.class, "closed",
() -> decoder.decode(in, getErasedIndexesForDecoding(), out));
}
@Test
public void testCodingWithErasingTooMany() {
try {
@ -121,6 +146,16 @@ public void testCodingWithErasingTooMany() {
}
}
@Test
public void testIdempotentReleases() {
prepareCoders(true);
for (int i = 0; i < 3; i++) {
encoder.release();
decoder.release();
}
}
private void performTestCoding(int chunkSize, boolean usingSlicedBuffer,
boolean useBadInput, boolean useBadOutput,
boolean allowChangeInputs) {
@ -144,7 +179,11 @@ private void performTestCoding(int chunkSize, boolean usingSlicedBuffer,
ECChunk[] clonedDataChunks = cloneChunksWithData(dataChunks);
markChunks(dataChunks);
encoder.encode(dataChunks, parityChunks);
try {
encoder.encode(dataChunks, parityChunks);
} catch (IOException e) {
Assert.fail("Should not get IOException: " + e.getMessage());
}
dumpChunks("Encoded parity chunks", parityChunks);
if (!allowChangeInputs) {
@ -174,7 +213,12 @@ private void performTestCoding(int chunkSize, boolean usingSlicedBuffer,
}
dumpChunks("Decoding input chunks", inputChunks);
decoder.decode(inputChunks, getErasedIndexesForDecoding(), recoveredChunks);
try {
decoder.decode(inputChunks, getErasedIndexesForDecoding(),
recoveredChunks);
} catch (IOException e) {
Assert.fail("Should not get IOException: " + e.getMessage());
}
dumpChunks("Decoded/recovered chunks", recoveredChunks);
if (!allowChangeInputs) {
@ -268,7 +312,11 @@ protected void testInputPosition(boolean usingDirectBuffer) {
ECChunk[] dataChunks = prepareDataChunksForEncoding();
ECChunk[] parityChunks = prepareParityChunksForEncoding();
ECChunk[] clonedDataChunks = cloneChunksWithData(dataChunks);
encoder.encode(dataChunks, parityChunks);
try {
encoder.encode(dataChunks, parityChunks);
} catch (IOException e) {
Assert.fail("Should not get IOException: " + e.getMessage());
}
verifyBufferPositionAtEnd(dataChunks);
// verify decode
@ -277,7 +325,12 @@ protected void testInputPosition(boolean usingDirectBuffer) {
clonedDataChunks, parityChunks);
ensureOnlyLeastRequiredChunks(inputChunks);
ECChunk[] recoveredChunks = prepareOutputChunksForDecoding();
decoder.decode(inputChunks, getErasedIndexesForDecoding(), recoveredChunks);
try {
decoder.decode(inputChunks, getErasedIndexesForDecoding(),
recoveredChunks);
} catch (IOException e) {
Assert.fail("Should not get IOException: " + e.getMessage());
}
verifyBufferPositionAtEnd(inputChunks);
}

View File

@ -355,7 +355,7 @@ private synchronized StripedDataStreamer setCurrentStreamer(int newIdx) {
* @param buffers data buffers + parity buffers
*/
private static void encode(RawErasureEncoder encoder, int numData,
ByteBuffer[] buffers) {
ByteBuffer[] buffers) throws IOException {
final ByteBuffer[] dataBuffers = new ByteBuffer[numData];
final ByteBuffer[] parityBuffers = new ByteBuffer[buffers.length - numData];
System.arraycopy(buffers, 0, dataBuffers, 0, dataBuffers.length);

View File

@ -27,6 +27,7 @@
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks;
import java.io.IOException;
import java.nio.ByteBuffer;
/**
@ -68,7 +69,7 @@ boolean prepareParityChunk(int index) {
}
@Override
void decode() {
void decode() throws IOException {
finalizeDecodeInputs();
decodeAndFillBuffer(true);
}

View File

@ -27,6 +27,7 @@
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks;
import java.io.IOException;
import java.nio.ByteBuffer;
/**
@ -88,7 +89,7 @@ boolean prepareParityChunk(int index) {
}
@Override
void decode() {
void decode() throws IOException {
finalizeDecodeInputs();
decodeAndFillBuffer(false);
}

View File

@ -149,10 +149,11 @@ void skip() {
*/
abstract boolean prepareParityChunk(int index);
/*
/**
* Decode to get the missing data.
* @throws IOException if the decoder is closed.
*/
abstract void decode();
abstract void decode() throws IOException;
/*
* Default close do nothing.
@ -408,7 +409,7 @@ void finalizeDecodeInputs() {
/**
* Decode based on the given input buffers and erasure coding policy.
*/
void decodeAndFillBuffer(boolean fillBuffer) {
void decodeAndFillBuffer(boolean fillBuffer) throws IOException {
// Step 1: prepare indices and output buffers for missing data units
int[] decodeIndices = prepareErasedIndices();

View File

@ -155,7 +155,7 @@ private long checksumWithTargetOutput(byte[] outputData, int toReconstructLen,
return checksumBuf.length;
}
private void reconstructTargets(int toReconstructLen) {
private void reconstructTargets(int toReconstructLen) throws IOException {
ByteBuffer[] inputs = getStripedReader().getInputBuffers(toReconstructLen);
ByteBuffer[] outputs = new ByteBuffer[1];

View File

@ -113,7 +113,7 @@ void reconstruct() throws IOException {
}
}
private void reconstructTargets(int toReconstructLen) {
private void reconstructTargets(int toReconstructLen) throws IOException {
ByteBuffer[] inputs = getStripedReader().getInputBuffers(toReconstructLen);
int[] erasedIndices = stripedWriter.getRealTargetIndices();

View File

@ -502,7 +502,11 @@ static void verifyParityBlocks(Configuration conf, final long size,
dataBytes.length, parityBytes.length);
final RawErasureEncoder encoder =
CodecUtil.createRawEncoder(conf, codecName, coderOptions);
encoder.encode(dataBytes, expectedParityBytes);
try {
encoder.encode(dataBytes, expectedParityBytes);
} catch (IOException e) {
Assert.fail("Unexpected IOException: " + e.getMessage());
}
for (int i = 0; i < parityBytes.length; i++) {
if (checkSet.contains(i + dataBytes.length)){
Assert.assertArrayEquals("i=" + i, expectedParityBytes[i],