HADOOP-11588. Benchmark framework and test for erasure coders. Contributed by Rui Li

This commit is contained in:
Kai Zheng 2016-08-12 15:05:52 +08:00
parent 5199db387d
commit 8fbb57fbd9
3 changed files with 490 additions and 39 deletions

View File

@@ -39,28 +39,6 @@ public class RSRawDecoderLegacy extends RawErasureDecoder {
private int[] errSignature;
private int[] primitivePower;
/**
* We need a set of reusable buffers either for the bytes array
* decoding version or direct buffer decoding version. Normally not both.
*
* For output, in addition to the valid buffers from the caller
* passed from above, we need to provide extra buffers for the internal
* decoding implementation. For output, the caller should provide no more
* than numParityUnits but at least one buffers. And the left buffers will be
* borrowed from either bytesArrayBuffers, for the bytes array version.
*
*/
// Reused buffers for decoding with bytes arrays
private byte[][] bytesArrayBuffers = new byte[getNumParityUnits()][];
private byte[][] adjustedByteArrayOutputsParameter =
new byte[getNumParityUnits()][];
private int[] adjustedOutputOffsets = new int[getNumParityUnits()];
// Reused buffers for decoding with direct ByteBuffers
private ByteBuffer[] directBuffers = new ByteBuffer[getNumParityUnits()];
private ByteBuffer[] adjustedDirectBufferOutputsParameter =
new ByteBuffer[getNumParityUnits()];
public RSRawDecoderLegacy(ErasureCoderOptions coderOptions) {
super(coderOptions);
if (getNumAllUnits() >= RSUtil.GF.getFieldSize()) {
@@ -139,16 +117,14 @@ protected void doDecode(ByteArrayDecodingState decodingState) {
* implementations, so we have to adjust them before calling doDecodeImpl.
*/
byte[][] bytesArrayBuffers = new byte[getNumParityUnits()][];
byte[][] adjustedByteArrayOutputsParameter =
new byte[getNumParityUnits()][];
int[] adjustedOutputOffsets = new int[getNumParityUnits()];
int[] erasedOrNotToReadIndexes =
CoderUtil.getNullIndexes(decodingState.inputs);
// Prepare for adjustedOutputsParameter
// First reset the positions needed this time
for (int i = 0; i < erasedOrNotToReadIndexes.length; i++) {
adjustedByteArrayOutputsParameter[i] = null;
adjustedOutputOffsets[i] = 0;
}
// Use the caller passed buffers in erasedIndexes positions
for (int outputIdx = 0, i = 0;
i < decodingState.erasedIndexes.length; i++) {
@@ -174,7 +150,8 @@ protected void doDecode(ByteArrayDecodingState decodingState) {
for (int bufferIdx = 0, i = 0; i < erasedOrNotToReadIndexes.length; i++) {
if (adjustedByteArrayOutputsParameter[i] == null) {
adjustedByteArrayOutputsParameter[i] = CoderUtil.resetBuffer(
checkGetBytesArrayBuffer(bufferIdx, dataLen), 0, dataLen);
checkGetBytesArrayBuffer(bytesArrayBuffers, bufferIdx, dataLen),
0, dataLen);
adjustedOutputOffsets[i] = 0; // Always 0 for such temp output
bufferIdx++;
}
@@ -198,12 +175,10 @@ protected void doDecode(ByteBufferDecodingState decodingState) {
int[] erasedOrNotToReadIndexes =
CoderUtil.getNullIndexes(decodingState.inputs);
// Prepare for adjustedDirectBufferOutputsParameter
ByteBuffer[] directBuffers = new ByteBuffer[getNumParityUnits()];
ByteBuffer[] adjustedDirectBufferOutputsParameter =
new ByteBuffer[getNumParityUnits()];
// First reset the positions needed this time
for (int i = 0; i < erasedOrNotToReadIndexes.length; i++) {
adjustedDirectBufferOutputsParameter[i] = null;
}
// Use the caller passed buffers in erasedIndexes positions
for (int outputIdx = 0, i = 0;
i < decodingState.erasedIndexes.length; i++) {
@@ -225,7 +200,8 @@ protected void doDecode(ByteBufferDecodingState decodingState) {
// Use shared buffers for other positions (not set yet)
for (int bufferIdx = 0, i = 0; i < erasedOrNotToReadIndexes.length; i++) {
if (adjustedDirectBufferOutputsParameter[i] == null) {
ByteBuffer buffer = checkGetDirectBuffer(bufferIdx, dataLen);
ByteBuffer buffer = checkGetDirectBuffer(
directBuffers, bufferIdx, dataLen);
buffer.position(0);
buffer.limit(dataLen);
adjustedDirectBufferOutputsParameter[i] =
@@ -274,15 +250,17 @@ private <T> void adjustOrder(T[] inputs, T[] inputs2,
numErasedParityUnits, numErasedDataUnits);
}
private byte[] checkGetBytesArrayBuffer(int idx, int bufferLen) {
private static byte[] checkGetBytesArrayBuffer(byte[][] bytesArrayBuffers,
int idx, int bufferLen) {
if (bytesArrayBuffers[idx] == null ||
bytesArrayBuffers[idx].length < bufferLen) {
bytesArrayBuffers[idx].length < bufferLen) {
bytesArrayBuffers[idx] = new byte[bufferLen];
}
return bytesArrayBuffers[idx];
}
private ByteBuffer checkGetDirectBuffer(int idx, int bufferLen) {
private static ByteBuffer checkGetDirectBuffer(ByteBuffer[] directBuffers,
int idx, int bufferLen) {
if (directBuffers[idx] == null ||
directBuffers[idx].capacity() < bufferLen) {
directBuffers[idx] = ByteBuffer.allocateDirect(bufferLen);

View File

@@ -0,0 +1,408 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.erasurecode.rawcoder;
import com.google.common.base.Preconditions;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import org.apache.hadoop.util.StopWatch;
import java.nio.ByteBuffer;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
* A benchmark tool to test the performance of different erasure coders.
* The tool launches multiple threads to encode/decode certain amount of data,
* and measures the total throughput. It only focuses on performance and doesn't
* validate correctness of the encoded/decoded results.
* User can specify the data size each thread processes, as well as the chunk
* size to use for the coder.
* Different coders are supported. User can specify the coder by a coder index.
* The coder is shared among all the threads.
*/
public final class RawErasureCoderBenchmark {

  private RawErasureCoderBenchmark() {
    // prevent instantiation
  }

  // target size of input data buffer
  private static final int TARGET_BUFFER_SIZE_MB = 126;

  // Largest chunk size (in KB) a user may request; the usage() check in
  // main() rejects anything larger. The test buffer must hold
  // NUM_DATA_UNITS chunks at once, hence the division.
  private static final int MAX_CHUNK_SIZE =
      TARGET_BUFFER_SIZE_MB / BenchData.NUM_DATA_UNITS * 1024;

  // Coder factories indexed by CODER ordinal; the static block below
  // asserts this list stays in sync with the CODER enum.
  private static final List<RawErasureCoderFactory> CODER_MAKERS =
      Collections.unmodifiableList(
          Arrays.asList(new DummyRawErasureCoderFactory(),
              new RSRawErasureCoderFactoryLegacy(),
              new RSRawErasureCoderFactory(),
              new NativeRSRawErasureCoderFactory()));

  /**
   * The coders supported by this benchmark. The ordinal is the user-facing
   * coder index and must match the position of the corresponding factory
   * in CODER_MAKERS.
   */
  enum CODER {
    DUMMY_CODER("Dummy coder"),
    LEGACY_RS_CODER("Legacy Reed-Solomon Java coder"),
    RS_CODER("Reed-Solomon Java coder"),
    ISAL_CODER("ISA-L coder");

    // Human-readable coder name used in printed reports.
    private final String name;

    CODER(String name) {
      this.name = name;
    }

    @Override
    public String toString() {
      return name;
    }
  }

  static {
    // Fail fast if a coder is added to one list but not the other.
    Preconditions.checkArgument(CODER_MAKERS.size() == CODER.values().length);
  }

  /** Prints each supported coder together with its index. */
  private static void printAvailableCoders() {
    StringBuilder sb = new StringBuilder(
        "Available coders with coderIndex:\n");
    for (CODER coder : CODER.values()) {
      sb.append(coder.ordinal()).append(":").append(coder).append("\n");
    }
    System.out.println(sb.toString());
  }

  /**
   * Prints an optional error message plus the usage text, then exits the
   * JVM with status 1 — callers never return from this method.
   *
   * @param message error description, or null for plain usage output
   */
  private static void usage(String message) {
    if (message != null) {
      System.out.println(message);
    }
    System.out.println(
        "Usage: RawErasureCoderBenchmark <encode/decode> <coderIndex> " +
            "[numThreads] [dataSize-in-MB] [chunkSize-in-KB]");
    printAvailableCoders();
    System.exit(1);
  }

  /**
   * Command-line entry point. The first two arguments (operation type and
   * coder index) are mandatory; thread count, data size and chunk size are
   * optional and fall back to the defaults below. Any invalid argument
   * routes through usage(), which exits the process.
   */
  public static void main(String[] args) throws Exception {
    String opType = null;
    int coderIndex = 0;
    // default values
    int dataSizeMB = 10240;
    int chunkSizeKB = 1024;
    int numThreads = 1;

    // Both the operation type and the coder index must be supplied.
    if (args.length > 1) {
      opType = args[0];
      if (!"encode".equals(opType) && !"decode".equals(opType)) {
        usage("Invalid type: should be either 'encode' or 'decode'");
      }

      try {
        coderIndex = Integer.parseInt(args[1]);
        if (coderIndex < 0 || coderIndex >= CODER.values().length) {
          usage("Invalid coder index, should be [0-" +
              (CODER.values().length - 1) + "]");
        }
      } catch (NumberFormatException e) {
        usage("Malformed coder index, " + e.getMessage());
      }
    } else {
      usage(null);
    }

    if (args.length > 2) {
      try {
        numThreads = Integer.parseInt(args[2]);
        if (numThreads <= 0) {
          usage("Invalid number of threads.");
        }
      } catch (NumberFormatException e) {
        usage("Malformed number of threads, " + e.getMessage());
      }
    }

    if (args.length > 3) {
      try {
        dataSizeMB = Integer.parseInt(args[3]);
        if (dataSizeMB <= 0) {
          usage("Invalid data size.");
        }
      } catch (NumberFormatException e) {
        usage("Malformed data size, " + e.getMessage());
      }
    }

    if (args.length > 4) {
      try {
        chunkSizeKB = Integer.parseInt(args[4]);
        if (chunkSizeKB <= 0) {
          usage("Chunk size should be positive.");
        }
        if (chunkSizeKB > MAX_CHUNK_SIZE) {
          usage("Chunk size should be no larger than " + MAX_CHUNK_SIZE);
        }
      } catch (NumberFormatException e) {
        usage("Malformed chunk size, " + e.getMessage());
      }
    }

    performBench(opType, CODER.values()[coderIndex],
        numThreads, dataSizeMB, chunkSizeKB);
  }

  /**
   * Performs benchmark.
   *
   * @param opType The operation to perform. Can be encode or decode
   * @param coder The coder to use
   * @param numThreads Number of threads to launch concurrently
   * @param dataSizeMB Total test data size in MB
   * @param chunkSizeKB Chunk size in KB
   * @throws Exception if any worker thread fails
   */
  public static void performBench(String opType, CODER coder,
      int numThreads, int dataSizeMB, int chunkSizeKB) throws Exception {
    // NOTE: BenchData holds static state, so concurrent performBench calls
    // with different sizes would interfere; the workers launched below all
    // share this one configuration.
    BenchData.configure(dataSizeMB, chunkSizeKB);

    // Exactly one of encoder/decoder is created; the other stays null and
    // BenchmarkCallable selects which one to use via isEncode.
    RawErasureEncoder encoder = null;
    RawErasureDecoder decoder = null;
    ByteBuffer testData;
    boolean isEncode = opType.equals("encode");

    if (isEncode) {
      encoder = getRawEncoder(coder.ordinal());
      testData = genTestData(encoder.preferDirectBuffer(),
          BenchData.bufferSizeKB);
    } else {
      decoder = getRawDecoder(coder.ordinal());
      testData = genTestData(decoder.preferDirectBuffer(),
          BenchData.bufferSizeKB);
    }

    ExecutorService executor = Executors.newFixedThreadPool(numThreads);
    List<Future<Long>> futures = new ArrayList<>(numThreads);
    // Wall-clock timer covering submission through completion of all workers.
    StopWatch sw = new StopWatch().start();
    for (int i = 0; i < numThreads; i++) {
      // duplicate() gives each worker an independent position/limit over
      // the same underlying test data.
      futures.add(executor.submit(new BenchmarkCallable(isEncode,
          encoder, decoder, testData.duplicate())));
    }
    List<Long> durations = new ArrayList<>(numThreads);
    try {
      for (Future<Long> future : futures) {
        durations.add(future.get());
      }
      long duration = sw.now(TimeUnit.MILLISECONDS);
      double totalDataSize = BenchData.totalDataSizeKB * numThreads / 1024.0;
      DecimalFormat df = new DecimalFormat("#.##");
      System.out.println(coder + " " + opType + " " +
          df.format(totalDataSize) + "MB data, with chunk size " +
          BenchData.chunkSize / 1024 + "KB");
      System.out.println("Total time: " + df.format(duration / 1000.0) + " s.");
      System.out.println("Total throughput: " + df.format(
          totalDataSize / duration * 1000.0) + " MB/s");
      printThreadStatistics(durations, df);
    } catch (Exception e) {
      System.out.println("Error waiting for thread to finish.");
      e.printStackTrace();
      throw e;
    } finally {
      executor.shutdown();
    }
  }

  /**
   * Creates the encoder for the given coder index and performs one trivial
   * 1-byte encode — presumably to trigger any lazy initialization before
   * the timed runs begin (TODO confirm).
   */
  private static RawErasureEncoder getRawEncoder(int index) {
    RawErasureEncoder encoder =
        CODER_MAKERS.get(index).createEncoder(BenchData.OPTIONS);
    final boolean isDirect = encoder.preferDirectBuffer();
    encoder.encode(
        getBufferForInit(BenchData.NUM_DATA_UNITS, 1, isDirect),
        getBufferForInit(BenchData.NUM_PARITY_UNITS, 1, isDirect));
    return encoder;
  }

  /**
   * Creates the decoder for the given coder index and performs one trivial
   * 1-byte decode with the configured erased positions nulled out —
   * presumably to trigger any lazy initialization before the timed runs
   * begin (TODO confirm).
   */
  private static RawErasureDecoder getRawDecoder(int index) {
    RawErasureDecoder decoder =
        CODER_MAKERS.get(index).createDecoder(BenchData.OPTIONS);
    final boolean isDirect = decoder.preferDirectBuffer();
    ByteBuffer[] inputs = getBufferForInit(
        BenchData.NUM_ALL_UNITS, 1, isDirect);
    // Erased units are represented as null inputs.
    for (int erasedIndex : BenchData.ERASED_INDEXES) {
      inputs[erasedIndex] = null;
    }
    decoder.decode(inputs, BenchData.ERASED_INDEXES,
        getBufferForInit(BenchData.ERASED_INDEXES.length, 1, isDirect));
    return decoder;
  }

  /**
   * Allocates an array of numBuf buffers of capacity bufCap each, direct or
   * heap-backed depending on the coder's preference.
   */
  private static ByteBuffer[] getBufferForInit(int numBuf,
      int bufCap, boolean isDirect) {
    ByteBuffer[] buffers = new ByteBuffer[numBuf];
    for (int i = 0; i < buffers.length; i++) {
      buffers[i] = isDirect ? ByteBuffer.allocateDirect(bufCap) :
          ByteBuffer.allocate(bufCap);
    }
    return buffers;
  }

  /**
   * Prints min/max/average/90th-percentile of the per-thread durations
   * (given in milliseconds, reported in seconds).
   */
  private static void printThreadStatistics(
      List<Long> durations, DecimalFormat df) {
    // Sort so min, max and percentile can be read by position.
    Collections.sort(durations);
    System.out.println("Threads statistics: ");
    Double min = durations.get(0) / 1000.0;
    Double max = durations.get(durations.size() - 1) / 1000.0;
    Long sum = 0L;
    for (Long duration : durations) {
      sum += duration;
    }
    Double avg = sum.doubleValue() / durations.size() / 1000.0;
    Double percentile = durations.get(
        (int) Math.ceil(durations.size() * 0.9) - 1) / 1000.0;
    System.out.println(durations.size() + " threads in total.");
    System.out.println("Min: " + df.format(min) + " s, Max: " +
        df.format(max) + " s, Avg: " + df.format(avg) +
        " s, 90th Percentile: " + df.format(percentile) + " s.");
  }

  /**
   * Fills a buffer of sizeKB kilobytes with random bytes and returns it
   * flipped (position 0, limit at capacity), ready for reading.
   */
  private static ByteBuffer genTestData(boolean useDirectBuffer, int sizeKB) {
    Random random = new Random();
    int bufferSize = sizeKB * 1024;
    byte[] tmp = new byte[bufferSize];
    random.nextBytes(tmp);
    ByteBuffer data = useDirectBuffer ?
        ByteBuffer.allocateDirect(bufferSize) :
        ByteBuffer.allocate(bufferSize);
    data.put(tmp);
    data.flip();
    return data;
  }

  /**
   * Shared benchmark configuration (static, set once via configure) plus
   * per-thread input/output buffers (instance state, one BenchData per
   * worker thread).
   */
  private static class BenchData {
    // Fixed 6+3 Reed-Solomon layout for all coders.
    public static final ErasureCoderOptions OPTIONS =
        new ErasureCoderOptions(6, 3);
    public static final int NUM_DATA_UNITS = OPTIONS.getNumDataUnits();
    public static final int NUM_PARITY_UNITS = OPTIONS.getNumParityUnits();
    public static final int NUM_ALL_UNITS = OPTIONS.getNumAllUnits();
    // Chunk size in bytes, derived from the user's chunkSizeKB.
    private static int chunkSize;
    // Total data each thread processes, in KB (whole buffers only).
    private static long totalDataSizeKB;
    // Size of one test-data buffer, in KB.
    private static int bufferSizeKB;
    // The three parity units (indexes 6..8 in the 6+3 layout) are the
    // erased positions the decoder reconstructs.
    private static final int[] ERASED_INDEXES = new int[]{6, 7, 8};
    // Per-thread data-unit inputs, refilled from testData each iteration.
    private final ByteBuffer[] inputs = new ByteBuffer[NUM_DATA_UNITS];
    // Per-thread outputs: parity for encode, recovered units for decode.
    private ByteBuffer[] outputs = new ByteBuffer[NUM_PARITY_UNITS];
    // Decode inputs over all units; parity positions are never assigned
    // here and so remain null, matching ERASED_INDEXES.
    private ByteBuffer[] decodeInputs = new ByteBuffer[NUM_ALL_UNITS];

    /**
     * Derives the buffer and total data sizes from the requested data size
     * and chunk size. Must run before any BenchData is constructed.
     */
    public static void configure(int dataSizeMB, int chunkSizeKB) {
      chunkSize = chunkSizeKB * 1024;
      // buffer size needs to be a multiple of (numDataUnits * chunkSize)
      int round = (int) Math.round(
          TARGET_BUFFER_SIZE_MB * 1024.0 / NUM_DATA_UNITS / chunkSizeKB);
      Preconditions.checkArgument(round > 0);
      bufferSizeKB = NUM_DATA_UNITS * chunkSizeKB * round;
      System.out.println("Using " + bufferSizeKB / 1024 + "MB buffer.");

      // Round the requested data size to a whole number of buffers,
      // processing at least one buffer.
      round = (int) Math.round(
          (dataSizeMB * 1024.0) / bufferSizeKB);
      if (round == 0) {
        round = 1;
      }
      totalDataSizeKB = round * bufferSizeKB;
    }

    public BenchData(boolean useDirectBuffer) {
      // Output buffers are allocated once per thread and cleared between
      // coder calls.
      for (int i = 0; i < outputs.length; i++) {
        outputs[i] = useDirectBuffer ? ByteBuffer.allocateDirect(chunkSize) :
            ByteBuffer.allocate(chunkSize);
      }
    }

    /** Copies the data-unit inputs into the all-units decode input array. */
    public void prepareDecInput() {
      System.arraycopy(inputs, 0, decodeInputs, 0, NUM_DATA_UNITS);
    }

    public void encode(RawErasureEncoder encoder) {
      encoder.encode(inputs, outputs);
    }

    public void decode(RawErasureDecoder decoder) {
      decoder.decode(decodeInputs, ERASED_INDEXES, outputs);
    }
  }

  /**
   * One benchmark worker: repeatedly encodes or decodes its copy of the
   * test data and returns its own elapsed time in milliseconds.
   */
  private static class BenchmarkCallable implements Callable<Long> {
    private final boolean isEncode;
    private final RawErasureEncoder encoder;
    private final RawErasureDecoder decoder;
    private final BenchData benchData;
    private final ByteBuffer testData;

    public BenchmarkCallable(boolean isEncode, RawErasureEncoder encoder,
        RawErasureDecoder decoder, ByteBuffer testData) {
      // Exactly one of encoder/decoder must be non-null, chosen by isEncode.
      if (isEncode) {
        Preconditions.checkArgument(encoder != null);
        this.encoder = encoder;
        this.decoder = null;
        benchData = new BenchData(encoder.preferDirectBuffer());
      } else {
        Preconditions.checkArgument(decoder != null);
        this.decoder = decoder;
        this.encoder = null;
        benchData = new BenchData(decoder.preferDirectBuffer());
      }
      this.isEncode = isEncode;
      this.testData = testData;
    }

    @Override
    public Long call() throws Exception {
      // Number of full buffer passes needed to reach totalDataSizeKB.
      long rounds = BenchData.totalDataSizeKB / BenchData.bufferSizeKB;
      StopWatch sw = new StopWatch().start();
      for (long i = 0; i < rounds; i++) {
        // Consume the whole test buffer, NUM_DATA_UNITS chunks at a time.
        while (testData.remaining() > 0) {
          for (ByteBuffer output : benchData.outputs) {
            output.clear();
          }

          // Slice the next NUM_DATA_UNITS chunk-sized windows out of
          // testData as this iteration's inputs.
          for (int j = 0; j < benchData.inputs.length; j++) {
            benchData.inputs[j] = testData.duplicate();
            benchData.inputs[j].limit(
                testData.position() + BenchData.chunkSize);
            benchData.inputs[j] = benchData.inputs[j].slice();
            testData.position(testData.position() + BenchData.chunkSize);
          }

          if (isEncode) {
            benchData.encode(encoder);
          } else {
            benchData.prepareDecInput();
            benchData.decode(decoder);
          }
        }
        // Rewind for the next pass over the same data.
        testData.clear();
      }
      return sw.now(TimeUnit.MILLISECONDS);
    }
  }
}

View File

@@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.erasurecode.rawcoder;
import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
import org.junit.Assume;
import org.junit.Test;
/**
* Tests for the raw erasure coder benchmark tool.
*/
public class TestRawErasureCoderBenchmark {

  /**
   * Runs one encode benchmark followed by one decode benchmark for the
   * given coder, with independent thread/data/chunk settings for each.
   */
  private static void benchEncodeDecode(RawErasureCoderBenchmark.CODER coder,
      int encodeThreads, int encodeDataMB, int encodeChunkKB,
      int decodeThreads, int decodeDataMB, int decodeChunkKB)
      throws Exception {
    RawErasureCoderBenchmark.performBench(
        "encode", coder, encodeThreads, encodeDataMB, encodeChunkKB);
    RawErasureCoderBenchmark.performBench(
        "decode", coder, decodeThreads, decodeDataMB, decodeChunkKB);
  }

  @Test
  public void testDummyCoder() throws Exception {
    // Dummy coder
    benchEncodeDecode(RawErasureCoderBenchmark.CODER.DUMMY_CODER,
        2, 100, 1024, 5, 150, 100);
  }

  @Test
  public void testLegacyRSCoder() throws Exception {
    // Legacy RS Java coder
    benchEncodeDecode(RawErasureCoderBenchmark.CODER.LEGACY_RS_CODER,
        2, 80, 200, 5, 300, 350);
  }

  @Test
  public void testRSCoder() throws Exception {
    // RS Java coder
    benchEncodeDecode(RawErasureCoderBenchmark.CODER.RS_CODER,
        3, 200, 200, 4, 135, 20);
  }

  @Test
  public void testISALCoder() throws Exception {
    // ISA-L coder; skipped unless the native library is loaded.
    Assume.assumeTrue(ErasureCodeNative.isNativeCodeLoaded());
    benchEncodeDecode(RawErasureCoderBenchmark.CODER.ISAL_CODER,
        5, 300, 64, 6, 200, 128);
  }
}