HADOOP-12047. Indicate preference not to affect input buffers during coding in erasure coder. (Contributed by Kai Zheng)

This commit is contained in:
Walter Su 2015-11-02 10:40:14 +08:00
parent 3cde6931cb
commit 6e4f8a46c5
11 changed files with 253 additions and 64 deletions

View File

@ -608,6 +608,9 @@ Trunk (Unreleased)
HADOOP-12327. Initialize output buffers with ZERO bytes in erasure coder. HADOOP-12327. Initialize output buffers with ZERO bytes in erasure coder.
(Kai Zheng via waltersu4549) (Kai Zheng via waltersu4549)
HADOOP-12047. Indicate preference not to affect input buffers during
coding in erasure coder. (Kai Zheng via waltersu4549)
Release 2.8.0 - UNRELEASED Release 2.8.0 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -22,6 +22,8 @@
import org.apache.hadoop.conf.Configured; import org.apache.hadoop.conf.Configured;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
/** /**
* A common class of basic facilities to be shared by encoder and decoder * A common class of basic facilities to be shared by encoder and decoder
@ -36,11 +38,39 @@ public abstract class AbstractRawErasureCoder
private final int numDataUnits; private final int numDataUnits;
private final int numParityUnits; private final int numParityUnits;
private final int numAllUnits; private final int numAllUnits;
private final Map<CoderOption, Object> coderOptions;
public AbstractRawErasureCoder(int numDataUnits, int numParityUnits) { public AbstractRawErasureCoder(int numDataUnits, int numParityUnits) {
this.numDataUnits = numDataUnits; this.numDataUnits = numDataUnits;
this.numParityUnits = numParityUnits; this.numParityUnits = numParityUnits;
this.numAllUnits = numDataUnits + numParityUnits; this.numAllUnits = numDataUnits + numParityUnits;
this.coderOptions = new HashMap<>(3);
coderOptions.put(CoderOption.PREFER_DIRECT_BUFFER, preferDirectBuffer());
coderOptions.put(CoderOption.ALLOW_CHANGE_INPUTS, false);
coderOptions.put(CoderOption.ALLOW_VERBOSE_DUMP, false);
}
/**
 * Look up the current value of a coder option.
 *
 * @param option the option to query; must not be null
 * @return the configured value; all options are seeded in the constructor,
 *         so this is non-null for every {@code CoderOption} constant
 */
@Override
public Object getCoderOption(CoderOption option) {
// Fail fast on a null key so callers get a clear error rather than a
// silent null lookup result.
if (option == null) {
throw new HadoopIllegalArgumentException("Invalid option");
}
return coderOptions.get(option);
}
@Override
public void setCoderOption(CoderOption option, Object value) {
if (option == null || value == null) {
throw new HadoopIllegalArgumentException(
"Invalid option or option value");
}
if (option.isReadOnly()) {
throw new HadoopIllegalArgumentException(
"The option is read-only: " + option.name());
}
coderOptions.put(option, value);
} }
/** /**
@ -75,13 +105,35 @@ protected int getNumAllUnits() {
} }
@Override @Override
public boolean preferDirectBuffer() { public void release() {
// Nothing to do by default
}
/**
* Tell if direct buffer is preferred or not. It's for callers to
* decide how to allocate coding chunk buffers, using DirectByteBuffer or
* bytes array. It will return false by default.
* @return true if native buffer is preferred for performance consideration,
* otherwise false.
*/
protected boolean preferDirectBuffer() {
return false; return false;
} }
@Override protected boolean isAllowingChangeInputs() {
public void release() { Object value = getCoderOption(CoderOption.ALLOW_CHANGE_INPUTS);
// Nothing to do by default if (value != null && value instanceof Boolean) {
return (boolean) value;
}
return false;
}
protected boolean isAllowingVerboseDump() {
Object value = getCoderOption(CoderOption.ALLOW_VERBOSE_DUMP);
if (value != null && value instanceof Boolean) {
return (boolean) value;
}
return false;
} }
/** /**

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.erasurecode.rawcoder;
/**
* Supported erasure coder options.
*/
/**
 * Supported erasure coder options. Each option carries a read-only flag;
 * read-only options reflect an intrinsic property of the coder and reject
 * attempts to change them via {@code setCoderOption}.
 */
