HADOOP-15499. Performance severe drops when running RawErasureCoderBenchmark with NativeRSRawErasureCoder. Contributed by Sammi Chen.
This commit is contained in:
parent
ccfb816d39
commit
18201b882a
@ -25,6 +25,7 @@
|
|||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Abstract native raw decoder for all native coders to extend with.
|
* Abstract native raw decoder for all native coders to extend with.
|
||||||
@ -34,36 +35,46 @@ abstract class AbstractNativeRawDecoder extends RawErasureDecoder {
|
|||||||
public static Logger LOG =
|
public static Logger LOG =
|
||||||
LoggerFactory.getLogger(AbstractNativeRawDecoder.class);
|
LoggerFactory.getLogger(AbstractNativeRawDecoder.class);
|
||||||
|
|
||||||
|
// Protect ISA-L coder data structure in native layer from being accessed and
|
||||||
|
// updated concurrently by the init, release and decode functions.
|
||||||
|
protected final ReentrantReadWriteLock decoderLock =
|
||||||
|
new ReentrantReadWriteLock();
|
||||||
|
|
||||||
public AbstractNativeRawDecoder(ErasureCoderOptions coderOptions) {
|
public AbstractNativeRawDecoder(ErasureCoderOptions coderOptions) {
|
||||||
super(coderOptions);
|
super(coderOptions);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected synchronized void doDecode(ByteBufferDecodingState decodingState)
|
protected void doDecode(ByteBufferDecodingState decodingState)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
if (nativeCoder == 0) {
|
decoderLock.readLock().lock();
|
||||||
throw new IOException(String.format("%s closed",
|
try {
|
||||||
getClass().getSimpleName()));
|
if (nativeCoder == 0) {
|
||||||
}
|
throw new IOException(String.format("%s closed",
|
||||||
int[] inputOffsets = new int[decodingState.inputs.length];
|
getClass().getSimpleName()));
|
||||||
int[] outputOffsets = new int[decodingState.outputs.length];
|
|
||||||
|
|
||||||
ByteBuffer buffer;
|
|
||||||
for (int i = 0; i < decodingState.inputs.length; ++i) {
|
|
||||||
buffer = decodingState.inputs[i];
|
|
||||||
if (buffer != null) {
|
|
||||||
inputOffsets[i] = buffer.position();
|
|
||||||
}
|
}
|
||||||
}
|
int[] inputOffsets = new int[decodingState.inputs.length];
|
||||||
|
int[] outputOffsets = new int[decodingState.outputs.length];
|
||||||
|
|
||||||
for (int i = 0; i < decodingState.outputs.length; ++i) {
|
ByteBuffer buffer;
|
||||||
buffer = decodingState.outputs[i];
|
for (int i = 0; i < decodingState.inputs.length; ++i) {
|
||||||
outputOffsets[i] = buffer.position();
|
buffer = decodingState.inputs[i];
|
||||||
}
|
if (buffer != null) {
|
||||||
|
inputOffsets[i] = buffer.position();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
performDecodeImpl(decodingState.inputs, inputOffsets,
|
for (int i = 0; i < decodingState.outputs.length; ++i) {
|
||||||
decodingState.decodeLength, decodingState.erasedIndexes,
|
buffer = decodingState.outputs[i];
|
||||||
decodingState.outputs, outputOffsets);
|
outputOffsets[i] = buffer.position();
|
||||||
|
}
|
||||||
|
|
||||||
|
performDecodeImpl(decodingState.inputs, inputOffsets,
|
||||||
|
decodingState.decodeLength, decodingState.erasedIndexes,
|
||||||
|
decodingState.outputs, outputOffsets);
|
||||||
|
} finally {
|
||||||
|
decoderLock.readLock().unlock();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected abstract void performDecodeImpl(ByteBuffer[] inputs,
|
protected abstract void performDecodeImpl(ByteBuffer[] inputs,
|
||||||
|
@ -25,6 +25,7 @@
|
|||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Abstract native raw encoder for all native coders to extend with.
|
* Abstract native raw encoder for all native coders to extend with.
|
||||||
@ -34,34 +35,44 @@ abstract class AbstractNativeRawEncoder extends RawErasureEncoder {
|
|||||||
public static Logger LOG =
|
public static Logger LOG =
|
||||||
LoggerFactory.getLogger(AbstractNativeRawEncoder.class);
|
LoggerFactory.getLogger(AbstractNativeRawEncoder.class);
|
||||||
|
|
||||||
|
// Protect ISA-L coder data structure in native layer from being accessed and
|
||||||
|
// updated concurrently by the init, release and encode functions.
|
||||||
|
protected final ReentrantReadWriteLock encoderLock =
|
||||||
|
new ReentrantReadWriteLock();
|
||||||
|
|
||||||
public AbstractNativeRawEncoder(ErasureCoderOptions coderOptions) {
|
public AbstractNativeRawEncoder(ErasureCoderOptions coderOptions) {
|
||||||
super(coderOptions);
|
super(coderOptions);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected synchronized void doEncode(ByteBufferEncodingState encodingState)
|
protected void doEncode(ByteBufferEncodingState encodingState)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
if (nativeCoder == 0) {
|
encoderLock.readLock().lock();
|
||||||
throw new IOException(String.format("%s closed",
|
try {
|
||||||
getClass().getSimpleName()));
|
if (nativeCoder == 0) {
|
||||||
}
|
throw new IOException(String.format("%s closed",
|
||||||
int[] inputOffsets = new int[encodingState.inputs.length];
|
getClass().getSimpleName()));
|
||||||
int[] outputOffsets = new int[encodingState.outputs.length];
|
}
|
||||||
int dataLen = encodingState.inputs[0].remaining();
|
int[] inputOffsets = new int[encodingState.inputs.length];
|
||||||
|
int[] outputOffsets = new int[encodingState.outputs.length];
|
||||||
|
int dataLen = encodingState.inputs[0].remaining();
|
||||||
|
|
||||||
ByteBuffer buffer;
|
ByteBuffer buffer;
|
||||||
for (int i = 0; i < encodingState.inputs.length; ++i) {
|
for (int i = 0; i < encodingState.inputs.length; ++i) {
|
||||||
buffer = encodingState.inputs[i];
|
buffer = encodingState.inputs[i];
|
||||||
inputOffsets[i] = buffer.position();
|
inputOffsets[i] = buffer.position();
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 0; i < encodingState.outputs.length; ++i) {
|
for (int i = 0; i < encodingState.outputs.length; ++i) {
|
||||||
buffer = encodingState.outputs[i];
|
buffer = encodingState.outputs[i];
|
||||||
outputOffsets[i] = buffer.position();
|
outputOffsets[i] = buffer.position();
|
||||||
}
|
}
|
||||||
|
|
||||||
performEncodeImpl(encodingState.inputs, inputOffsets, dataLen,
|
performEncodeImpl(encodingState.inputs, inputOffsets, dataLen,
|
||||||
encodingState.outputs, outputOffsets);
|
encodingState.outputs, outputOffsets);
|
||||||
|
} finally {
|
||||||
|
encoderLock.readLock().unlock();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected abstract void performEncodeImpl(
|
protected abstract void performEncodeImpl(
|
||||||
|
@ -36,19 +36,30 @@ public class NativeRSRawDecoder extends AbstractNativeRawDecoder {
|
|||||||
|
|
||||||
public NativeRSRawDecoder(ErasureCoderOptions coderOptions) {
|
public NativeRSRawDecoder(ErasureCoderOptions coderOptions) {
|
||||||
super(coderOptions);
|
super(coderOptions);
|
||||||
initImpl(coderOptions.getNumDataUnits(), coderOptions.getNumParityUnits());
|
decoderLock.writeLock().lock();
|
||||||
|
try {
|
||||||
|
initImpl(coderOptions.getNumDataUnits(),
|
||||||
|
coderOptions.getNumParityUnits());
|
||||||
|
} finally {
|
||||||
|
decoderLock.writeLock().unlock();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected synchronized void performDecodeImpl(
|
protected void performDecodeImpl(
|
||||||
ByteBuffer[] inputs, int[] inputOffsets, int dataLen, int[] erased,
|
ByteBuffer[] inputs, int[] inputOffsets, int dataLen, int[] erased,
|
||||||
ByteBuffer[] outputs, int[] outputOffsets) throws IOException {
|
ByteBuffer[] outputs, int[] outputOffsets) throws IOException {
|
||||||
decodeImpl(inputs, inputOffsets, dataLen, erased, outputs, outputOffsets);
|
decodeImpl(inputs, inputOffsets, dataLen, erased, outputs, outputOffsets);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void release() {
|
public void release() {
|
||||||
destroyImpl();
|
decoderLock.writeLock().lock();
|
||||||
|
try {
|
||||||
|
destroyImpl();
|
||||||
|
} finally {
|
||||||
|
decoderLock.writeLock().unlock();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -36,19 +36,30 @@ public class NativeRSRawEncoder extends AbstractNativeRawEncoder {
|
|||||||
|
|
||||||
public NativeRSRawEncoder(ErasureCoderOptions coderOptions) {
|
public NativeRSRawEncoder(ErasureCoderOptions coderOptions) {
|
||||||
super(coderOptions);
|
super(coderOptions);
|
||||||
initImpl(coderOptions.getNumDataUnits(), coderOptions.getNumParityUnits());
|
encoderLock.writeLock().lock();
|
||||||
|
try {
|
||||||
|
initImpl(coderOptions.getNumDataUnits(),
|
||||||
|
coderOptions.getNumParityUnits());
|
||||||
|
} finally {
|
||||||
|
encoderLock.writeLock().unlock();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected synchronized void performEncodeImpl(
|
protected void performEncodeImpl(
|
||||||
ByteBuffer[] inputs, int[] inputOffsets, int dataLen,
|
ByteBuffer[] inputs, int[] inputOffsets, int dataLen,
|
||||||
ByteBuffer[] outputs, int[] outputOffsets) throws IOException {
|
ByteBuffer[] outputs, int[] outputOffsets) throws IOException {
|
||||||
encodeImpl(inputs, inputOffsets, dataLen, outputs, outputOffsets);
|
encodeImpl(inputs, inputOffsets, dataLen, outputs, outputOffsets);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void release() {
|
public void release() {
|
||||||
destroyImpl();
|
encoderLock.writeLock().lock();
|
||||||
|
try {
|
||||||
|
destroyImpl();
|
||||||
|
} finally {
|
||||||
|
encoderLock.writeLock().unlock();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -36,19 +36,30 @@ public class NativeXORRawDecoder extends AbstractNativeRawDecoder {
|
|||||||
|
|
||||||
public NativeXORRawDecoder(ErasureCoderOptions coderOptions) {
|
public NativeXORRawDecoder(ErasureCoderOptions coderOptions) {
|
||||||
super(coderOptions);
|
super(coderOptions);
|
||||||
initImpl(coderOptions.getNumDataUnits(), coderOptions.getNumParityUnits());
|
decoderLock.writeLock().lock();
|
||||||
|
try {
|
||||||
|
initImpl(coderOptions.getNumDataUnits(),
|
||||||
|
coderOptions.getNumParityUnits());
|
||||||
|
} finally {
|
||||||
|
decoderLock.writeLock().unlock();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected synchronized void performDecodeImpl(
|
protected void performDecodeImpl(
|
||||||
ByteBuffer[] inputs, int[] inputOffsets, int dataLen, int[] erased,
|
ByteBuffer[] inputs, int[] inputOffsets, int dataLen, int[] erased,
|
||||||
ByteBuffer[] outputs, int[] outputOffsets) throws IOException {
|
ByteBuffer[] outputs, int[] outputOffsets) throws IOException {
|
||||||
decodeImpl(inputs, inputOffsets, dataLen, erased, outputs, outputOffsets);
|
decodeImpl(inputs, inputOffsets, dataLen, erased, outputs, outputOffsets);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void release() {
|
public void release() {
|
||||||
destroyImpl();
|
decoderLock.writeLock().lock();
|
||||||
|
try {
|
||||||
|
destroyImpl();
|
||||||
|
} finally {
|
||||||
|
decoderLock.writeLock().unlock();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private native void initImpl(int numDataUnits, int numParityUnits);
|
private native void initImpl(int numDataUnits, int numParityUnits);
|
||||||
|
@ -36,19 +36,30 @@ public class NativeXORRawEncoder extends AbstractNativeRawEncoder {
|
|||||||
|
|
||||||
public NativeXORRawEncoder(ErasureCoderOptions coderOptions) {
|
public NativeXORRawEncoder(ErasureCoderOptions coderOptions) {
|
||||||
super(coderOptions);
|
super(coderOptions);
|
||||||
initImpl(coderOptions.getNumDataUnits(), coderOptions.getNumParityUnits());
|
encoderLock.writeLock().lock();
|
||||||
|
try {
|
||||||
|
initImpl(coderOptions.getNumDataUnits(),
|
||||||
|
coderOptions.getNumParityUnits());
|
||||||
|
} finally {
|
||||||
|
encoderLock.writeLock().unlock();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected synchronized void performEncodeImpl(
|
protected void performEncodeImpl(
|
||||||
ByteBuffer[] inputs, int[] inputOffsets, int dataLen,
|
ByteBuffer[] inputs, int[] inputOffsets, int dataLen,
|
||||||
ByteBuffer[] outputs, int[] outputOffsets) throws IOException {
|
ByteBuffer[] outputs, int[] outputOffsets) throws IOException {
|
||||||
encodeImpl(inputs, inputOffsets, dataLen, outputs, outputOffsets);
|
encodeImpl(inputs, inputOffsets, dataLen, outputs, outputOffsets);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void release() {
|
public void release() {
|
||||||
destroyImpl();
|
encoderLock.writeLock().lock();
|
||||||
|
try {
|
||||||
|
destroyImpl();
|
||||||
|
} finally {
|
||||||
|
encoderLock.writeLock().unlock();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private native void initImpl(int numDataUnits, int numParityUnits);
|
private native void initImpl(int numDataUnits, int numParityUnits);
|
||||||
|
@ -230,6 +230,12 @@ public static void performBench(String opType, CODER coder,
|
|||||||
throw e;
|
throw e;
|
||||||
} finally {
|
} finally {
|
||||||
executor.shutdown();
|
executor.shutdown();
|
||||||
|
if (encoder != null) {
|
||||||
|
encoder.release();
|
||||||
|
}
|
||||||
|
if (decoder != null) {
|
||||||
|
decoder.release();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user