public enum CoderOption {
  /* If direct buffer is preferred, for perf consideration */
  PREFER_DIRECT_BUFFER(true), // READ-ONLY
  /**
   * Allow changing input buffer content (not positions).
   * Maybe better perf if allowed.
   */
  ALLOW_CHANGE_INPUTS(false), // READ-WRITE
  /* Allow dump verbose debug info or not */
  ALLOW_VERBOSE_DUMP(false); // READ-WRITE

  // Immutable per-constant flag; final so it can never drift after
  // construction (the original mutable field with a redundant '= false'
  // initializer invited accidental reassignment).
  private final boolean isReadOnly;

  CoderOption(boolean isReadOnly) {
    this.isReadOnly = isReadOnly;
  }

  /**
   * @return true if this option cannot be changed after coder creation
   */
  public boolean isReadOnly() {
    return isReadOnly;
  }
}

View File

@ -21,6 +21,7 @@
import org.apache.hadoop.io.erasurecode.rawcoder.util.RSUtil; import org.apache.hadoop.io.erasurecode.rawcoder.util.RSUtil;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Arrays;
/** /**
* A raw erasure encoder in RS code scheme in pure Java in case native one * A raw erasure encoder in RS code scheme in pure Java in case native one
@ -54,8 +55,26 @@ public RSRawEncoder(int numDataUnits, int numParityUnits) {
protected void doEncode(ByteBuffer[] inputs, ByteBuffer[] outputs) { protected void doEncode(ByteBuffer[] inputs, ByteBuffer[] outputs) {
// parity units + data units // parity units + data units
ByteBuffer[] all = new ByteBuffer[outputs.length + inputs.length]; ByteBuffer[] all = new ByteBuffer[outputs.length + inputs.length];
System.arraycopy(outputs, 0, all, 0, outputs.length);
System.arraycopy(inputs, 0, all, outputs.length, inputs.length); if (isAllowingChangeInputs()) {
System.arraycopy(outputs, 0, all, 0, outputs.length);
System.arraycopy(inputs, 0, all, outputs.length, inputs.length);
} else {
System.arraycopy(outputs, 0, all, 0, outputs.length);
/**
* Note when this coder would be really (rarely) used in a production
* system, this can be optimized to cache and reuse the new allocated
* buffers avoiding reallocating.
*/
ByteBuffer tmp;
for (int i = 0; i < inputs.length; i++) {
tmp = ByteBuffer.allocate(inputs[i].remaining());
tmp.put(inputs[i]);
tmp.flip();
all[outputs.length + i] = tmp;
}
}
// Compute the remainder // Compute the remainder
RSUtil.GF.remainder(all, generatingPolynomial); RSUtil.GF.remainder(all, generatingPolynomial);
@ -67,15 +86,26 @@ protected void doEncode(byte[][] inputs, int[] inputOffsets,
int[] outputOffsets) { int[] outputOffsets) {
// parity units + data units // parity units + data units
byte[][] all = new byte[outputs.length + inputs.length][]; byte[][] all = new byte[outputs.length + inputs.length][];
System.arraycopy(outputs, 0, all, 0, outputs.length); int[] allOffsets = new int[outputOffsets.length + inputOffsets.length];
System.arraycopy(inputs, 0, all, outputs.length, inputs.length);
int[] offsets = new int[inputOffsets.length + outputOffsets.length]; if (isAllowingChangeInputs()) {
System.arraycopy(outputOffsets, 0, offsets, 0, outputOffsets.length); System.arraycopy(outputs, 0, all, 0, outputs.length);
System.arraycopy(inputOffsets, 0, offsets, System.arraycopy(inputs, 0, all, outputs.length, inputs.length);
outputOffsets.length, inputOffsets.length);
System.arraycopy(outputOffsets, 0, allOffsets, 0, outputOffsets.length);
System.arraycopy(inputOffsets, 0, allOffsets,
outputOffsets.length, inputOffsets.length);
} else {
System.arraycopy(outputs, 0, all, 0, outputs.length);
System.arraycopy(outputOffsets, 0, allOffsets, 0, outputOffsets.length);
for (int i = 0; i < inputs.length; i++) {
all[outputs.length + i] = Arrays.copyOfRange(inputs[i],
inputOffsets[i], inputOffsets[i] + dataLen);
}
}
// Compute the remainder // Compute the remainder
RSUtil.GF.remainder(all, offsets, dataLen, generatingPolynomial); RSUtil.GF.remainder(all, allOffsets, dataLen, generatingPolynomial);
} }
} }

View File

@ -37,6 +37,20 @@
@InterfaceAudience.Private @InterfaceAudience.Private
public interface RawErasureCoder extends Configurable { public interface RawErasureCoder extends Configurable {
/**
* Get a coder option value.
* @param option
* @return
*/
public Object getCoderOption(CoderOption option);
/**
* Set a coder option value.
* @param option
* @param value
*/
public void setCoderOption(CoderOption option, Object value);
/** /**
* The number of data input units for the coding. A unit can be a byte, * The number of data input units for the coding. A unit can be a byte,
* chunk or buffer or even a block. * chunk or buffer or even a block.
@ -51,15 +65,6 @@ public interface RawErasureCoder extends Configurable {
*/ */
public int getNumParityUnits(); public int getNumParityUnits();
/**
* Tell if direct buffer is preferred or not. It's for callers to
* decide how to allocate coding chunk buffers, using DirectByteBuffer or
* bytes array. It will return false by default.
* @return true if native buffer is preferred for performance consideration,
* otherwise false.
*/
public boolean preferDirectBuffer();
/** /**
* Should be called when release this coder. Good chance to release encoding * Should be called when release this coder. Good chance to release encoding
* or decoding buffers * or decoding buffers

View File

@ -54,24 +54,27 @@ public interface RawErasureDecoder extends RawErasureCoder {
* Note, for both inputs and outputs, no mixing of on-heap buffers and direct * Note, for both inputs and outputs, no mixing of on-heap buffers and direct
* buffers are allowed. * buffers are allowed.
* *
* @param inputs inputs to read data from, contents may change after the call * If the coder option ALLOW_CHANGE_INPUTS is set true (false by default), the
* content of input buffers may change after the call, subject to concrete
* implementation. Anyway the positions of input buffers will move forward.
*
* @param inputs input buffers to read data from
* @param erasedIndexes indexes of erased units in the inputs array * @param erasedIndexes indexes of erased units in the inputs array
* @param outputs outputs to write into for data generated according to * @param outputs output buffers to put decoded data into according to
* erasedIndexes, ready for reading the result data from after * erasedIndexes, ready for read after the call
* the call
*/ */
public void decode(ByteBuffer[] inputs, int[] erasedIndexes, void decode(ByteBuffer[] inputs, int[] erasedIndexes,
ByteBuffer[] outputs); ByteBuffer[] outputs);
/** /**
* Decode with inputs and erasedIndexes, generates outputs. More see above. * Decode with inputs and erasedIndexes, generates outputs. More see above.
* @param inputs inputs to read data from, contents may change after the call *
* @param inputs input buffers to read data from
* @param erasedIndexes indexes of erased units in the inputs array * @param erasedIndexes indexes of erased units in the inputs array
* @param outputs outputs to write into for data generated according to * @param outputs output buffers to put decoded data into according to
* erasedIndexes, ready for reading the result data from after * erasedIndexes, ready for read after the call
* the call
*/ */
public void decode(byte[][] inputs, int[] erasedIndexes, byte[][] outputs); void decode(byte[][] inputs, int[] erasedIndexes, byte[][] outputs);
/** /**
* Decode with inputs and erasedIndexes, generates outputs. More see above. * Decode with inputs and erasedIndexes, generates outputs. More see above.
@ -79,12 +82,11 @@ public void decode(ByteBuffer[] inputs, int[] erasedIndexes,
* Note, for both input and output ECChunks, no mixing of on-heap buffers and * Note, for both input and output ECChunks, no mixing of on-heap buffers and
* direct buffers are allowed. * direct buffers are allowed.
* *
* @param inputs inputs to read data from, contents may change after the call * @param inputs input buffers to read data from
* @param erasedIndexes indexes of erased units in the inputs array * @param erasedIndexes indexes of erased units in the inputs array
* @param outputs outputs to write into for data generated according to * @param outputs output buffers to put decoded data into according to
* erasedIndexes, ready for reading the result data from after * erasedIndexes, ready for read after the call
* the call
*/ */
public void decode(ECChunk[] inputs, int[] erasedIndexes, ECChunk[] outputs); void decode(ECChunk[] inputs, int[] erasedIndexes, ECChunk[] outputs);
} }

View File

@ -38,29 +38,32 @@ public interface RawErasureEncoder extends RawErasureCoder {
* Note, for both inputs and outputs, no mixing of on-heap buffers and direct * Note, for both inputs and outputs, no mixing of on-heap buffers and direct
* buffers are allowed. * buffers are allowed.
* *
* @param inputs inputs to read data from, contents may change after the call * If the coder option ALLOW_CHANGE_INPUTS is set true (false by default), the
* @param outputs * content of input buffers may change after the call, subject to concrete
* implementation. Anyway the positions of input buffers will move forward.
*
* @param inputs input buffers to read data from
* @param outputs output buffers to put the encoded data into, read to read
* after the call
*/ */
public void encode(ByteBuffer[] inputs, ByteBuffer[] outputs); void encode(ByteBuffer[] inputs, ByteBuffer[] outputs);
/** /**
* Encode with inputs and generates outputs * Encode with inputs and generates outputs. More see above.
* @param inputs inputs to read data from, contents may change after the call *
* @param outputs outputs to write into for data generated, ready for reading * @param inputs input buffers to read data from
 * the result data from after the call * @param outputs output buffers to put the encoded data into, ready to read
* after the call
*/ */
public void encode(byte[][] inputs, byte[][] outputs); void encode(byte[][] inputs, byte[][] outputs);
/** /**
* Encode with inputs and generates outputs. * Encode with inputs and generates outputs. More see above.
* *
* Note, for both input and output ECChunks, no mixing of on-heap buffers and * @param inputs input buffers to read data from
 * direct buffers are allowed. * @param outputs output buffers to put the encoded data into, ready to read
* * after the call
* @param inputs inputs to read data from, contents may change after the call
* @param outputs outputs to write into for data generated, ready for reading
* the result data from after the call
*/ */
public void encode(ECChunk[] inputs, ECChunk[] outputs); void encode(ECChunk[] inputs, ECChunk[] outputs);
} }

View File

@ -64,6 +64,8 @@ public abstract class TestCoderBase {
private static int FIXED_DATA_GENERATOR = 0; private static int FIXED_DATA_GENERATOR = 0;
protected byte[][] fixedData; protected byte[][] fixedData;
protected boolean allowChangeInputs;
protected int getChunkSize() { protected int getChunkSize() {
return chunkSize; return chunkSize;
} }
@ -253,6 +255,22 @@ protected void eraseDataFromChunks(ECChunk[] chunks) {
} }
} }
/**
 * Record the current buffer position of every non-null chunk so it can be
 * restored later via {@code restoreChunksFromMark}.
 *
 * @param chunks chunks whose underlying buffers get marked; null entries
 *               are skipped
 */
protected void markChunks(ECChunk[] chunks) {
  for (ECChunk chunk : chunks) {
    if (chunk != null) {
      chunk.getBuffer().mark();
    }
  }
}
/**
 * Rewind every non-null chunk's buffer back to the position recorded by a
 * prior {@code markChunks} call (via {@code ByteBuffer.reset()}).
 *
 * @param chunks chunks whose underlying buffers get reset; null entries
 *               are skipped
 */
protected void restoreChunksFromMark(ECChunk[] chunks) {
  for (ECChunk chunk : chunks) {
    if (chunk != null) {
      chunk.getBuffer().reset();
    }
  }
}
/** /**
* Clone chunks along with copying the associated data. It respects how the * Clone chunks along with copying the associated data. It respects how the
* chunk buffer is allocated, direct or non-direct. It avoids affecting the * chunk buffer is allocated, direct or non-direct. It avoids affecting the
@ -277,6 +295,10 @@ protected ECChunk[] cloneChunksWithData(ECChunk[] chunks) {
* @return a new chunk * @return a new chunk
*/ */
protected ECChunk cloneChunkWithData(ECChunk chunk) { protected ECChunk cloneChunkWithData(ECChunk chunk) {
if (chunk == null) {
return null;
}
ByteBuffer srcBuffer = chunk.getBuffer(); ByteBuffer srcBuffer = chunk.getBuffer();
byte[] bytesArr = new byte[srcBuffer.remaining()]; byte[] bytesArr = new byte[srcBuffer.remaining()];
@ -453,14 +475,16 @@ protected byte[][] toArrays(ECChunk[] chunks) {
byte[][] bytesArr = new byte[chunks.length][]; byte[][] bytesArr = new byte[chunks.length][];
for (int i = 0; i < chunks.length; i++) { for (int i = 0; i < chunks.length; i++) {
bytesArr[i] = chunks[i].toBytesArray(); if (chunks[i] != null) {
bytesArr[i] = chunks[i].toBytesArray();
}
} }
return bytesArr; return bytesArr;
} }
/** /**
* Dump all the settings used in the test case if allowDump is enabled. * Dump all the settings used in the test case if isAllowingVerboseDump is enabled.
*/ */
protected void dumpSetting() { protected void dumpSetting() {
if (allowDump) { if (allowDump) {
@ -473,14 +497,16 @@ protected void dumpSetting() {
append(Arrays.toString(erasedDataIndexes)); append(Arrays.toString(erasedDataIndexes));
sb.append(" erasedParityIndexes="). sb.append(" erasedParityIndexes=").
append(Arrays.toString(erasedParityIndexes)); append(Arrays.toString(erasedParityIndexes));
sb.append(" usingDirectBuffer=").append(usingDirectBuffer).append("\n"); sb.append(" usingDirectBuffer=").append(usingDirectBuffer);
sb.append(" isAllowingChangeInputs=").append(allowChangeInputs);
sb.append("\n");
System.out.println(sb.toString()); System.out.println(sb.toString());
} }
} }
/** /**
* Dump chunks prefixed with a header if allowDump is enabled. * Dump chunks prefixed with a header if isAllowingVerboseDump is enabled.
* @param header * @param header
* @param chunks * @param chunks
*/ */

View File

@ -68,9 +68,9 @@ protected void testCoding(boolean usingDirectBuffer) {
* The following runs will use 3 different chunkSize for inputs and outputs, * The following runs will use 3 different chunkSize for inputs and outputs,
* to verify the same encoder/decoder can process variable width of data. * to verify the same encoder/decoder can process variable width of data.
*/ */
performTestCoding(baseChunkSize, true, false, false); performTestCoding(baseChunkSize, true, false, false, false);
performTestCoding(baseChunkSize - 17, false, false, false); performTestCoding(baseChunkSize - 17, false, false, false, true);
performTestCoding(baseChunkSize + 16, true, false, false); performTestCoding(baseChunkSize + 16, true, false, false, false);
} }
/** /**
@ -82,7 +82,7 @@ protected void testCodingWithBadInput(boolean usingDirectBuffer) {
prepareCoders(); prepareCoders();
try { try {
performTestCoding(baseChunkSize, false, true, false); performTestCoding(baseChunkSize, false, true, false, true);
Assert.fail("Encoding test with bad input should fail"); Assert.fail("Encoding test with bad input should fail");
} catch (Exception e) { } catch (Exception e) {
// Expected // Expected
@ -98,7 +98,7 @@ protected void testCodingWithBadOutput(boolean usingDirectBuffer) {
prepareCoders(); prepareCoders();
try { try {
performTestCoding(baseChunkSize, false, false, true); performTestCoding(baseChunkSize, false, false, true, true);
Assert.fail("Decoding test with bad output should fail"); Assert.fail("Decoding test with bad output should fail");
} catch (Exception e) { } catch (Exception e) {
// Expected // Expected
@ -123,9 +123,11 @@ public void testCodingWithErasingTooMany() {
} }
private void performTestCoding(int chunkSize, boolean usingSlicedBuffer, private void performTestCoding(int chunkSize, boolean usingSlicedBuffer,
boolean useBadInput, boolean useBadOutput) { boolean useBadInput, boolean useBadOutput,
boolean allowChangeInputs) {
setChunkSize(chunkSize); setChunkSize(chunkSize);
prepareBufferAllocator(usingSlicedBuffer); prepareBufferAllocator(usingSlicedBuffer);
setAllowChangeInputs(allowChangeInputs);
dumpSetting(); dumpSetting();
@ -141,10 +143,16 @@ private void performTestCoding(int chunkSize, boolean usingSlicedBuffer,
// Backup all the source chunks for later recovering because some coders // Backup all the source chunks for later recovering because some coders
// may affect the source data. // may affect the source data.
ECChunk[] clonedDataChunks = cloneChunksWithData(dataChunks); ECChunk[] clonedDataChunks = cloneChunksWithData(dataChunks);
markChunks(dataChunks);
encoder.encode(dataChunks, parityChunks); encoder.encode(dataChunks, parityChunks);
dumpChunks("Encoded parity chunks", parityChunks); dumpChunks("Encoded parity chunks", parityChunks);
if (!allowChangeInputs) {
restoreChunksFromMark(dataChunks);
compareAndVerify(clonedDataChunks, dataChunks);
}
// Backup and erase some chunks // Backup and erase some chunks
ECChunk[] backupChunks = backupAndEraseChunks(clonedDataChunks, parityChunks); ECChunk[] backupChunks = backupAndEraseChunks(clonedDataChunks, parityChunks);
@ -160,14 +168,31 @@ private void performTestCoding(int chunkSize, boolean usingSlicedBuffer,
corruptSomeChunk(recoveredChunks); corruptSomeChunk(recoveredChunks);
} }
ECChunk[] clonedInputChunks = null;
if (!allowChangeInputs) {
markChunks(inputChunks);
clonedInputChunks = cloneChunksWithData(inputChunks);
}
dumpChunks("Decoding input chunks", inputChunks); dumpChunks("Decoding input chunks", inputChunks);
decoder.decode(inputChunks, getErasedIndexesForDecoding(), recoveredChunks); decoder.decode(inputChunks, getErasedIndexesForDecoding(), recoveredChunks);
dumpChunks("Decoded/recovered chunks", recoveredChunks); dumpChunks("Decoded/recovered chunks", recoveredChunks);
if (!allowChangeInputs) {
restoreChunksFromMark(inputChunks);
compareAndVerify(clonedInputChunks, inputChunks);
}
// Compare // Compare
compareAndVerify(backupChunks, recoveredChunks); compareAndVerify(backupChunks, recoveredChunks);
} }
/**
 * Configure whether the coders under test may modify their input buffers,
 * propagating the setting to both the encoder and the decoder and recording
 * it locally so the test can decide whether to verify input immutability.
 *
 * @param allowChangeInputs true to let coders alter input buffer contents
 */
private void setAllowChangeInputs(boolean allowChangeInputs) {
this.allowChangeInputs = allowChangeInputs;
encoder.setCoderOption(CoderOption.ALLOW_CHANGE_INPUTS, allowChangeInputs);
decoder.setCoderOption(CoderOption.ALLOW_CHANGE_INPUTS, allowChangeInputs);
}
private void prepareCoders() { private void prepareCoders() {
if (encoder == null) { if (encoder == null) {
encoder = createEncoder(); encoder = createEncoder();

View File

@ -29,6 +29,7 @@ public class TestXORRawCoder extends TestRawCoderBase {
public void setup() { public void setup() {
this.encoderClass = XORRawEncoder.class; this.encoderClass = XORRawEncoder.class;
this.decoderClass = XORRawDecoder.class; this.decoderClass = XORRawDecoder.class;
setAllowDump(false);
} }
@Test @Test

View File

@ -892,7 +892,6 @@ boolean prepareParityChunk(int index) {
@Override @Override
void decode() { void decode() {
// TODO no copy for data chunks. this depends on HADOOP-12047
final int span = (int) alignedStripe.getSpanInBlock(); final int span = (int) alignedStripe.getSpanInBlock();
for (int i = 0; i < alignedStripe.chunks.length; i++) { for (int i = 0; i < alignedStripe.chunks.length; i++) {
if (alignedStripe.chunks[i] != null && if (alignedStripe.chunks[i] != null &&