HADOOP-13010. Refactor raw erasure coders. Contributed by Kai Zheng

Kai Zheng 2016-05-27 13:23:34 +08:00
parent 4f513a4a8e
commit 77202fa103
46 changed files with 1478 additions and 1102 deletions


@ -21,7 +21,6 @@ import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureCoder;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureCoderFactory;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
@ -36,115 +35,61 @@ public final class CodecUtil {
/**
* Create RS raw encoder according to configuration.
* @param conf configuration possibly with some items to configure the coder
* @param numDataUnits number of data units in a coding group
* @param numParityUnits number of parity units in a coding group
* @param conf configuration
* @param coderOptions coder options used to create the coder
* @param codec the codec to use. If null, will use the default codec
* @return raw encoder
*/
public static RawErasureEncoder createRSRawEncoder(
Configuration conf, int numDataUnits, int numParityUnits, String codec) {
public static RawErasureEncoder createRawEncoder(
Configuration conf, String codec, ErasureCoderOptions coderOptions) {
Preconditions.checkNotNull(conf);
if (codec == null) {
codec = ErasureCodeConstants.RS_DEFAULT_CODEC_NAME;
}
RawErasureCoder rawCoder = createRawCoder(conf,
getFactNameFromCodec(conf, codec), true, numDataUnits, numParityUnits);
return (RawErasureEncoder) rawCoder;
}
Preconditions.checkNotNull(codec);
/**
* Create RS raw encoder using the default codec.
*/
public static RawErasureEncoder createRSRawEncoder(
Configuration conf, int numDataUnits, int numParityUnits) {
return createRSRawEncoder(conf, numDataUnits, numParityUnits, null);
String rawCoderFactoryKey = getFactNameFromCodec(conf, codec);
RawErasureCoderFactory fact = createRawCoderFactory(conf,
rawCoderFactoryKey);
return fact.createEncoder(coderOptions);
}
/**
* Create RS raw decoder according to configuration.
* @param conf configuration possibly with some items to configure the coder
* @param numDataUnits number of data units in a coding group
* @param numParityUnits number of parity units in a coding group
* @param conf configuration
* @param coderOptions coder options used to create the coder
* @param codec the codec to use. If null, will use the default codec
* @return raw decoder
*/
public static RawErasureDecoder createRSRawDecoder(
Configuration conf, int numDataUnits, int numParityUnits, String codec) {
public static RawErasureDecoder createRawDecoder(
Configuration conf, String codec, ErasureCoderOptions coderOptions) {
Preconditions.checkNotNull(conf);
if (codec == null) {
codec = ErasureCodeConstants.RS_DEFAULT_CODEC_NAME;
}
RawErasureCoder rawCoder = createRawCoder(conf,
getFactNameFromCodec(conf, codec), false, numDataUnits, numParityUnits);
return (RawErasureDecoder) rawCoder;
Preconditions.checkNotNull(codec);
String rawCoderFactoryKey = getFactNameFromCodec(conf, codec);
RawErasureCoderFactory fact = createRawCoderFactory(conf,
rawCoderFactoryKey);
return fact.createDecoder(coderOptions);
}
/**
* Create RS raw decoder using the default codec.
*/
public static RawErasureDecoder createRSRawDecoder(
Configuration conf, int numDataUnits, int numParityUnits) {
return createRSRawDecoder(conf, numDataUnits, numParityUnits, null);
}
/**
* Create XOR raw encoder according to configuration.
* @param conf configuration possibly with some items to configure the coder
* @param numDataUnits number of data units in a coding group
* @param numParityUnits number of parity units in a coding group
* @return raw encoder
*/
public static RawErasureEncoder createXORRawEncoder(
Configuration conf, int numDataUnits, int numParityUnits) {
Preconditions.checkNotNull(conf);
RawErasureCoder rawCoder = createRawCoder(conf,
getFactNameFromCodec(conf, ErasureCodeConstants.XOR_CODEC_NAME),
true, numDataUnits, numParityUnits);
return (RawErasureEncoder) rawCoder;
}
/**
* Create XOR raw decoder according to configuration.
* @param conf configuration possibly with some items to configure the coder
* @param numDataUnits number of data units in a coding group
* @param numParityUnits number of parity units in a coding group
* @return raw decoder
*/
public static RawErasureDecoder createXORRawDecoder(
Configuration conf, int numDataUnits, int numParityUnits) {
Preconditions.checkNotNull(conf);
RawErasureCoder rawCoder = createRawCoder(conf,
getFactNameFromCodec(conf, ErasureCodeConstants.XOR_CODEC_NAME),
false, numDataUnits, numParityUnits);
return (RawErasureDecoder) rawCoder;
}
/**
* Create raw coder using specified conf and raw coder factory key.
* @param conf configuration possibly with some items to configure the coder
* @param rawCoderFactory name of the raw coder factory
* @param isEncoder is encoder or not we're going to create
* @param numDataUnits number of data units in a coding group
* @param numParityUnits number of parity units in a coding group
* @return raw coder
*/
public static RawErasureCoder createRawCoder(Configuration conf,
String rawCoderFactory, boolean isEncoder, int numDataUnits,
int numParityUnits) {
private static RawErasureCoderFactory createRawCoderFactory(
Configuration conf, String rawCoderFactoryKey) {
RawErasureCoderFactory fact;
try {
Class<? extends RawErasureCoderFactory> factClass = conf.getClassByName(
rawCoderFactory).asSubclass(RawErasureCoderFactory.class);
rawCoderFactoryKey).asSubclass(RawErasureCoderFactory.class);
fact = factClass.newInstance();
} catch (ClassNotFoundException | InstantiationException |
IllegalAccessException e) {
throw new RuntimeException("Failed to create raw coder", e);
throw new RuntimeException("Failed to create raw coder factory", e);
}
return isEncoder ? fact.createEncoder(numDataUnits, numParityUnits) :
fact.createDecoder(numDataUnits, numParityUnits);
if (fact == null) {
throw new RuntimeException("Failed to create raw coder factory");
}
return fact;
}
private static String getFactNameFromCodec(Configuration conf, String codec) {

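For illustration, a minimal sketch of how callers obtain raw coders through the refactored entry points; the 6+3 schema, the variable names and the wrapper class are assumptions for the example, not part of the patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.ErasureCodeConstants;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;

public class CodecUtilUsageSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Hypothetical RS(6,3) coding group: 6 data units plus 3 parity units.
    ErasureCoderOptions options = new ErasureCoderOptions(6, 3);
    RawErasureEncoder encoder = CodecUtil.createRawEncoder(conf,
        ErasureCodeConstants.RS_DEFAULT_CODEC_NAME, options);
    RawErasureDecoder decoder = CodecUtil.createRawDecoder(conf,
        ErasureCodeConstants.RS_DEFAULT_CODEC_NAME, options);
    System.out.println(encoder.getNumDataUnits() + " data units, "
        + decoder.getNumParityUnits() + " parity units");
  }
}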

@ -0,0 +1,89 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.erasurecode;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* Erasure coder configuration that maintains schema info and coder options.
*/
@InterfaceAudience.Private
public final class ErasureCoderOptions {
private final int numDataUnits;
private final int numParityUnits;
private final int numAllUnits;
private final boolean allowChangeInputs;
private final boolean allowVerboseDump;
public ErasureCoderOptions(int numDataUnits, int numParityUnits) {
this(numDataUnits, numParityUnits, false, false);
}
public ErasureCoderOptions(int numDataUnits, int numParityUnits,
boolean allowChangeInputs, boolean allowVerboseDump) {
this.numDataUnits = numDataUnits;
this.numParityUnits = numParityUnits;
this.numAllUnits = numDataUnits + numParityUnits;
this.allowChangeInputs = allowChangeInputs;
this.allowVerboseDump = allowVerboseDump;
}
/**
* The number of data input units for the coding. A unit can be a byte,
* chunk or buffer or even a block.
* @return count of data input units
*/
public int getNumDataUnits() {
return numDataUnits;
}
/**
* The number of parity output units for the coding. A unit can be a byte,
* chunk, buffer or even a block.
* @return count of parity output units
*/
public int getNumParityUnits() {
return numParityUnits;
}
/**
* The number of all the involved units in the coding.
* @return count of all the data units and parity units
*/
public int getNumAllUnits() {
return numAllUnits;
}
/**
* Allow changing input buffer content (not positions). Performance may be
* better when changes are not allowed.
* @return true if allowing input content to be changed, false otherwise
*/
public boolean allowChangeInputs() {
return allowChangeInputs;
}
/**
* Whether to allow dumping verbose debug info.
* @return true if verbose debug info is desired, false otherwise
*/
public boolean allowVerboseDump() {
return allowVerboseDump;
}
}

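A short fragment (assuming the class above is imported) showing how the options object carries the schema and the two behavior flags; the 6+3 numbers are arbitrary:

// 6 data units, 3 parity units; allow the coder to modify input buffer
// contents, keep verbose dumping off.
ErasureCoderOptions opts = new ErasureCoderOptions(6, 3, true, false);
assert opts.getNumAllUnits() == 9;
assert opts.allowChangeInputs();
assert !opts.allowVerboseDump();
// The two-argument constructor defaults both flags to false.
ErasureCoderOptions defaults = new ErasureCoderOptions(6, 3);
assert !defaults.allowChangeInputs() && !defaults.allowVerboseDump();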

@ -22,7 +22,10 @@ import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.ECBlock;
import org.apache.hadoop.io.erasurecode.ECBlockGroup;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.io.erasurecode.rawcoder.*;
import org.apache.hadoop.io.erasurecode.ErasureCodeConstants;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
/**
* Hitchhiker is a new erasure coding algorithm developed as a research project
@ -68,17 +71,20 @@ public class HHXORErasureDecoder extends AbstractErasureDecoder {
private RawErasureDecoder checkCreateRSRawDecoder() {
if (rsRawDecoder == null) {
rsRawDecoder = CodecUtil.createRSRawDecoder(getConf(),
getNumDataUnits(), getNumParityUnits());
ErasureCoderOptions coderOptions = new ErasureCoderOptions(
getNumDataUnits(), getNumParityUnits());
rsRawDecoder = CodecUtil.createRawDecoder(getConf(),
ErasureCodeConstants.RS_DEFAULT_CODEC_NAME, coderOptions);
}
return rsRawDecoder;
}
private RawErasureEncoder checkCreateXorRawEncoder() {
if (xorRawEncoder == null) {
xorRawEncoder = CodecUtil.createXORRawEncoder(getConf(),
getNumDataUnits(), getNumParityUnits());
xorRawEncoder.setCoderOption(CoderOption.ALLOW_CHANGE_INPUTS, false);
ErasureCoderOptions coderOptions = new ErasureCoderOptions(
getNumDataUnits(), getNumParityUnits());
xorRawEncoder = CodecUtil.createRawEncoder(getConf(),
ErasureCodeConstants.XOR_CODEC_NAME, coderOptions);
}
return xorRawEncoder;
}

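The dropped setCoderOption call above shows the general migration pattern of this patch: per-coder flags move from post-construction mutation into the immutable ErasureCoderOptions. A hedged sketch of the equivalent under the new API (the 6+3 schema and variable names are arbitrary):

// Old style, removed by this patch:
//   xorRawEncoder.setCoderOption(CoderOption.ALLOW_CHANGE_INPUTS, false);
// New style: state the intent at construction time; false is also the
// default used by the two-argument ErasureCoderOptions constructor.
ErasureCoderOptions noInputChange = new ErasureCoderOptions(6, 3, false, false);
RawErasureEncoder xorEncoder = CodecUtil.createRawEncoder(new Configuration(),
    ErasureCodeConstants.XOR_CODEC_NAME, noInputChange);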

@ -22,7 +22,8 @@ import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.ECBlock;
import org.apache.hadoop.io.erasurecode.ECBlockGroup;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.io.erasurecode.rawcoder.CoderOption;
import org.apache.hadoop.io.erasurecode.ErasureCodeConstants;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
/**
@ -64,17 +65,21 @@ public class HHXORErasureEncoder extends AbstractErasureEncoder {
private RawErasureEncoder checkCreateRSRawEncoder() {
if (rsRawEncoder == null) {
rsRawEncoder = CodecUtil.createRSRawEncoder(getConf(),
ErasureCoderOptions coderOptions = new ErasureCoderOptions(
getNumDataUnits(), getNumParityUnits());
rsRawEncoder = CodecUtil.createRawEncoder(getConf(),
ErasureCodeConstants.RS_DEFAULT_CODEC_NAME, coderOptions);
}
return rsRawEncoder;
}
private RawErasureEncoder checkCreateXorRawEncoder() {
if (xorRawEncoder == null) {
xorRawEncoder = CodecUtil.createXORRawEncoder(getConf(),
getNumDataUnits(), getNumParityUnits());
xorRawEncoder.setCoderOption(CoderOption.ALLOW_CHANGE_INPUTS, false);
ErasureCoderOptions erasureCoderOptions = new ErasureCoderOptions(
getNumDataUnits(), getNumParityUnits());
xorRawEncoder = CodecUtil.createRawEncoder(getConf(),
ErasureCodeConstants.XOR_CODEC_NAME,
erasureCoderOptions);
}
return xorRawEncoder;
}


@ -22,6 +22,8 @@ import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.ECBlock;
import org.apache.hadoop.io.erasurecode.ECBlockGroup;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.io.erasurecode.ErasureCodeConstants;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
/**
@ -55,8 +57,10 @@ public class RSErasureDecoder extends AbstractErasureDecoder {
private RawErasureDecoder checkCreateRSRawDecoder() {
if (rsRawDecoder == null) {
// TODO: we should create the raw coder according to codec.
rsRawDecoder = CodecUtil.createRSRawDecoder(getConf(),
ErasureCoderOptions coderOptions = new ErasureCoderOptions(
getNumDataUnits(), getNumParityUnits());
rsRawDecoder = CodecUtil.createRawDecoder(getConf(),
ErasureCodeConstants.RS_DEFAULT_CODEC_NAME, coderOptions);
}
return rsRawDecoder;
}


@ -22,6 +22,8 @@ import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.ECBlock;
import org.apache.hadoop.io.erasurecode.ECBlockGroup;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.io.erasurecode.ErasureCodeConstants;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
/**
@ -55,8 +57,10 @@ public class RSErasureEncoder extends AbstractErasureEncoder {
private RawErasureEncoder checkCreateRSRawEncoder() {
if (rawEncoder == null) {
// TODO: we should create the raw coder according to codec.
rawEncoder = CodecUtil.createRSRawEncoder(getConf(),
ErasureCoderOptions coderOptions = new ErasureCoderOptions(
getNumDataUnits(), getNumParityUnits());
rawEncoder = CodecUtil.createRawEncoder(getConf(),
ErasureCodeConstants.RS_DEFAULT_CODEC_NAME, coderOptions);
}
return rawEncoder;
}


@ -22,6 +22,8 @@ import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.ECBlock;
import org.apache.hadoop.io.erasurecode.ECBlockGroup;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.io.erasurecode.ErasureCodeConstants;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
/**
@ -43,8 +45,10 @@ public class XORErasureDecoder extends AbstractErasureDecoder {
@Override
protected ErasureCodingStep prepareDecodingStep(
final ECBlockGroup blockGroup) {
RawErasureDecoder rawDecoder = CodecUtil.createXORRawDecoder(getConf(),
ErasureCoderOptions coderOptions = new ErasureCoderOptions(
getNumDataUnits(), getNumParityUnits());
RawErasureDecoder rawDecoder = CodecUtil.createRawDecoder(getConf(),
ErasureCodeConstants.XOR_CODEC_NAME, coderOptions);
ECBlock[] inputBlocks = getInputBlocks(blockGroup);


@ -22,6 +22,8 @@ import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.ECBlock;
import org.apache.hadoop.io.erasurecode.ECBlockGroup;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.io.erasurecode.ErasureCodeConstants;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
/**
@ -43,8 +45,10 @@ public class XORErasureEncoder extends AbstractErasureEncoder {
@Override
protected ErasureCodingStep prepareEncodingStep(
final ECBlockGroup blockGroup) {
RawErasureEncoder rawEncoder = CodecUtil.createXORRawEncoder(getConf(),
ErasureCoderOptions coderOptions = new ErasureCoderOptions(
getNumDataUnits(), getNumParityUnits());
RawErasureEncoder rawEncoder = CodecUtil.createRawEncoder(getConf(),
ErasureCodeConstants.XOR_CODEC_NAME, coderOptions);
ECBlock[] inputBlocks = getInputBlocks(blockGroup);


@ -1,220 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.erasurecode.rawcoder;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configured;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
/**
* A common class of basic facilities to be shared by encoder and decoder
*
* It implements the {@link RawErasureCoder} interface.
*/
@InterfaceAudience.Private
public abstract class AbstractRawErasureCoder
extends Configured implements RawErasureCoder {
private static byte[] emptyChunk = new byte[4096];
private final int numDataUnits;
private final int numParityUnits;
private final int numAllUnits;
private final Map<CoderOption, Object> coderOptions;
public AbstractRawErasureCoder(int numDataUnits, int numParityUnits) {
this.numDataUnits = numDataUnits;
this.numParityUnits = numParityUnits;
this.numAllUnits = numDataUnits + numParityUnits;
this.coderOptions = new HashMap<>(3);
coderOptions.put(CoderOption.PREFER_DIRECT_BUFFER, preferDirectBuffer());
coderOptions.put(CoderOption.ALLOW_CHANGE_INPUTS, false);
coderOptions.put(CoderOption.ALLOW_VERBOSE_DUMP, false);
}
@Override
public Object getCoderOption(CoderOption option) {
if (option == null) {
throw new HadoopIllegalArgumentException("Invalid option");
}
return coderOptions.get(option);
}
@Override
public void setCoderOption(CoderOption option, Object value) {
if (option == null || value == null) {
throw new HadoopIllegalArgumentException(
"Invalid option or option value");
}
if (option.isReadOnly()) {
throw new HadoopIllegalArgumentException(
"The option is read-only: " + option.name());
}
coderOptions.put(option, value);
}
/**
* Make sure to return an empty chunk buffer for the desired length.
* @param leastLength
* @return empty chunk of zero bytes
*/
protected static byte[] getEmptyChunk(int leastLength) {
if (emptyChunk.length >= leastLength) {
return emptyChunk; // In most time
}
synchronized (AbstractRawErasureCoder.class) {
emptyChunk = new byte[leastLength];
}
return emptyChunk;
}
@Override
public int getNumDataUnits() {
return numDataUnits;
}
@Override
public int getNumParityUnits() {
return numParityUnits;
}
protected int getNumAllUnits() {
return numAllUnits;
}
@Override
public void release() {
// Nothing to do by default
}
/**
* Tell if direct buffer is preferred or not. It's for callers to
* decide how to allocate coding chunk buffers, using DirectByteBuffer or
* bytes array. It will return false by default.
* @return true if native buffer is preferred for performance consideration,
* otherwise false.
*/
protected boolean preferDirectBuffer() {
return false;
}
protected boolean isAllowingChangeInputs() {
Object value = getCoderOption(CoderOption.ALLOW_CHANGE_INPUTS);
if (value != null && value instanceof Boolean) {
return (boolean) value;
}
return false;
}
protected boolean isAllowingVerboseDump() {
Object value = getCoderOption(CoderOption.ALLOW_VERBOSE_DUMP);
if (value != null && value instanceof Boolean) {
return (boolean) value;
}
return false;
}
/**
* Ensure a buffer filled with ZERO bytes from current readable/writable
* position.
* @param buffer a buffer ready to read / write certain size bytes
* @return the buffer itself, with ZERO bytes written, the position and limit
* are not changed after the call
*/
protected ByteBuffer resetBuffer(ByteBuffer buffer, int len) {
int pos = buffer.position();
buffer.put(getEmptyChunk(len), 0, len);
buffer.position(pos);
return buffer;
}
/**
* Ensure the buffer (either input or output) ready to read or write with ZERO
* bytes fully in specified length of len.
* @param buffer bytes array buffer
* @return the buffer itself
*/
protected byte[] resetBuffer(byte[] buffer, int offset, int len) {
byte[] empty = getEmptyChunk(len);
System.arraycopy(empty, 0, buffer, offset, len);
return buffer;
}
/**
* Check and ensure the buffers are of the length specified by dataLen, also
* ensure the buffers are direct buffers or not according to isDirectBuffer.
* @param buffers the buffers to check
* @param allowNull whether to allow any element to be null or not
* @param dataLen the length of data available in the buffer to ensure with
* @param isDirectBuffer is direct buffer or not to ensure with
* @param isOutputs is output buffer or not
*/
protected void checkParameterBuffers(ByteBuffer[] buffers, boolean
allowNull, int dataLen, boolean isDirectBuffer, boolean isOutputs) {
for (ByteBuffer buffer : buffers) {
if (buffer == null && !allowNull) {
throw new HadoopIllegalArgumentException(
"Invalid buffer found, not allowing null");
} else if (buffer != null) {
if (buffer.remaining() != dataLen) {
throw new HadoopIllegalArgumentException(
"Invalid buffer, not of length " + dataLen);
}
if (buffer.isDirect() != isDirectBuffer) {
throw new HadoopIllegalArgumentException(
"Invalid buffer, isDirect should be " + isDirectBuffer);
}
if (isOutputs) {
resetBuffer(buffer, dataLen);
}
}
}
}
/**
* Check and ensure the buffers are of the length specified by dataLen. If is
* output buffers, ensure they will be ZEROed.
* @param buffers the buffers to check
* @param allowNull whether to allow any element to be null or not
* @param dataLen the length of data available in the buffer to ensure with
* @param isOutputs is output buffer or not
*/
protected void checkParameterBuffers(byte[][] buffers, boolean allowNull,
int dataLen, boolean isOutputs) {
for (byte[] buffer : buffers) {
if (buffer == null && !allowNull) {
throw new HadoopIllegalArgumentException(
"Invalid buffer found, not allowing null");
} else if (buffer != null && buffer.length != dataLen) {
throw new HadoopIllegalArgumentException(
"Invalid buffer not of length " + dataLen);
} else if (isOutputs) {
resetBuffer(buffer, 0, dataLen);
}
}
}
}


@ -1,181 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.erasurecode.rawcoder;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.erasurecode.ECChunk;
import org.apache.hadoop.io.erasurecode.rawcoder.util.CoderUtil;
import java.nio.ByteBuffer;
/**
* An abstract raw erasure decoder that's to be inherited by new decoders.
*
* It implements the {@link RawErasureDecoder} interface.
*/
@InterfaceAudience.Private
public abstract class AbstractRawErasureDecoder extends AbstractRawErasureCoder
implements RawErasureDecoder {
public AbstractRawErasureDecoder(int numDataUnits, int numParityUnits) {
super(numDataUnits, numParityUnits);
}
@Override
public void decode(ByteBuffer[] inputs, int[] erasedIndexes,
ByteBuffer[] outputs) {
checkParameters(inputs, erasedIndexes, outputs);
ByteBuffer validInput = CoderUtil.findFirstValidInput(inputs);
boolean usingDirectBuffer = validInput.isDirect();
int dataLen = validInput.remaining();
if (dataLen == 0) {
return;
}
checkParameterBuffers(inputs, true, dataLen, usingDirectBuffer, false);
checkParameterBuffers(outputs, false, dataLen, usingDirectBuffer, true);
int[] inputPositions = new int[inputs.length];
for (int i = 0; i < inputPositions.length; i++) {
if (inputs[i] != null) {
inputPositions[i] = inputs[i].position();
}
}
if (usingDirectBuffer) {
doDecode(inputs, erasedIndexes, outputs);
} else {
int[] inputOffsets = new int[inputs.length];
int[] outputOffsets = new int[outputs.length];
byte[][] newInputs = new byte[inputs.length][];
byte[][] newOutputs = new byte[outputs.length][];
ByteBuffer buffer;
for (int i = 0; i < inputs.length; ++i) {
buffer = inputs[i];
if (buffer != null) {
inputOffsets[i] = buffer.arrayOffset() + buffer.position();
newInputs[i] = buffer.array();
}
}
for (int i = 0; i < outputs.length; ++i) {
buffer = outputs[i];
outputOffsets[i] = buffer.arrayOffset() + buffer.position();
newOutputs[i] = buffer.array();
}
doDecode(newInputs, inputOffsets, dataLen,
erasedIndexes, newOutputs, outputOffsets);
}
for (int i = 0; i < inputs.length; i++) {
if (inputs[i] != null) {
// dataLen bytes consumed
inputs[i].position(inputPositions[i] + dataLen);
}
}
}
/**
* Perform the real decoding using Direct ByteBuffer.
* @param inputs Direct ByteBuffers expected
* @param erasedIndexes indexes of erased units in the inputs array
* @param outputs Direct ByteBuffers expected
*/
protected abstract void doDecode(ByteBuffer[] inputs, int[] erasedIndexes,
ByteBuffer[] outputs);
@Override
public void decode(byte[][] inputs, int[] erasedIndexes, byte[][] outputs) {
checkParameters(inputs, erasedIndexes, outputs);
byte[] validInput = CoderUtil.findFirstValidInput(inputs);
int dataLen = validInput.length;
if (dataLen == 0) {
return;
}
checkParameterBuffers(inputs, true, dataLen, false);
checkParameterBuffers(outputs, false, dataLen, true);
int[] inputOffsets = new int[inputs.length]; // ALL ZERO
int[] outputOffsets = new int[outputs.length]; // ALL ZERO
doDecode(inputs, inputOffsets, dataLen, erasedIndexes, outputs,
outputOffsets);
}
/**
* Perform the real decoding using bytes array, supporting offsets and
* lengths.
* @param inputs the input byte arrays to read data from
* @param inputOffsets offsets for the input byte arrays to read data from
* @param dataLen how much data are to be read from
* @param erasedIndexes indexes of erased units in the inputs array
* @param outputs the output byte arrays to write resultant data into
* @param outputOffsets offsets from which to write resultant data into
*/
protected abstract void doDecode(byte[][] inputs, int[] inputOffsets,
int dataLen, int[] erasedIndexes,
byte[][] outputs, int[] outputOffsets);
@Override
public void decode(ECChunk[] inputs, int[] erasedIndexes,
ECChunk[] outputs) {
ByteBuffer[] newInputs = ECChunk.toBuffers(inputs);
ByteBuffer[] newOutputs = ECChunk.toBuffers(outputs);
decode(newInputs, erasedIndexes, newOutputs);
}
/**
* Check and validate decoding parameters, throw exception accordingly. The
* checking assumes it's a MDS code. Other code can override this.
* @param inputs input buffers to check
* @param erasedIndexes indexes of erased units in the inputs array
* @param outputs output buffers to check
*/
protected <T> void checkParameters(T[] inputs, int[] erasedIndexes,
T[] outputs) {
if (inputs.length != getNumParityUnits() + getNumDataUnits()) {
throw new IllegalArgumentException("Invalid inputs length");
}
if (erasedIndexes.length != outputs.length) {
throw new HadoopIllegalArgumentException(
"erasedIndexes and outputs mismatch in length");
}
if (erasedIndexes.length > getNumParityUnits()) {
throw new HadoopIllegalArgumentException(
"Too many erased, not recoverable");
}
int validInputs = 0;
for (T input : inputs) {
if (input != null) {
validInputs += 1;
}
}
if (validInputs < getNumDataUnits()) {
throw new HadoopIllegalArgumentException(
"Not enough valid inputs are provided, not recoverable");
}
}
}

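The decode contract enforced by checkParameters above can be illustrated with a small fragment; the decoder variable, the RS(3,2) schema and the 1 KB cell size are assumptions for the example:

// 3 data + 2 parity units. Unit #1 was lost, so its input slot is null and
// it is requested back via erasedIndexes/outputs; unit #4 is simply not read.
int cellSize = 1024;
byte[][] inputs = new byte[5][];
inputs[0] = new byte[cellSize];
inputs[2] = new byte[cellSize];
inputs[3] = new byte[cellSize];            // 3 valid inputs >= numDataUnits
int[] erasedIndexes = new int[] {1};
byte[][] outputs = new byte[][] {new byte[cellSize]};
decoder.decode(inputs, erasedIndexes, outputs); // reconstructs unit #1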

@ -1,146 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.erasurecode.rawcoder;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.erasurecode.ECChunk;
import java.nio.ByteBuffer;
/**
* An abstract raw erasure encoder that's to be inherited by new encoders.
*
* It implements the {@link RawErasureEncoder} interface.
*/
@InterfaceAudience.Private
public abstract class AbstractRawErasureEncoder extends AbstractRawErasureCoder
implements RawErasureEncoder {
public AbstractRawErasureEncoder(int numDataUnits, int numParityUnits) {
super(numDataUnits, numParityUnits);
}
@Override
public void encode(ByteBuffer[] inputs, ByteBuffer[] outputs) {
checkParameters(inputs, outputs);
boolean usingDirectBuffer = inputs[0].isDirect();
int dataLen = inputs[0].remaining();
if (dataLen == 0) {
return;
}
checkParameterBuffers(inputs, false, dataLen, usingDirectBuffer, false);
checkParameterBuffers(outputs, false, dataLen, usingDirectBuffer, true);
int[] inputPositions = new int[inputs.length];
for (int i = 0; i < inputPositions.length; i++) {
if (inputs[i] != null) {
inputPositions[i] = inputs[i].position();
}
}
if (usingDirectBuffer) {
doEncode(inputs, outputs);
} else {
int[] inputOffsets = new int[inputs.length];
int[] outputOffsets = new int[outputs.length];
byte[][] newInputs = new byte[inputs.length][];
byte[][] newOutputs = new byte[outputs.length][];
ByteBuffer buffer;
for (int i = 0; i < inputs.length; ++i) {
buffer = inputs[i];
inputOffsets[i] = buffer.arrayOffset() + buffer.position();
newInputs[i] = buffer.array();
}
for (int i = 0; i < outputs.length; ++i) {
buffer = outputs[i];
outputOffsets[i] = buffer.arrayOffset() + buffer.position();
newOutputs[i] = buffer.array();
}
doEncode(newInputs, inputOffsets, dataLen, newOutputs, outputOffsets);
}
for (int i = 0; i < inputs.length; i++) {
if (inputs[i] != null) {
// dataLen bytes consumed
inputs[i].position(inputPositions[i] + dataLen);
}
}
}
/**
* Perform the real encoding work using direct ByteBuffer
* @param inputs Direct ByteBuffers expected
* @param outputs Direct ByteBuffers expected
*/
protected abstract void doEncode(ByteBuffer[] inputs, ByteBuffer[] outputs);
@Override
public void encode(byte[][] inputs, byte[][] outputs) {
checkParameters(inputs, outputs);
int dataLen = inputs[0].length;
if (dataLen == 0) {
return;
}
checkParameterBuffers(inputs, false, dataLen, false);
checkParameterBuffers(outputs, false, dataLen, true);
int[] inputOffsets = new int[inputs.length]; // ALL ZERO
int[] outputOffsets = new int[outputs.length]; // ALL ZERO
doEncode(inputs, inputOffsets, dataLen, outputs, outputOffsets);
}
/**
* Perform the real encoding work using bytes array, supporting offsets
* and lengths.
* @param inputs the input byte arrays to read data from
* @param inputOffsets offsets for the input byte arrays to read data from
* @param dataLen how much data are to be read from
* @param outputs the output byte arrays to write resultant data into
* @param outputOffsets offsets from which to write resultant data into
*/
protected abstract void doEncode(byte[][] inputs, int[] inputOffsets,
int dataLen, byte[][] outputs,
int[] outputOffsets);
@Override
public void encode(ECChunk[] inputs, ECChunk[] outputs) {
ByteBuffer[] newInputs = ECChunk.toBuffers(inputs);
ByteBuffer[] newOutputs = ECChunk.toBuffers(outputs);
encode(newInputs, newOutputs);
}
/**
* Check and validate encoding parameters, throw exception accordingly.
* @param inputs input buffers to check
* @param outputs output buffers to check
*/
protected <T> void checkParameters(T[] inputs, T[] outputs) {
if (inputs.length != getNumDataUnits()) {
throw new HadoopIllegalArgumentException("Invalid inputs length");
}
if (outputs.length != getNumParityUnits()) {
throw new HadoopIllegalArgumentException("Invalid outputs length");
}
}
}


@ -0,0 +1,111 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.erasurecode.rawcoder;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* A utility class that maintains decoding state during a decode call using
* byte array inputs.
*/
@InterfaceAudience.Private
class ByteArrayDecodingState extends DecodingState {
byte[][] inputs;
int[] inputOffsets;
int[] erasedIndexes;
byte[][] outputs;
int[] outputOffsets;
ByteArrayDecodingState(RawErasureDecoder decoder, byte[][] inputs,
int[] erasedIndexes, byte[][] outputs) {
this.decoder = decoder;
this.inputs = inputs;
this.outputs = outputs;
this.erasedIndexes = erasedIndexes;
byte[] validInput = CoderUtil.findFirstValidInput(inputs);
this.decodeLength = validInput.length;
checkParameters(inputs, erasedIndexes, outputs);
checkInputBuffers(inputs);
checkOutputBuffers(outputs);
this.inputOffsets = new int[inputs.length]; // ALL ZERO
this.outputOffsets = new int[outputs.length]; // ALL ZERO
}
ByteArrayDecodingState(RawErasureDecoder decoder,
int decodeLength,
int[] erasedIndexes,
byte[][] inputs,
int[] inputOffsets,
byte[][] outputs,
int[] outputOffsets) {
this.decoder = decoder;
this.decodeLength = decodeLength;
this.erasedIndexes = erasedIndexes;
this.inputs = inputs;
this.outputs = outputs;
this.inputOffsets = inputOffsets;
this.outputOffsets = outputOffsets;
}
/**
* Check and ensure the buffers are of the desired length.
* @param buffers the buffers to check
*/
void checkInputBuffers(byte[][] buffers) {
int validInputs = 0;
for (byte[] buffer : buffers) {
if (buffer == null) {
continue;
}
if (buffer.length != decodeLength) {
throw new HadoopIllegalArgumentException(
"Invalid buffer, not of length " + decodeLength);
}
validInputs++;
}
if (validInputs < decoder.getNumDataUnits()) {
throw new HadoopIllegalArgumentException(
"Not enough valid inputs are provided, not recoverable");
}
}
/**
* Check and ensure the buffers are of the desired length.
* @param buffers the buffers to check
*/
void checkOutputBuffers(byte[][] buffers) {
for (byte[] buffer : buffers) {
if (buffer == null) {
throw new HadoopIllegalArgumentException(
"Invalid buffer found, not allowing null");
}
if (buffer.length != decodeLength) {
throw new HadoopIllegalArgumentException(
"Invalid buffer not of length " + decodeLength);
}
}
}
}


@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.erasurecode.rawcoder;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* A utility class that maintains encoding state during an encode call using
* byte array inputs.
*/
@InterfaceAudience.Private
class ByteArrayEncodingState extends EncodingState {
byte[][] inputs;
byte[][] outputs;
int[] inputOffsets;
int[] outputOffsets;
ByteArrayEncodingState(RawErasureEncoder encoder,
byte[][] inputs, byte[][] outputs) {
this.encoder = encoder;
byte[] validInput = CoderUtil.findFirstValidInput(inputs);
this.encodeLength = validInput.length;
this.inputs = inputs;
this.outputs = outputs;
checkParameters(inputs, outputs);
checkBuffers(inputs);
checkBuffers(outputs);
this.inputOffsets = new int[inputs.length]; // ALL ZERO
this.outputOffsets = new int[outputs.length]; // ALL ZERO
}
ByteArrayEncodingState(RawErasureEncoder encoder,
int encodeLength,
byte[][] inputs,
int[] inputOffsets,
byte[][] outputs,
int[] outputOffsets) {
this.encoder = encoder;
this.encodeLength = encodeLength;
this.inputs = inputs;
this.outputs = outputs;
this.inputOffsets = inputOffsets;
this.outputOffsets = outputOffsets;
}
/**
* Check and ensure the buffers are of the desired length.
* @param buffers the buffers to check
*/
void checkBuffers(byte[][] buffers) {
for (byte[] buffer : buffers) {
if (buffer == null) {
throw new HadoopIllegalArgumentException(
"Invalid buffer found, not allowing null");
}
if (buffer.length != encodeLength) {
throw new HadoopIllegalArgumentException(
"Invalid buffer not of length " + encodeLength);
}
}
}
}


@ -0,0 +1,134 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.erasurecode.rawcoder;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import java.nio.ByteBuffer;
/**
* A utility class that maintains decoding state during a decode call using
* ByteBuffer inputs.
*/
@InterfaceAudience.Private
class ByteBufferDecodingState extends DecodingState {
ByteBuffer[] inputs;
ByteBuffer[] outputs;
int[] erasedIndexes;
boolean usingDirectBuffer;
ByteBufferDecodingState(RawErasureDecoder decoder, ByteBuffer[] inputs,
int[] erasedIndexes, ByteBuffer[] outputs) {
this.decoder = decoder;
this.inputs = inputs;
this.outputs = outputs;
this.erasedIndexes = erasedIndexes;
ByteBuffer validInput = CoderUtil.findFirstValidInput(inputs);
this.decodeLength = validInput.remaining();
this.usingDirectBuffer = validInput.isDirect();
checkParameters(inputs, erasedIndexes, outputs);
checkInputBuffers(inputs);
checkOutputBuffers(outputs);
}
/**
* Convert to a ByteArrayDecodingState when it's backed by on-heap arrays.
*/
ByteArrayDecodingState convertToByteArrayState() {
int[] inputOffsets = new int[inputs.length];
int[] outputOffsets = new int[outputs.length];
byte[][] newInputs = new byte[inputs.length][];
byte[][] newOutputs = new byte[outputs.length][];
ByteBuffer buffer;
for (int i = 0; i < inputs.length; ++i) {
buffer = inputs[i];
if (buffer != null) {
inputOffsets[i] = buffer.arrayOffset() + buffer.position();
newInputs[i] = buffer.array();
}
}
for (int i = 0; i < outputs.length; ++i) {
buffer = outputs[i];
outputOffsets[i] = buffer.arrayOffset() + buffer.position();
newOutputs[i] = buffer.array();
}
ByteArrayDecodingState baeState = new ByteArrayDecodingState(decoder,
decodeLength, erasedIndexes, newInputs,
inputOffsets, newOutputs, outputOffsets);
return baeState;
}
/**
* Check and ensure the buffers are of the desired length and type, direct
* buffers or not.
* @param buffers the buffers to check
*/
void checkInputBuffers(ByteBuffer[] buffers) {
int validInputs = 0;
for (ByteBuffer buffer : buffers) {
if (buffer == null) {
continue;
}
if (buffer.remaining() != decodeLength) {
throw new HadoopIllegalArgumentException(
"Invalid buffer, not of length " + decodeLength);
}
if (buffer.isDirect() != usingDirectBuffer) {
throw new HadoopIllegalArgumentException(
"Invalid buffer, isDirect should be " + usingDirectBuffer);
}
validInputs++;
}
if (validInputs < decoder.getNumDataUnits()) {
throw new HadoopIllegalArgumentException(
"Not enough valid inputs are provided, not recoverable");
}
}
/**
* Check and ensure the buffers are of the desired length and type, direct
* buffers or not.
* @param buffers the buffers to check
*/
void checkOutputBuffers(ByteBuffer[] buffers) {
for (ByteBuffer buffer : buffers) {
if (buffer == null) {
throw new HadoopIllegalArgumentException(
"Invalid buffer found, not allowing null");
}
if (buffer.remaining() != decodeLength) {
throw new HadoopIllegalArgumentException(
"Invalid buffer, not of length " + decodeLength);
}
if (buffer.isDirect() != usingDirectBuffer) {
throw new HadoopIllegalArgumentException(
"Invalid buffer, isDirect should be " + usingDirectBuffer);
}
}
}
}


@ -0,0 +1,98 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.erasurecode.rawcoder;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import java.nio.ByteBuffer;
/**
* A utility class that maintains encoding state during an encode call using
* ByteBuffer inputs.
*/
@InterfaceAudience.Private
class ByteBufferEncodingState extends EncodingState {
ByteBuffer[] inputs;
ByteBuffer[] outputs;
boolean usingDirectBuffer;
ByteBufferEncodingState(RawErasureEncoder encoder,
ByteBuffer[] inputs, ByteBuffer[] outputs) {
this.encoder = encoder;
ByteBuffer validInput = CoderUtil.findFirstValidInput(inputs);
this.encodeLength = validInput.remaining();
this.usingDirectBuffer = validInput.isDirect();
this.inputs = inputs;
this.outputs = outputs;
checkParameters(inputs, outputs);
checkBuffers(inputs);
checkBuffers(outputs);
}
/**
* Convert to a ByteArrayEncodingState when it's backed by on-heap arrays.
*/
ByteArrayEncodingState convertToByteArrayState() {
int[] inputOffsets = new int[inputs.length];
int[] outputOffsets = new int[outputs.length];
byte[][] newInputs = new byte[inputs.length][];
byte[][] newOutputs = new byte[outputs.length][];
ByteBuffer buffer;
for (int i = 0; i < inputs.length; ++i) {
buffer = inputs[i];
inputOffsets[i] = buffer.arrayOffset() + buffer.position();
newInputs[i] = buffer.array();
}
for (int i = 0; i < outputs.length; ++i) {
buffer = outputs[i];
outputOffsets[i] = buffer.arrayOffset() + buffer.position();
newOutputs[i] = buffer.array();
}
ByteArrayEncodingState baeState = new ByteArrayEncodingState(encoder,
encodeLength, newInputs, inputOffsets, newOutputs, outputOffsets);
return baeState;
}
/**
* Check and ensure the buffers are of the desired length and type, direct
* buffers or not.
* @param buffers the buffers to check
*/
void checkBuffers(ByteBuffer[] buffers) {
for (ByteBuffer buffer : buffers) {
if (buffer == null) {
throw new HadoopIllegalArgumentException(
"Invalid buffer found, not allowing null");
}
if (buffer.remaining() != encodeLength) {
throw new HadoopIllegalArgumentException(
"Invalid buffer, not of length " + encodeLength);
}
if (buffer.isDirect() != usingDirectBuffer) {
throw new HadoopIllegalArgumentException(
"Invalid buffer, isDirect should be " + usingDirectBuffer);
}
}
}
}

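The arrayOffset() + position() sum used by convertToByteArrayState follows plain java.nio semantics; a tiny sketch, not from the patch, of why it addresses the next readable byte of a heap buffer:

ByteBuffer heap = ByteBuffer.wrap(new byte[] {10, 20, 30});
heap.get();                                    // position advances to 1
byte[] backing = heap.array();                 // on-heap backing array
int base = heap.arrayOffset() + heap.position();
assert backing[base] == 20;                    // next byte a coder would read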

@ -0,0 +1,199 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.erasurecode.rawcoder;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.erasurecode.ECChunk;
import java.nio.ByteBuffer;
import java.util.Arrays;
/**
* Helpful utilities for implementing some raw erasure coders.
*/
@InterfaceAudience.Private
final class CoderUtil {
private CoderUtil() {
// No instantiation
}
private static byte[] emptyChunk = new byte[4096];
/**
* Make sure to return an empty chunk buffer for the desired length.
* @param leastLength the minimum length needed for the returned chunk
* @return empty chunk of zero bytes
*/
static byte[] getEmptyChunk(int leastLength) {
if (emptyChunk.length >= leastLength) {
return emptyChunk; // In most time
}
synchronized (CoderUtil.class) {
emptyChunk = new byte[leastLength];
}
return emptyChunk;
}
/**
* Ensure a buffer is filled with ZERO bytes from the current readable/writable
* position.
* @param buffer a buffer ready to read / write certain size bytes
* @return the buffer itself, with ZERO bytes written, the position and limit
* are not changed after the call
*/
static ByteBuffer resetBuffer(ByteBuffer buffer, int len) {
int pos = buffer.position();
buffer.put(getEmptyChunk(len), 0, len);
buffer.position(pos);
return buffer;
}
/**
* Ensure the buffer (either input or output) is ready to read or write,
* filled with ZERO bytes over the specified length len.
* @param buffer bytes array buffer
* @return the buffer itself
*/
static byte[] resetBuffer(byte[] buffer, int offset, int len) {
byte[] empty = getEmptyChunk(len);
System.arraycopy(empty, 0, buffer, offset, len);
return buffer;
}
/**
* Initialize the output buffers with ZERO bytes.
* @param buffers
* @param dataLen
*/
static void resetOutputBuffers(ByteBuffer[] buffers, int dataLen) {
for (ByteBuffer buffer : buffers) {
resetBuffer(buffer, dataLen);
}
}
/**
* Initialize the output buffers with ZERO bytes.
* @param buffers
* @param dataLen
*/
static void resetOutputBuffers(byte[][] buffers, int[] offsets,
int dataLen) {
for (int i = 0; i < buffers.length; i++) {
resetBuffer(buffers[i], offsets[i], dataLen);
}
}
/**
* Convert an array of ECChunks to an array of ByteBuffers.
* @param chunks chunks to convert into buffers
* @return an array of ByteBuffers
*/
static ByteBuffer[] toBuffers(ECChunk[] chunks) {
ByteBuffer[] buffers = new ByteBuffer[chunks.length];
ECChunk chunk;
for (int i = 0; i < chunks.length; i++) {
chunk = chunks[i];
if (chunk == null) {
buffers[i] = null;
} else {
buffers[i] = chunk.getBuffer();
}
}
return buffers;
}
/**
* Clone an input bytes array as direct ByteBuffer.
* @param input
* @param len
* @param offset
* @return direct ByteBuffer
*/
static ByteBuffer cloneAsDirectByteBuffer(byte[] input, int offset, int len) {
if (input == null) { // an input can be null, if erased or not to read
return null;
}
ByteBuffer directBuffer = ByteBuffer.allocateDirect(len);
directBuffer.put(input, offset, len);
directBuffer.flip();
return directBuffer;
}
/**
* Get indexes array for items marked as null, either erased or
* not to read.
* @return indexes array
*/
static <T> int[] getNullIndexes(T[] inputs) {
int[] nullIndexes = new int[inputs.length];
int idx = 0;
for (int i = 0; i < inputs.length; i++) {
if (inputs[i] == null) {
nullIndexes[idx++] = i;
}
}
return Arrays.copyOf(nullIndexes, idx);
}
/**
* Find the valid input from all the inputs.
* @param inputs input buffers to look for valid input
* @return the first valid input
*/
static <T> T findFirstValidInput(T[] inputs) {
if (inputs.length > 0 && inputs[0] != null) {
return inputs[0];
}
for (T input : inputs) {
if (input != null) {
return input;
}
}
throw new HadoopIllegalArgumentException(
"Invalid inputs are found, all being null");
}
/**
* Picking up indexes of valid inputs.
* @param inputs decoding input buffers
* @param <T>
*/
static <T> int[] getValidIndexes(T[] inputs) {
int[] validIndexes = new int[inputs.length];
int idx = 0;
for (int i = 0; i < inputs.length; i++) {
if (inputs[i] != null) {
validIndexes[idx++] = i;
}
}
return Arrays.copyOf(validIndexes, idx);
}
}

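A small fragment showing what the index helpers above return; CoderUtil is package-private, so this only compiles from within org.apache.hadoop.io.erasurecode.rawcoder, and the sample array is arbitrary:

// Slots 1 and 3 are missing (erased or not read); the helpers split the
// positions accordingly.
byte[][] inputs = new byte[][] {new byte[16], null, new byte[16], null};
int[] valid = CoderUtil.getValidIndexes(inputs);      // {0, 2}
int[] missing = CoderUtil.getNullIndexes(inputs);     // {1, 3}
byte[] first = CoderUtil.findFirstValidInput(inputs); // inputs[0]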

@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.erasurecode.rawcoder;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* A utility class that maintains decoding state during a decode call.
*/
@InterfaceAudience.Private
class DecodingState {
RawErasureDecoder decoder;
int decodeLength;
/**
* Check and validate decoding parameters, throw exception accordingly. The
* checking assumes it's an MDS code. Other codes can override this.
* @param inputs input buffers to check
* @param erasedIndexes indexes of erased units in the inputs array
* @param outputs output buffers to check
*/
<T> void checkParameters(T[] inputs, int[] erasedIndexes,
T[] outputs) {
if (inputs.length != decoder.getNumParityUnits() +
decoder.getNumDataUnits()) {
throw new IllegalArgumentException("Invalid inputs length");
}
if (erasedIndexes.length != outputs.length) {
throw new HadoopIllegalArgumentException(
"erasedIndexes and outputs mismatch in length");
}
if (erasedIndexes.length > decoder.getNumParityUnits()) {
throw new HadoopIllegalArgumentException(
"Too many erased, not recoverable");
}
}
}


@ -18,8 +18,7 @@
package org.apache.hadoop.io.erasurecode.rawcoder;
import org.apache.hadoop.classification.InterfaceAudience;
import java.nio.ByteBuffer;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
/**
* A dummy raw decoder that does no real computation.
@ -28,20 +27,19 @@ import java.nio.ByteBuffer;
* instead of codec, and is intended for test only.
*/
@InterfaceAudience.Private
public class DummyRawDecoder extends AbstractRawErasureDecoder {
public DummyRawDecoder(int numDataUnits, int numParityUnits) {
super(numDataUnits, numParityUnits);
public class DummyRawDecoder extends RawErasureDecoder {
public DummyRawDecoder(ErasureCoderOptions coderOptions) {
super(coderOptions);
}
@Override
protected void doDecode(ByteBuffer[] inputs, int[] erasedIndexes,
ByteBuffer[] outputs) {
protected void doDecode(ByteBufferDecodingState decodingState) {
// Nothing to do. Output buffers have already been reset
}
@Override
protected void doDecode(byte[][] inputs, int[] inputOffsets, int dataLen,
int[] erasedIndexes, byte[][] outputs, int[] outputOffsets) {
protected void doDecode(ByteArrayDecodingState decodingState) {
// Nothing to do. Output buffers have already been reset
}
}


@ -18,8 +18,7 @@
package org.apache.hadoop.io.erasurecode.rawcoder;
import org.apache.hadoop.classification.InterfaceAudience;
import java.nio.ByteBuffer;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
/**
* A dummy raw encoder that does no real computation.
@ -28,19 +27,19 @@ import java.nio.ByteBuffer;
* instead of codec, and is intended for test only.
*/
@InterfaceAudience.Private
public class DummyRawEncoder extends AbstractRawErasureEncoder {
public DummyRawEncoder(int numDataUnits, int numParityUnits) {
super(numDataUnits, numParityUnits);
public class DummyRawEncoder extends RawErasureEncoder {
public DummyRawEncoder(ErasureCoderOptions coderOptions) {
super(coderOptions);
}
@Override
protected void doEncode(ByteBuffer[] inputs, ByteBuffer[] outputs) {
protected void doEncode(ByteArrayEncodingState encodingState) {
// Nothing to do. Output buffers have already been reset
}
@Override
protected void doEncode(byte[][] inputs, int[] inputOffsets, int dataLen,
byte[][] outputs, int[] outputOffsets) {
protected void doEncode(ByteBufferEncodingState encodingState) {
// Nothing to do. Output buffers have already been reset
}
}


@ -18,19 +18,21 @@
package org.apache.hadoop.io.erasurecode.rawcoder;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
/**
* A raw erasure coder factory for dummy raw coders.
*/
@InterfaceAudience.Private
public class DummyRawErasureCoderFactory implements RawErasureCoderFactory {
@Override
public RawErasureEncoder createEncoder(int numDataUnits, int numParityUnits) {
return new DummyRawEncoder(numDataUnits, numParityUnits);
public RawErasureEncoder createEncoder(ErasureCoderOptions coderOptions) {
return new DummyRawEncoder(coderOptions);
}
@Override
public RawErasureDecoder createDecoder(int numDataUnits, int numParityUnits) {
return new DummyRawDecoder(numDataUnits, numParityUnits);
public RawErasureDecoder createDecoder(ErasureCoderOptions coderOptions) {
return new DummyRawDecoder(coderOptions);
}
}

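A new raw coder implementation plugs in through the same two-method factory shape; a sketch of a hypothetical factory (MyRawErasureCoderFactory is not part of the patch; it delegates to the dummy coders above only so the sketch compiles):

public class MyRawErasureCoderFactory implements RawErasureCoderFactory {
  @Override
  public RawErasureEncoder createEncoder(ErasureCoderOptions coderOptions) {
    return new DummyRawEncoder(coderOptions);   // stand-in encoder
  }

  @Override
  public RawErasureDecoder createDecoder(ErasureCoderOptions coderOptions) {
    return new DummyRawDecoder(coderOptions);   // stand-in decoder
  }
}

CodecUtil resolves such a factory class by name from the configuration, via getFactNameFromCodec and createRawCoderFactory shown in the first hunk.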

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.erasurecode.rawcoder;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* A utility class that maintains encoding state during an encode call.
*/
@InterfaceAudience.Private
abstract class EncodingState {
RawErasureEncoder encoder;
int encodeLength;
/**
* Check and validate encoding parameters, throw exception accordingly.
* @param inputs input buffers to check
* @param outputs output buffers to check
*/
<T> void checkParameters(T[] inputs, T[] outputs) {
if (inputs.length != encoder.getNumDataUnits()) {
throw new HadoopIllegalArgumentException("Invalid inputs length");
}
if (outputs.length != encoder.getNumParityUnits()) {
throw new HadoopIllegalArgumentException("Invalid outputs length");
}
}
}
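
Read concretely, checkParameters ties the coding call to the schema carried by the encoder; a small illustration assuming a 6+3 ErasureCoderOptions:

// With ErasureCoderOptions(6, 3): getNumDataUnits() == 6, getNumParityUnits() == 3.
byte[][] inputs = new byte[6][];   // one entry per data unit
byte[][] outputs = new byte[3][];  // one entry per parity unit
// checkParameters(inputs, outputs) passes; a 5-element inputs array or a
// 4-element outputs array would raise HadoopIllegalArgumentException.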

View File

@ -19,7 +19,7 @@ package org.apache.hadoop.io.erasurecode.rawcoder;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.erasurecode.rawcoder.util.CoderUtil;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import org.apache.hadoop.io.erasurecode.rawcoder.util.DumpUtil;
import org.apache.hadoop.io.erasurecode.rawcoder.util.GF256;
import org.apache.hadoop.io.erasurecode.rawcoder.util.RSUtil;
@ -34,7 +34,7 @@ import java.util.Arrays;
* from HDFS-RAID, and also compatible with the native/ISA-L coder.
*/
@InterfaceAudience.Private
public class RSRawDecoder extends AbstractRawErasureDecoder {
public class RSRawDecoder extends RawErasureDecoder {
//relevant to schema and won't change during decode calls
private byte[] encodeMatrix;
@ -54,52 +54,54 @@ public class RSRawDecoder extends AbstractRawErasureDecoder {
private int numErasedDataUnits;
private boolean[] erasureFlags;
public RSRawDecoder(int numDataUnits, int numParityUnits) {
super(numDataUnits, numParityUnits);
if (numDataUnits + numParityUnits >= RSUtil.GF.getFieldSize()) {
public RSRawDecoder(ErasureCoderOptions coderOptions) {
super(coderOptions);
int numAllUnits = getNumAllUnits();
if (getNumAllUnits() >= RSUtil.GF.getFieldSize()) {
throw new HadoopIllegalArgumentException(
"Invalid getNumDataUnits() and numParityUnits");
}
int numAllUnits = getNumDataUnits() + numParityUnits;
encodeMatrix = new byte[numAllUnits * getNumDataUnits()];
RSUtil.genCauchyMatrix(encodeMatrix, numAllUnits, getNumDataUnits());
if (isAllowingVerboseDump()) {
DumpUtil.dumpMatrix(encodeMatrix, numDataUnits, numAllUnits);
if (allowVerboseDump()) {
DumpUtil.dumpMatrix(encodeMatrix, getNumDataUnits(), numAllUnits);
}
}
@Override
protected void doDecode(ByteBuffer[] inputs, int[] erasedIndexes,
ByteBuffer[] outputs) {
prepareDecoding(inputs, erasedIndexes);
protected void doDecode(ByteBufferDecodingState decodingState) {
CoderUtil.resetOutputBuffers(decodingState.outputs,
decodingState.decodeLength);
prepareDecoding(decodingState.inputs, decodingState.erasedIndexes);
ByteBuffer[] realInputs = new ByteBuffer[getNumDataUnits()];
for (int i = 0; i < getNumDataUnits(); i++) {
realInputs[i] = inputs[validIndexes[i]];
realInputs[i] = decodingState.inputs[validIndexes[i]];
}
RSUtil.encodeData(gfTables, realInputs, outputs);
RSUtil.encodeData(gfTables, realInputs, decodingState.outputs);
}
@Override
protected void doDecode(byte[][] inputs, int[] inputOffsets,
int dataLen, int[] erasedIndexes,
byte[][] outputs, int[] outputOffsets) {
prepareDecoding(inputs, erasedIndexes);
protected void doDecode(ByteArrayDecodingState decodingState) {
int dataLen = decodingState.decodeLength;
CoderUtil.resetOutputBuffers(decodingState.outputs,
decodingState.outputOffsets, dataLen);
prepareDecoding(decodingState.inputs, decodingState.erasedIndexes);
byte[][] realInputs = new byte[getNumDataUnits()][];
int[] realInputOffsets = new int[getNumDataUnits()];
for (int i = 0; i < getNumDataUnits(); i++) {
realInputs[i] = inputs[validIndexes[i]];
realInputOffsets[i] = inputOffsets[validIndexes[i]];
realInputs[i] = decodingState.inputs[validIndexes[i]];
realInputOffsets[i] = decodingState.inputOffsets[validIndexes[i]];
}
RSUtil.encodeData(gfTables, dataLen, realInputs, realInputOffsets,
outputs, outputOffsets);
decodingState.outputs, decodingState.outputOffsets);
}
private <T> void prepareDecoding(T[] inputs, int[] erasedIndexes) {
int[] tmpValidIndexes = new int[getNumDataUnits()];
CoderUtil.makeValidIndexes(inputs, tmpValidIndexes);
int[] tmpValidIndexes = CoderUtil.getValidIndexes(inputs);
if (Arrays.equals(this.cachedErasedIndexes, erasedIndexes) &&
Arrays.equals(this.validIndexes, tmpValidIndexes)) {
return; // Optimization. Nothing to do
@ -132,7 +134,7 @@ public class RSRawDecoder extends AbstractRawErasureDecoder {
RSUtil.initTables(getNumDataUnits(), erasedIndexes.length,
decodeMatrix, 0, gfTables);
if (isAllowingVerboseDump()) {
if (allowVerboseDump()) {
System.out.println(DumpUtil.bytesToHex(gfTables, -1));
}
}
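
To make the input-selection step concrete, a sketch of what the decoder sees for an illustrative 6+3 schema with units 0 and 7 missing; getValidIndexes is assumed to return the positions of the non-null inputs, mirroring the makeValidIndexes helper it replaces.

int chunkSize = 64 * 1024;                    // illustrative only
byte[][] inputs = new byte[9][];              // numAllUnits = 6 + 3
for (int i = 0; i < inputs.length; i++) {
  // Erased or not-to-read units stay null; the rest carry the unit's bytes.
  inputs[i] = (i == 0 || i == 7) ? null : new byte[chunkSize];
}
int[] erasedIndexes = {0, 7};
// getValidIndexes(inputs) -> {1, 2, 3, 4, 5, 6, 8}; the decoder then feeds
// the first getNumDataUnits() = 6 of them into RSUtil.encodeData as realInputs.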

View File

@ -19,7 +19,7 @@ package org.apache.hadoop.io.erasurecode.rawcoder;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.erasurecode.rawcoder.util.CoderUtil;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import org.apache.hadoop.io.erasurecode.rawcoder.util.RSUtil;
import java.nio.ByteBuffer;
@ -34,7 +34,7 @@ import java.nio.ByteBuffer;
* addressed in HADOOP-11871.
*/
@InterfaceAudience.Private
public class RSRawDecoderLegacy extends AbstractRawErasureDecoder {
public class RSRawDecoderLegacy extends RawErasureDecoder {
// To describe and calculate the needed Vandermonde matrix
private int[] errSignature;
private int[] primitivePower;
@ -61,16 +61,16 @@ public class RSRawDecoderLegacy extends AbstractRawErasureDecoder {
private ByteBuffer[] adjustedDirectBufferOutputsParameter =
new ByteBuffer[getNumParityUnits()];
public RSRawDecoderLegacy(int numDataUnits, int numParityUnits) {
super(numDataUnits, numParityUnits);
if (numDataUnits + numParityUnits >= RSUtil.GF.getFieldSize()) {
public RSRawDecoderLegacy(ErasureCoderOptions coderOptions) {
super(coderOptions);
if (getNumAllUnits() >= RSUtil.GF.getFieldSize()) {
throw new HadoopIllegalArgumentException(
"Invalid numDataUnits and numParityUnits");
}
this.errSignature = new int[numParityUnits];
this.primitivePower = RSUtil.getPrimitivePower(numDataUnits,
numParityUnits);
this.errSignature = new int[getNumParityUnits()];
this.primitivePower = RSUtil.getPrimitivePower(getNumDataUnits(),
getNumParityUnits());
}
@Override
@ -129,16 +129,18 @@ public class RSRawDecoderLegacy extends AbstractRawErasureDecoder {
}
@Override
protected void doDecode(byte[][] inputs, int[] inputOffsets,
int dataLen, int[] erasedIndexes,
byte[][] outputs, int[] outputOffsets) {
protected void doDecode(ByteArrayDecodingState decodingState) {
int dataLen = decodingState.decodeLength;
CoderUtil.resetOutputBuffers(decodingState.outputs,
decodingState.outputOffsets, dataLen);
/**
* As passed parameters are friendly to callers but not to the underlying
* implementations, so we have to adjust them before calling doDecodeImpl.
*/
int[] erasedOrNotToReadIndexes =
CoderUtil.getErasedOrNotToReadIndexes(inputs);
CoderUtil.getNullIndexes(decodingState.inputs);
// Prepare for adjustedOutputsParameter
@ -148,16 +150,18 @@ public class RSRawDecoderLegacy extends AbstractRawErasureDecoder {
adjustedOutputOffsets[i] = 0;
}
// Use the caller passed buffers in erasedIndexes positions
for (int outputIdx = 0, i = 0; i < erasedIndexes.length; i++) {
for (int outputIdx = 0, i = 0;
i < decodingState.erasedIndexes.length; i++) {
boolean found = false;
for (int j = 0; j < erasedOrNotToReadIndexes.length; j++) {
// If this index is one requested by the caller via erasedIndexes, then
// we use the passed output buffer to avoid copying data thereafter.
if (erasedIndexes[i] == erasedOrNotToReadIndexes[j]) {
if (decodingState.erasedIndexes[i] == erasedOrNotToReadIndexes[j]) {
found = true;
adjustedByteArrayOutputsParameter[j] = resetBuffer(
outputs[outputIdx], outputOffsets[outputIdx], dataLen);
adjustedOutputOffsets[j] = outputOffsets[outputIdx];
adjustedByteArrayOutputsParameter[j] = CoderUtil.resetBuffer(
decodingState.outputs[outputIdx],
decodingState.outputOffsets[outputIdx], dataLen);
adjustedOutputOffsets[j] = decodingState.outputOffsets[outputIdx];
outputIdx++;
}
}
@ -169,22 +173,22 @@ public class RSRawDecoderLegacy extends AbstractRawErasureDecoder {
// Use shared buffers for other positions (not set yet)
for (int bufferIdx = 0, i = 0; i < erasedOrNotToReadIndexes.length; i++) {
if (adjustedByteArrayOutputsParameter[i] == null) {
adjustedByteArrayOutputsParameter[i] = resetBuffer(
adjustedByteArrayOutputsParameter[i] = CoderUtil.resetBuffer(
checkGetBytesArrayBuffer(bufferIdx, dataLen), 0, dataLen);
adjustedOutputOffsets[i] = 0; // Always 0 for such temp output
bufferIdx++;
}
}
doDecodeImpl(inputs, inputOffsets, dataLen, erasedOrNotToReadIndexes,
doDecodeImpl(decodingState.inputs, decodingState.inputOffsets,
dataLen, erasedOrNotToReadIndexes,
adjustedByteArrayOutputsParameter, adjustedOutputOffsets);
}
@Override
protected void doDecode(ByteBuffer[] inputs, int[] erasedIndexes,
ByteBuffer[] outputs) {
ByteBuffer validInput = CoderUtil.findFirstValidInput(inputs);
int dataLen = validInput.remaining();
protected void doDecode(ByteBufferDecodingState decodingState) {
int dataLen = decodingState.decodeLength;
CoderUtil.resetOutputBuffers(decodingState.outputs, dataLen);
/**
* As passed parameters are friendly to callers but not to the underlying
@ -192,7 +196,7 @@ public class RSRawDecoderLegacy extends AbstractRawErasureDecoder {
*/
int[] erasedOrNotToReadIndexes =
CoderUtil.getErasedOrNotToReadIndexes(inputs);
CoderUtil.getNullIndexes(decodingState.inputs);
// Prepare for adjustedDirectBufferOutputsParameter
@ -201,15 +205,16 @@ public class RSRawDecoderLegacy extends AbstractRawErasureDecoder {
adjustedDirectBufferOutputsParameter[i] = null;
}
// Use the caller passed buffers in erasedIndexes positions
for (int outputIdx = 0, i = 0; i < erasedIndexes.length; i++) {
for (int outputIdx = 0, i = 0;
i < decodingState.erasedIndexes.length; i++) {
boolean found = false;
for (int j = 0; j < erasedOrNotToReadIndexes.length; j++) {
// If this index is one requested by the caller via erasedIndexes, then
// we use the passed output buffer to avoid copying data thereafter.
if (erasedIndexes[i] == erasedOrNotToReadIndexes[j]) {
if (decodingState.erasedIndexes[i] == erasedOrNotToReadIndexes[j]) {
found = true;
adjustedDirectBufferOutputsParameter[j] =
resetBuffer(outputs[outputIdx++], dataLen);
adjustedDirectBufferOutputsParameter[j] = CoderUtil.resetBuffer(
decodingState.outputs[outputIdx++], dataLen);
}
}
if (!found) {
@ -223,12 +228,13 @@ public class RSRawDecoderLegacy extends AbstractRawErasureDecoder {
ByteBuffer buffer = checkGetDirectBuffer(bufferIdx, dataLen);
buffer.position(0);
buffer.limit(dataLen);
adjustedDirectBufferOutputsParameter[i] = resetBuffer(buffer, dataLen);
adjustedDirectBufferOutputsParameter[i] =
CoderUtil.resetBuffer(buffer, dataLen);
bufferIdx++;
}
}
doDecodeImpl(inputs, erasedOrNotToReadIndexes,
doDecodeImpl(decodingState.inputs, erasedOrNotToReadIndexes,
adjustedDirectBufferOutputsParameter);
}

View File

@ -19,11 +19,10 @@ package org.apache.hadoop.io.erasurecode.rawcoder;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import org.apache.hadoop.io.erasurecode.rawcoder.util.DumpUtil;
import org.apache.hadoop.io.erasurecode.rawcoder.util.RSUtil;
import java.nio.ByteBuffer;
/**
* A raw erasure encoder in RS code scheme in pure Java in case native one
* isn't available in some environment. Please always use native implementations
@ -31,7 +30,7 @@ import java.nio.ByteBuffer;
* from HDFS-RAID, and also compatible with the native/ISA-L coder.
*/
@InterfaceAudience.Private
public class RSRawEncoder extends AbstractRawErasureEncoder {
public class RSRawEncoder extends RawErasureEncoder {
// relevant to schema and won't change during encode calls.
private byte[] encodeMatrix;
/**
@ -40,36 +39,42 @@ public class RSRawEncoder extends AbstractRawErasureEncoder {
*/
private byte[] gfTables;
public RSRawEncoder(int numDataUnits, int numParityUnits) {
super(numDataUnits, numParityUnits);
public RSRawEncoder(ErasureCoderOptions coderOptions) {
super(coderOptions);
if (numDataUnits + numParityUnits >= RSUtil.GF.getFieldSize()) {
if (getNumAllUnits() >= RSUtil.GF.getFieldSize()) {
throw new HadoopIllegalArgumentException(
"Invalid numDataUnits and numParityUnits");
}
encodeMatrix = new byte[getNumAllUnits() * numDataUnits];
RSUtil.genCauchyMatrix(encodeMatrix, getNumAllUnits(), numDataUnits);
if (isAllowingVerboseDump()) {
DumpUtil.dumpMatrix(encodeMatrix, numDataUnits, getNumAllUnits());
encodeMatrix = new byte[getNumAllUnits() * getNumDataUnits()];
RSUtil.genCauchyMatrix(encodeMatrix, getNumAllUnits(), getNumDataUnits());
if (allowVerboseDump()) {
DumpUtil.dumpMatrix(encodeMatrix, getNumDataUnits(), getNumAllUnits());
}
gfTables = new byte[getNumAllUnits() * numDataUnits * 32];
RSUtil.initTables(numDataUnits, numParityUnits, encodeMatrix,
numDataUnits * numDataUnits, gfTables);
if (isAllowingVerboseDump()) {
gfTables = new byte[getNumAllUnits() * getNumDataUnits() * 32];
RSUtil.initTables(getNumDataUnits(), getNumParityUnits(), encodeMatrix,
getNumDataUnits() * getNumDataUnits(), gfTables);
if (allowVerboseDump()) {
System.out.println(DumpUtil.bytesToHex(gfTables, -1));
}
}
@Override
protected void doEncode(ByteBuffer[] inputs, ByteBuffer[] outputs) {
RSUtil.encodeData(gfTables, inputs, outputs);
protected void doEncode(ByteBufferEncodingState encodingState) {
CoderUtil.resetOutputBuffers(encodingState.outputs,
encodingState.encodeLength);
RSUtil.encodeData(gfTables, encodingState.inputs, encodingState.outputs);
}
@Override
protected void doEncode(byte[][] inputs, int[] inputOffsets,
int dataLen, byte[][] outputs, int[] outputOffsets) {
RSUtil.encodeData(gfTables, dataLen, inputs, inputOffsets, outputs,
outputOffsets);
protected void doEncode(ByteArrayEncodingState encodingState) {
CoderUtil.resetOutputBuffers(encodingState.outputs,
encodingState.outputOffsets,
encodingState.encodeLength);
RSUtil.encodeData(gfTables, encodingState.encodeLength,
encodingState.inputs,
encodingState.inputOffsets, encodingState.outputs,
encodingState.outputOffsets);
}
}
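
A minimal usage sketch of the refactored constructor and the byte-array encode path, assuming a 6+3 schema; chunk size and buffer contents are placeholders.

ErasureCoderOptions options = new ErasureCoderOptions(6, 3);
RawErasureEncoder encoder = new RSRawEncoder(options);

int chunkSize = 64 * 1024;
byte[][] data = new byte[encoder.getNumDataUnits()][chunkSize];     // filled by the caller
byte[][] parity = new byte[encoder.getNumParityUnits()][chunkSize]; // written by the encoder

encoder.encode(data, parity);
encoder.release();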

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.io.erasurecode.rawcoder;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import org.apache.hadoop.io.erasurecode.rawcoder.util.RSUtil;
import java.nio.ByteBuffer;
@ -29,20 +30,20 @@ import java.util.Arrays;
* when possible.
*/
@InterfaceAudience.Private
public class RSRawEncoderLegacy extends AbstractRawErasureEncoder {
public class RSRawEncoderLegacy extends RawErasureEncoder {
private int[] generatingPolynomial;
public RSRawEncoderLegacy(int numDataUnits, int numParityUnits) {
super(numDataUnits, numParityUnits);
public RSRawEncoderLegacy(ErasureCoderOptions coderOptions) {
super(coderOptions);
assert (getNumDataUnits() + getNumParityUnits() < RSUtil.GF.getFieldSize());
int[] primitivePower = RSUtil.getPrimitivePower(numDataUnits,
numParityUnits);
int[] primitivePower = RSUtil.getPrimitivePower(getNumDataUnits(),
getNumParityUnits());
// compute generating polynomial
int[] gen = {1};
int[] poly = new int[2];
for (int i = 0; i < numParityUnits; i++) {
for (int i = 0; i < getNumParityUnits(); i++) {
poly[0] = primitivePower[i];
poly[1] = 1;
gen = RSUtil.GF.multiply(gen, poly);
@ -52,15 +53,21 @@ public class RSRawEncoderLegacy extends AbstractRawErasureEncoder {
}
@Override
protected void doEncode(ByteBuffer[] inputs, ByteBuffer[] outputs) {
protected void doEncode(ByteBufferEncodingState encodingState) {
CoderUtil.resetOutputBuffers(encodingState.outputs,
encodingState.encodeLength);
// parity units + data units
ByteBuffer[] all = new ByteBuffer[outputs.length + inputs.length];
ByteBuffer[] all = new ByteBuffer[encodingState.outputs.length +
encodingState.inputs.length];
if (isAllowingChangeInputs()) {
System.arraycopy(outputs, 0, all, 0, outputs.length);
System.arraycopy(inputs, 0, all, outputs.length, inputs.length);
if (allowChangeInputs()) {
System.arraycopy(encodingState.outputs, 0, all, 0,
encodingState.outputs.length);
System.arraycopy(encodingState.inputs, 0, all,
encodingState.outputs.length, encodingState.inputs.length);
} else {
System.arraycopy(outputs, 0, all, 0, outputs.length);
System.arraycopy(encodingState.outputs, 0, all, 0,
encodingState.outputs.length);
/**
* Note when this coder would be really (rarely) used in a production
@ -68,11 +75,11 @@ public class RSRawEncoderLegacy extends AbstractRawErasureEncoder {
* buffers avoiding reallocating.
*/
ByteBuffer tmp;
for (int i = 0; i < inputs.length; i++) {
tmp = ByteBuffer.allocate(inputs[i].remaining());
tmp.put(inputs[i]);
for (int i = 0; i < encodingState.inputs.length; i++) {
tmp = ByteBuffer.allocate(encodingState.inputs[i].remaining());
tmp.put(encodingState.inputs[i]);
tmp.flip();
all[outputs.length + i] = tmp;
all[encodingState.outputs.length + i] = tmp;
}
}
@ -81,27 +88,38 @@ public class RSRawEncoderLegacy extends AbstractRawErasureEncoder {
}
@Override
protected void doEncode(byte[][] inputs, int[] inputOffsets,
int dataLen, byte[][] outputs,
int[] outputOffsets) {
protected void doEncode(ByteArrayEncodingState encodingState) {
int dataLen = encodingState.encodeLength;
CoderUtil.resetOutputBuffers(encodingState.outputs,
encodingState.outputOffsets, dataLen);
// parity units + data units
byte[][] all = new byte[outputs.length + inputs.length][];
int[] allOffsets = new int[outputOffsets.length + inputOffsets.length];
byte[][] all = new byte[encodingState.outputs.length +
encodingState.inputs.length][];
int[] allOffsets = new int[encodingState.outputOffsets.length +
encodingState.inputOffsets.length];
if (isAllowingChangeInputs()) {
System.arraycopy(outputs, 0, all, 0, outputs.length);
System.arraycopy(inputs, 0, all, outputs.length, inputs.length);
if (allowChangeInputs()) {
System.arraycopy(encodingState.outputs, 0, all, 0,
encodingState.outputs.length);
System.arraycopy(encodingState.inputs, 0, all,
encodingState.outputs.length, encodingState.inputs.length);
System.arraycopy(outputOffsets, 0, allOffsets, 0, outputOffsets.length);
System.arraycopy(inputOffsets, 0, allOffsets,
outputOffsets.length, inputOffsets.length);
System.arraycopy(encodingState.outputOffsets, 0, allOffsets, 0,
encodingState.outputOffsets.length);
System.arraycopy(encodingState.inputOffsets, 0, allOffsets,
encodingState.outputOffsets.length,
encodingState.inputOffsets.length);
} else {
System.arraycopy(outputs, 0, all, 0, outputs.length);
System.arraycopy(outputOffsets, 0, allOffsets, 0, outputOffsets.length);
System.arraycopy(encodingState.outputs, 0, all, 0,
encodingState.outputs.length);
System.arraycopy(encodingState.outputOffsets, 0, allOffsets, 0,
encodingState.outputOffsets.length);
for (int i = 0; i < inputs.length; i++) {
all[outputs.length + i] = Arrays.copyOfRange(inputs[i],
inputOffsets[i], inputOffsets[i] + dataLen);
for (int i = 0; i < encodingState.inputs.length; i++) {
all[encodingState.outputs.length + i] =
Arrays.copyOfRange(encodingState.inputs[i],
encodingState.inputOffsets[i],
encodingState.inputOffsets[i] + dataLen);
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.io.erasurecode.rawcoder;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
/**
* A raw coder factory for the new raw Reed-Solomon coder in Java.
@ -26,12 +27,12 @@ import org.apache.hadoop.classification.InterfaceAudience;
public class RSRawErasureCoderFactory implements RawErasureCoderFactory {
@Override
public RawErasureEncoder createEncoder(int numDataUnits, int numParityUnits) {
return new RSRawEncoder(numDataUnits, numParityUnits);
public RawErasureEncoder createEncoder(ErasureCoderOptions coderOptions) {
return new RSRawEncoder(coderOptions);
}
@Override
public RawErasureDecoder createDecoder(int numDataUnits, int numParityUnits) {
return new RSRawDecoder(numDataUnits, numParityUnits);
public RawErasureDecoder createDecoder(ErasureCoderOptions coderOptions) {
return new RSRawDecoder(coderOptions);
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.io.erasurecode.rawcoder;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
/**
* A raw coder factory for the legacy raw Reed-Solomon coder in Java.
@ -26,12 +27,12 @@ import org.apache.hadoop.classification.InterfaceAudience;
public class RSRawErasureCoderFactoryLegacy implements RawErasureCoderFactory {
@Override
public RawErasureEncoder createEncoder(int numDataUnits, int numParityUnits) {
return new RSRawEncoderLegacy(numDataUnits, numParityUnits);
public RawErasureEncoder createEncoder(ErasureCoderOptions coderOptions) {
return new RSRawEncoderLegacy(coderOptions);
}
@Override
public RawErasureDecoder createDecoder(int numDataUnits, int numParityUnits) {
return new RSRawDecoderLegacy(numDataUnits, numParityUnits);
public RawErasureDecoder createDecoder(ErasureCoderOptions coderOptions) {
return new RSRawDecoderLegacy(coderOptions);
}
}

View File

@ -1,73 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.erasurecode.rawcoder;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configurable;
/**
* RawErasureCoder is a common interface for {@link RawErasureEncoder} and
* {@link RawErasureDecoder} as both encoder and decoder share some properties.
*
* RawErasureCoder is part of ErasureCodec framework, where ErasureCoder is
* used to encode/decode a group of blocks (BlockGroup) according to the codec
* specific BlockGroup layout and logic. An ErasureCoder extracts chunks of
* data from the blocks and can employ various low level RawErasureCoders to
* perform encoding/decoding against the chunks.
*
* To distinguish from ErasureCoder, here RawErasureCoder is used to mean the
* low level constructs, since it only takes care of the math calculation with
* a group of byte buffers.
*/
@InterfaceAudience.Private
public interface RawErasureCoder extends Configurable {
/**
* Get a coder option value.
* @param option
* @return option value
*/
public Object getCoderOption(CoderOption option);
/**
* Set a coder option value.
* @param option
* @param value
*/
public void setCoderOption(CoderOption option, Object value);
/**
* The number of data input units for the coding. A unit can be a byte,
* chunk or buffer or even a block.
* @return count of data input units
*/
public int getNumDataUnits();
/**
* The number of parity output units for the coding. A unit can be a byte,
* chunk, buffer or even a block.
* @return count of parity output units
*/
public int getNumParityUnits();
/**
* Should be called when release this coder. Good chance to release encoding
* or decoding buffers
*/
public void release();
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.io.erasurecode.rawcoder;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
/**
* Raw erasure coder factory that can be used to create raw encoder and decoder.
@ -29,17 +30,15 @@ public interface RawErasureCoderFactory {
/**
* Create raw erasure encoder.
* @param numDataUnits number of data units in a coding group
* @param numParityUnits number of parity units in a coding group
* @param conf the configuration used to create the encoder
* @return raw erasure encoder
*/
public RawErasureEncoder createEncoder(int numDataUnits, int numParityUnits);
RawErasureEncoder createEncoder(ErasureCoderOptions coderOptions);
/**
* Create raw erasure decoder.
* @param numDataUnits number of data units in a coding group
* @param numParityUnits number of parity units in a coding group
* @param conf the configuration used to create the encoder
* @return raw erasure decoder
*/
public RawErasureDecoder createDecoder(int numDataUnits, int numParityUnits);
RawErasureDecoder createDecoder(ErasureCoderOptions coderOptions);
}

View File

@ -19,18 +19,34 @@ package org.apache.hadoop.io.erasurecode.rawcoder;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.erasurecode.ECChunk;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import java.nio.ByteBuffer;
/**
* RawErasureDecoder performs decoding given chunks of input data and generates
* missing data that corresponds to an erasure code scheme, like XOR and
* Reed-Solomon.
* An abstract raw erasure decoder that's to be inherited by new decoders.
*
* It extends the {@link RawErasureCoder} interface.
* Raw erasure coder is part of erasure codec framework, where erasure coder is
* used to encode/decode a group of blocks (BlockGroup) according to the codec
* specific BlockGroup layout and logic. An erasure coder extracts chunks of
* data from the blocks and can employ various low level raw erasure coders to
* perform encoding/decoding against the chunks.
*
* To distinguish from erasure coder, here raw erasure coder is used to mean the
* low level constructs, since it only takes care of the math calculation with
* a group of byte buffers.
*
* Note it mainly provides decode() calls, which should be stateless and may be
* made thread-safe in the future.
*/
@InterfaceAudience.Private
public interface RawErasureDecoder extends RawErasureCoder {
public abstract class RawErasureDecoder {
private final ErasureCoderOptions coderOptions;
public RawErasureDecoder(ErasureCoderOptions coderOptions) {
this.coderOptions = coderOptions;
}
/**
* Decode with inputs and erasedIndexes, generates outputs.
@ -64,8 +80,44 @@ public interface RawErasureDecoder extends RawErasureCoder {
* @param outputs output buffers to put decoded data into according to
* erasedIndexes, ready for read after the call
*/
void decode(ByteBuffer[] inputs, int[] erasedIndexes,
ByteBuffer[] outputs);
public void decode(ByteBuffer[] inputs, int[] erasedIndexes,
ByteBuffer[] outputs) {
ByteBufferDecodingState decodingState = new ByteBufferDecodingState(this,
inputs, erasedIndexes, outputs);
boolean usingDirectBuffer = decodingState.usingDirectBuffer;
int dataLen = decodingState.decodeLength;
if (dataLen == 0) {
return;
}
int[] inputPositions = new int[inputs.length];
for (int i = 0; i < inputPositions.length; i++) {
if (inputs[i] != null) {
inputPositions[i] = inputs[i].position();
}
}
if (usingDirectBuffer) {
doDecode(decodingState);
} else {
ByteArrayDecodingState badState = decodingState.convertToByteArrayState();
doDecode(badState);
}
for (int i = 0; i < inputs.length; i++) {
if (inputs[i] != null) {
// dataLen bytes consumed
inputs[i].position(inputPositions[i] + dataLen);
}
}
}
/**
* Perform the real decoding using Direct ByteBuffer.
* @param decodingState the decoding state
*/
protected abstract void doDecode(ByteBufferDecodingState decodingState);
/**
* Decode with inputs and erasedIndexes, generates outputs. More see above.
@ -75,7 +127,23 @@ public interface RawErasureDecoder extends RawErasureCoder {
* @param outputs output buffers to put decoded data into according to
* erasedIndexes, ready for read after the call
*/
void decode(byte[][] inputs, int[] erasedIndexes, byte[][] outputs);
public void decode(byte[][] inputs, int[] erasedIndexes, byte[][] outputs) {
ByteArrayDecodingState decodingState = new ByteArrayDecodingState(this,
inputs, erasedIndexes, outputs);
if (decodingState.decodeLength == 0) {
return;
}
doDecode(decodingState);
}
/**
* Perform the real decoding using byte arrays, supporting offsets and
* lengths.
* @param decodingState the decoding state
*/
protected abstract void doDecode(ByteArrayDecodingState decodingState);
/**
* Decode with inputs and erasedIndexes, generates outputs. More see above.
@ -88,6 +156,57 @@ public interface RawErasureDecoder extends RawErasureCoder {
* @param outputs output buffers to put decoded data into according to
* erasedIndexes, ready for read after the call
*/
void decode(ECChunk[] inputs, int[] erasedIndexes, ECChunk[] outputs);
public void decode(ECChunk[] inputs, int[] erasedIndexes,
ECChunk[] outputs) {
ByteBuffer[] newInputs = CoderUtil.toBuffers(inputs);
ByteBuffer[] newOutputs = CoderUtil.toBuffers(outputs);
decode(newInputs, erasedIndexes, newOutputs);
}
public int getNumDataUnits() {
return coderOptions.getNumDataUnits();
}
public int getNumParityUnits() {
return coderOptions.getNumParityUnits();
}
protected int getNumAllUnits() {
return coderOptions.getNumAllUnits();
}
/**
* Tell if direct buffer is preferred or not. It's for callers to
* decide how to allocate coding chunk buffers, using DirectByteBuffer or
* byte arrays. It will return false by default.
* @return true if native buffer is preferred for performance consideration,
* otherwise false.
*/
public boolean preferDirectBuffer() {
return false;
}
/**
* Whether changing the input buffers is allowed while performing
* encoding/decoding.
* @return true if it's allowed to change inputs, false otherwise
*/
public boolean allowChangeInputs() {
return coderOptions.allowChangeInputs();
}
/**
* Whether dumping verbose info during encoding/decoding is allowed.
* @return true if it's allowed to do verbose dump, false otherwise.
*/
public boolean allowVerboseDump() {
return coderOptions.allowVerboseDump();
}
/**
* Should be called when releasing this coder. A good chance to release
* encoding or decoding buffers.
*/
public void release() {
// Nothing to do here.
}
}
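
A usage sketch of the byte-array decode path, assuming decoder is a 6+3 RS decoder created as shown elsewhere in this change: erased units are passed as null inputs, their positions go into erasedIndexes, and the recovered units land in outputs in the same order.

int chunkSize = 64 * 1024;
byte[][] inputs = new byte[9][];       // one slot per unit; null = erased or not read
for (int i = 1; i <= 6; i++) {
  inputs[i] = new byte[chunkSize];     // at least getNumDataUnits() surviving units are needed
}
int[] erasedIndexes = {0};             // reconstruct unit 0
byte[][] outputs = new byte[1][chunkSize];

decoder.decode(inputs, erasedIndexes, outputs);   // outputs[0] now holds unit 0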

View File

@ -19,18 +19,34 @@ package org.apache.hadoop.io.erasurecode.rawcoder;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.erasurecode.ECChunk;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import java.nio.ByteBuffer;
/**
* RawErasureEncoder performs encoding given chunks of input data and generates
* parity outputs that corresponds to an erasure code scheme, like XOR and
* Reed-Solomon.
* An abstract raw erasure encoder that's to be inherited by new encoders.
*
* It extends the {@link RawErasureCoder} interface.
* Raw erasure coder is part of erasure codec framework, where erasure coder is
* used to encode/decode a group of blocks (BlockGroup) according to the codec
* specific BlockGroup layout and logic. An erasure coder extracts chunks of
* data from the blocks and can employ various low level raw erasure coders to
* perform encoding/decoding against the chunks.
*
* To distinguish from erasure coder, here raw erasure coder is used to mean the
* low level constructs, since it only takes care of the math calculation with
* a group of byte buffers.
*
* Note it mainly provides encode() calls, which should be stateless and may be
* made thread-safe in the future.
*/
@InterfaceAudience.Private
public interface RawErasureEncoder extends RawErasureCoder {
public abstract class RawErasureEncoder {
private final ErasureCoderOptions coderOptions;
public RawErasureEncoder(ErasureCoderOptions coderOptions) {
this.coderOptions = coderOptions;
}
/**
* Encode with inputs and generates outputs.
@ -47,7 +63,43 @@ public interface RawErasureEncoder extends RawErasureCoder {
* @param outputs output buffers to put the encoded data into, ready to read
* after the call
*/
void encode(ByteBuffer[] inputs, ByteBuffer[] outputs);
public void encode(ByteBuffer[] inputs, ByteBuffer[] outputs) {
ByteBufferEncodingState bbeState = new ByteBufferEncodingState(
this, inputs, outputs);
boolean usingDirectBuffer = bbeState.usingDirectBuffer;
int dataLen = bbeState.encodeLength;
if (dataLen == 0) {
return;
}
int[] inputPositions = new int[inputs.length];
for (int i = 0; i < inputPositions.length; i++) {
if (inputs[i] != null) {
inputPositions[i] = inputs[i].position();
}
}
if (usingDirectBuffer) {
doEncode(bbeState);
} else {
ByteArrayEncodingState baeState = bbeState.convertToByteArrayState();
doEncode(baeState);
}
for (int i = 0; i < inputs.length; i++) {
if (inputs[i] != null) {
// dataLen bytes consumed
inputs[i].position(inputPositions[i] + dataLen);
}
}
}
/**
* Perform the real encoding work using direct ByteBuffer.
* @param encodingState the encoding state
*/
protected abstract void doEncode(ByteBufferEncodingState encodingState);
/**
* Encode with inputs and generates outputs. More see above.
@ -56,7 +108,24 @@ public interface RawErasureEncoder extends RawErasureCoder {
* @param outputs output buffers to put the encoded data into, ready to read
* after the call
*/
void encode(byte[][] inputs, byte[][] outputs);
public void encode(byte[][] inputs, byte[][] outputs) {
ByteArrayEncodingState baeState = new ByteArrayEncodingState(
this, inputs, outputs);
int dataLen = baeState.encodeLength;
if (dataLen == 0) {
return;
}
doEncode(baeState);
}
/**
* Perform the real encoding work using byte arrays, supporting offsets
* and lengths.
* @param encodingState the encoding state
*/
protected abstract void doEncode(ByteArrayEncodingState encodingState);
/**
* Encode with inputs and generates outputs. More see above.
@ -65,6 +134,56 @@ public interface RawErasureEncoder extends RawErasureCoder {
* @param outputs output buffers to put the encoded data into, ready to read
* after the call
*/
void encode(ECChunk[] inputs, ECChunk[] outputs);
public void encode(ECChunk[] inputs, ECChunk[] outputs) {
ByteBuffer[] newInputs = ECChunk.toBuffers(inputs);
ByteBuffer[] newOutputs = ECChunk.toBuffers(outputs);
encode(newInputs, newOutputs);
}
public int getNumDataUnits() {
return coderOptions.getNumDataUnits();
}
public int getNumParityUnits() {
return coderOptions.getNumParityUnits();
}
public int getNumAllUnits() {
return coderOptions.getNumAllUnits();
}
/**
* Tell if direct buffer is preferred or not. It's for callers to
* decide how to allocate coding chunk buffers, using DirectByteBuffer or
* byte arrays. It will return false by default.
* @return true if native buffer is preferred for performance consideration,
* otherwise false.
*/
public boolean preferDirectBuffer() {
return false;
}
/**
* Whether changing the input buffers is allowed while performing
* encoding/decoding.
* @return true if it's allowed to change inputs, false otherwise
*/
public boolean allowChangeInputs() {
return coderOptions.allowChangeInputs();
}
/**
* Whether dumping verbose info during encoding/decoding is allowed.
* @return true if it's allowed to do verbose dump, false otherwise.
*/
public boolean allowVerboseDump() {
return coderOptions.allowVerboseDump();
}
/**
* Should be called when releasing this coder. A good chance to release
* encoding or decoding buffers.
*/
public void release() {
// Nothing to do here.
}
}
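
A small sketch of how a caller can honor preferDirectBuffer() when allocating coding buffers, as the javadoc above intends; encoder and chunkSize are assumed to exist.

ByteBuffer[] inputs = new ByteBuffer[encoder.getNumDataUnits()];
ByteBuffer[] outputs = new ByteBuffer[encoder.getNumParityUnits()];
for (int i = 0; i < inputs.length; i++) {
  inputs[i] = encoder.preferDirectBuffer()
      ? ByteBuffer.allocateDirect(chunkSize) : ByteBuffer.allocate(chunkSize);
}
for (int i = 0; i < outputs.length; i++) {
  outputs[i] = encoder.preferDirectBuffer()
      ? ByteBuffer.allocateDirect(chunkSize) : ByteBuffer.allocate(chunkSize);
}
// Fill and flip the inputs, then:
encoder.encode(inputs, outputs);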

View File

@ -17,9 +17,10 @@
*/
package org.apache.hadoop.io.erasurecode.rawcoder;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import java.nio.ByteBuffer;
/**
* A raw decoder in XOR code scheme in pure Java, adapted from HDFS-RAID.
@ -29,55 +30,57 @@ import org.apache.hadoop.classification.InterfaceAudience;
* deployed independently.
*/
@InterfaceAudience.Private
public class XORRawDecoder extends AbstractRawErasureDecoder {
public class XORRawDecoder extends RawErasureDecoder {
public XORRawDecoder(int numDataUnits, int numParityUnits) {
super(numDataUnits, numParityUnits);
public XORRawDecoder(ErasureCoderOptions coderOptions) {
super(coderOptions);
}
@Override
protected void doDecode(ByteBuffer[] inputs, int[] erasedIndexes,
ByteBuffer[] outputs) {
ByteBuffer output = outputs[0];
protected void doDecode(ByteBufferDecodingState decodingState) {
CoderUtil.resetOutputBuffers(decodingState.outputs,
decodingState.decodeLength);
ByteBuffer output = decodingState.outputs[0];
int erasedIdx = erasedIndexes[0];
int erasedIdx = decodingState.erasedIndexes[0];
// Process the inputs.
int iIdx, oIdx;
for (int i = 0; i < inputs.length; i++) {
for (int i = 0; i < decodingState.inputs.length; i++) {
// Skip the erased location.
if (i == erasedIdx) {
continue;
}
for (iIdx = inputs[i].position(), oIdx = output.position();
iIdx < inputs[i].limit();
for (iIdx = decodingState.inputs[i].position(), oIdx = output.position();
iIdx < decodingState.inputs[i].limit();
iIdx++, oIdx++) {
output.put(oIdx, (byte) (output.get(oIdx) ^ inputs[i].get(iIdx)));
output.put(oIdx, (byte) (output.get(oIdx) ^
decodingState.inputs[i].get(iIdx)));
}
}
}
@Override
protected void doDecode(byte[][] inputs, int[] inputOffsets, int dataLen,
int[] erasedIndexes, byte[][] outputs,
int[] outputOffsets) {
byte[] output = outputs[0];
resetBuffer(output, outputOffsets[0], dataLen);
int erasedIdx = erasedIndexes[0];
protected void doDecode(ByteArrayDecodingState decodingState) {
byte[] output = decodingState.outputs[0];
int dataLen = decodingState.decodeLength;
CoderUtil.resetOutputBuffers(decodingState.outputs,
decodingState.outputOffsets, dataLen);
int erasedIdx = decodingState.erasedIndexes[0];
// Process the inputs.
int iIdx, oIdx;
for (int i = 0; i < inputs.length; i++) {
for (int i = 0; i < decodingState.inputs.length; i++) {
// Skip the erased location.
if (i == erasedIdx) {
continue;
}
for (iIdx = inputOffsets[i], oIdx = outputOffsets[0];
iIdx < inputOffsets[i] + dataLen; iIdx++, oIdx++) {
output[oIdx] ^= inputs[i][iIdx];
for (iIdx = decodingState.inputOffsets[i],
oIdx = decodingState.outputOffsets[0];
iIdx < decodingState.inputOffsets[i] + dataLen; iIdx++, oIdx++) {
output[oIdx] ^= decodingState.inputs[i][iIdx];
}
}
}

View File

@ -17,9 +17,10 @@
*/
package org.apache.hadoop.io.erasurecode.rawcoder;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import java.nio.ByteBuffer;
/**
* A raw encoder in XOR code scheme in pure Java, adapted from HDFS-RAID.
@ -29,50 +30,56 @@ import org.apache.hadoop.classification.InterfaceAudience;
* deployed independently.
*/
@InterfaceAudience.Private
public class XORRawEncoder extends AbstractRawErasureEncoder {
public class XORRawEncoder extends RawErasureEncoder {
public XORRawEncoder(int numDataUnits, int numParityUnits) {
super(numDataUnits, numParityUnits);
public XORRawEncoder(ErasureCoderOptions coderOptions) {
super(coderOptions);
}
protected void doEncode(ByteBuffer[] inputs, ByteBuffer[] outputs) {
ByteBuffer output = outputs[0];
protected void doEncode(ByteBufferEncodingState encodingState) {
CoderUtil.resetOutputBuffers(encodingState.outputs,
encodingState.encodeLength);
ByteBuffer output = encodingState.outputs[0];
// Get the first buffer's data.
int iIdx, oIdx;
for (iIdx = inputs[0].position(), oIdx = output.position();
iIdx < inputs[0].limit(); iIdx++, oIdx++) {
output.put(oIdx, inputs[0].get(iIdx));
for (iIdx = encodingState.inputs[0].position(), oIdx = output.position();
iIdx < encodingState.inputs[0].limit(); iIdx++, oIdx++) {
output.put(oIdx, encodingState.inputs[0].get(iIdx));
}
// XOR with everything else.
for (int i = 1; i < inputs.length; i++) {
for (iIdx = inputs[i].position(), oIdx = output.position();
iIdx < inputs[i].limit();
for (int i = 1; i < encodingState.inputs.length; i++) {
for (iIdx = encodingState.inputs[i].position(), oIdx = output.position();
iIdx < encodingState.inputs[i].limit();
iIdx++, oIdx++) {
output.put(oIdx, (byte) (output.get(oIdx) ^ inputs[i].get(iIdx)));
output.put(oIdx, (byte) (output.get(oIdx) ^
encodingState.inputs[i].get(iIdx)));
}
}
}
@Override
protected void doEncode(byte[][] inputs, int[] inputOffsets, int dataLen,
byte[][] outputs, int[] outputOffsets) {
byte[] output = outputs[0];
resetBuffer(output, outputOffsets[0], dataLen);
protected void doEncode(ByteArrayEncodingState encodingState) {
int dataLen = encodingState.encodeLength;
CoderUtil.resetOutputBuffers(encodingState.outputs,
encodingState.outputOffsets, dataLen);
byte[] output = encodingState.outputs[0];
// Get the first buffer's data.
int iIdx, oIdx;
for (iIdx = inputOffsets[0], oIdx = outputOffsets[0];
iIdx < inputOffsets[0] + dataLen; iIdx++, oIdx++) {
output[oIdx] = inputs[0][iIdx];
for (iIdx = encodingState.inputOffsets[0],
oIdx = encodingState.outputOffsets[0];
iIdx < encodingState.inputOffsets[0] + dataLen; iIdx++, oIdx++) {
output[oIdx] = encodingState.inputs[0][iIdx];
}
// XOR with everything else.
for (int i = 1; i < inputs.length; i++) {
for (iIdx = inputOffsets[i], oIdx = outputOffsets[0];
iIdx < inputOffsets[i] + dataLen; iIdx++, oIdx++) {
output[oIdx] ^= inputs[i][iIdx];
for (int i = 1; i < encodingState.inputs.length; i++) {
for (iIdx = encodingState.inputOffsets[i],
oIdx = encodingState.outputOffsets[0];
iIdx < encodingState.inputOffsets[i] + dataLen; iIdx++, oIdx++) {
output[oIdx] ^= encodingState.inputs[i][iIdx];
}
}
}
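
For the XOR scheme the single parity unit is the bytewise XOR of all data units, so any one erased unit can be rebuilt by XOR-ing the survivors; a round-trip sketch with three data units, illustrative values only:

ErasureCoderOptions options = new ErasureCoderOptions(3, 1);   // XOR produces one parity unit
RawErasureEncoder encoder = new XORRawEncoder(options);
RawErasureDecoder decoder = new XORRawDecoder(options);

byte[][] data = { {1, 2}, {3, 4}, {5, 6} };
byte[][] parity = new byte[1][2];
encoder.encode(data, parity);                      // parity[0] = d0 ^ d1 ^ d2 = {7, 0}

// Lose data unit 1 and rebuild it from the remaining units plus the parity.
byte[][] inputs = { data[0], null, data[2], parity[0] };
byte[][] recovered = new byte[1][2];
decoder.decode(inputs, new int[]{1}, recovered);   // recovered[0] is {3, 4} again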

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.io.erasurecode.rawcoder;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
/**
* A raw coder factory for raw XOR coder.
@ -26,12 +27,12 @@ import org.apache.hadoop.classification.InterfaceAudience;
public class XORRawErasureCoderFactory implements RawErasureCoderFactory {
@Override
public RawErasureEncoder createEncoder(int numDataUnits, int numParityUnits) {
return new XORRawEncoder(numDataUnits, numParityUnits);
public RawErasureEncoder createEncoder(ErasureCoderOptions coderOptions) {
return new XORRawEncoder(coderOptions);
}
@Override
public RawErasureDecoder createDecoder(int numDataUnits, int numParityUnits) {
return new XORRawDecoder(numDataUnits, numParityUnits);
public RawErasureDecoder createDecoder(ErasureCoderOptions coderOptions) {
return new XORRawDecoder(coderOptions);
}
}

View File

@ -15,29 +15,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.erasurecode.rawcoder;
/**
* Supported erasure coder options.
*
* Raw erasure coders.
*
* Raw erasure coder is part of erasure codec framework, where erasure coder is
* used to encode/decode a group of blocks (BlockGroup) according to the codec
* specific BlockGroup layout and logic. An erasure coder extracts chunks of
* data from the blocks and can employ various low level raw erasure coders to
* perform encoding/decoding against the chunks.
*
* To distinguish from erasure coder, here raw erasure coder is used to mean the
* low level constructs, since it only takes care of the math calculation with
* a group of byte buffers.
*/
public enum CoderOption {
/* If direct buffer is preferred, for perf consideration */
PREFER_DIRECT_BUFFER(true), // READ-ONLY
/**
* Allow changing input buffer content (not positions).
* Maybe better perf if allowed
*/
ALLOW_CHANGE_INPUTS(false), // READ-WRITE
/* Allow dump verbose debug info or not */
ALLOW_VERBOSE_DUMP(false); // READ-WRITE
@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.hadoop.io.erasurecode.rawcoder;
private boolean isReadOnly = false;
CoderOption(boolean isReadOnly) {
this.isReadOnly = isReadOnly;
}
public boolean isReadOnly() {
return isReadOnly;
}
};
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

View File

@ -1,83 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.erasurecode.rawcoder.util;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import java.util.Arrays;
/**
* Helpful utilities for implementing some raw erasure coders.
*/
@InterfaceAudience.Private
public final class CoderUtil {
private CoderUtil() {
// No called
}
/**
* Get indexes into inputs array for items marked as null, either erased or
* not to read.
* @return indexes into inputs array
*/
public static <T> int[] getErasedOrNotToReadIndexes(T[] inputs) {
int[] invalidIndexes = new int[inputs.length];
int idx = 0;
for (int i = 0; i < inputs.length; i++) {
if (inputs[i] == null) {
invalidIndexes[idx++] = i;
}
}
return Arrays.copyOf(invalidIndexes, idx);
}
/**
* Find the valid input from all the inputs.
* @param inputs input buffers to look for valid input
* @return the first valid input
*/
public static <T> T findFirstValidInput(T[] inputs) {
for (T input : inputs) {
if (input != null) {
return input;
}
}
throw new HadoopIllegalArgumentException(
"Invalid inputs are found, all being null");
}
/**
* Picking up indexes of valid inputs.
* @param inputs actually decoding input buffers
* @param validIndexes an array to be filled and returned
* @param <T>
*/
public static <T> void makeValidIndexes(T[] inputs, int[] validIndexes) {
int idx = 0;
for (int i = 0; i < inputs.length && idx < validIndexes.length; i++) {
if (inputs[i] != null) {
validIndexes[idx++] = i;
}
}
}
}

View File

@ -17,12 +17,12 @@
*/
package org.apache.hadoop.io.erasurecode.rawcoder.util;
import org.apache.hadoop.classification.InterfaceAudience;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* Implementation of Galois field arithmetic with 2^p elements. The input must
* be unsigned integers. It's ported from HDFS-RAID, slightly adapted.

View File

@ -46,37 +46,42 @@ public class TestCodecRawCoderMapping {
@Test
public void testRSDefaultRawCoder() {
ErasureCoderOptions coderOptions = new ErasureCoderOptions(
numDataUnit, numParityUnit);
// should return default raw coder of rs-default codec
RawErasureEncoder encoder = CodecUtil.createRSRawEncoder(
conf, numDataUnit, numParityUnit);
RawErasureEncoder encoder = CodecUtil.createRawEncoder(
conf, ErasureCodeConstants.RS_DEFAULT_CODEC_NAME, coderOptions);
Assert.assertTrue(encoder instanceof RSRawEncoder);
RawErasureDecoder decoder = CodecUtil.createRSRawDecoder(
conf, numDataUnit, numParityUnit);
RawErasureDecoder decoder = CodecUtil.createRawDecoder(
conf, ErasureCodeConstants.RS_DEFAULT_CODEC_NAME, coderOptions);
Assert.assertTrue(decoder instanceof RSRawDecoder);
// should return default raw coder of rs-legacy codec
encoder = CodecUtil.createRSRawEncoder(conf, numDataUnit, numParityUnit,
ErasureCodeConstants.RS_LEGACY_CODEC_NAME);
encoder = CodecUtil.createRawEncoder(conf,
ErasureCodeConstants.RS_LEGACY_CODEC_NAME, coderOptions);
Assert.assertTrue(encoder instanceof RSRawEncoderLegacy);
decoder = CodecUtil.createRSRawDecoder(conf, numDataUnit, numParityUnit,
ErasureCodeConstants.RS_LEGACY_CODEC_NAME);
decoder = CodecUtil.createRawDecoder(conf,
ErasureCodeConstants.RS_LEGACY_CODEC_NAME, coderOptions);
Assert.assertTrue(decoder instanceof RSRawDecoderLegacy);
}
@Test
public void testDedicatedRawCoderKey() {
ErasureCoderOptions coderOptions = new ErasureCoderOptions(
numDataUnit, numParityUnit);
String dummyFactName = "DummyNoneExistingFactory";
// set the dummy factory to rs-legacy and create a raw coder
// with rs-default, which is OK as the raw coder key is not used
conf.set(CommonConfigurationKeys.
IO_ERASURECODE_CODEC_RS_LEGACY_RAWCODER_KEY, dummyFactName);
RawErasureEncoder encoder = CodecUtil.createRSRawEncoder(conf, numDataUnit,
numParityUnit, ErasureCodeConstants.RS_DEFAULT_CODEC_NAME);
RawErasureEncoder encoder = CodecUtil.createRawEncoder(conf,
ErasureCodeConstants.RS_DEFAULT_CODEC_NAME, coderOptions);
Assert.assertTrue(encoder instanceof RSRawEncoder);
// now create the raw coder with rs-legacy, which should throw exception
try {
CodecUtil.createRSRawEncoder(conf, numDataUnit, numParityUnit,
ErasureCodeConstants.RS_LEGACY_CODEC_NAME);
CodecUtil.createRawEncoder(conf,
ErasureCodeConstants.RS_LEGACY_CODEC_NAME, coderOptions);
Assert.fail();
} catch (Exception e) {
GenericTestUtils.assertExceptionContains("Failed to create raw coder", e);

View File

@ -35,7 +35,7 @@ import static org.junit.Assert.assertTrue;
public abstract class TestCoderBase {
protected static Random RAND = new Random();
private boolean allowDump = true;
protected boolean allowDump = true;
private Configuration conf;
protected int numDataUnits;
@ -90,13 +90,8 @@ public abstract class TestCoderBase {
}
}
/**
* Set true during setup if want to dump test settings and coding data,
* useful in debugging.
* @param allowDump
*/
protected void setAllowDump(boolean allowDump) {
this.allowDump = allowDump;
protected boolean isAllowDump() {
return allowDump;
}
/**
@ -502,7 +497,8 @@ public abstract class TestCoderBase {
sb.append(" erasedParityIndexes=").
append(Arrays.toString(erasedParityIndexes));
sb.append(" usingDirectBuffer=").append(usingDirectBuffer);
sb.append(" isAllowingChangeInputs=").append(allowChangeInputs);
sb.append(" allowChangeInputs=").append(allowChangeInputs);
sb.append(" allowVerboseDump=").append(allowDump);
sb.append("\n");
System.out.println(sb.toString());

View File

@ -50,7 +50,7 @@ public class TestDummyRawCoder extends TestRawCoderBase {
@Override
protected void testCoding(boolean usingDirectBuffer) {
this.usingDirectBuffer = usingDirectBuffer;
prepareCoders();
prepareCoders(true);
prepareBufferAllocator(true);
setAllowChangeInputs(false);

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.io.erasurecode.rawcoder;
import org.apache.hadoop.io.erasurecode.ECChunk;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import org.apache.hadoop.io.erasurecode.TestCoderBase;
import org.junit.Assert;
import org.junit.Test;
@ -62,7 +63,7 @@ public abstract class TestRawCoderBase extends TestCoderBase {
*/
protected void testCoding(boolean usingDirectBuffer) {
this.usingDirectBuffer = usingDirectBuffer;
prepareCoders();
prepareCoders(true);
/**
* The following runs will use 3 different chunkSize for inputs and outputs,
@ -79,7 +80,7 @@ public abstract class TestRawCoderBase extends TestCoderBase {
*/
protected void testCodingWithBadInput(boolean usingDirectBuffer) {
this.usingDirectBuffer = usingDirectBuffer;
prepareCoders();
prepareCoders(true);
try {
performTestCoding(baseChunkSize, false, true, false, true);
@ -95,7 +96,7 @@ public abstract class TestRawCoderBase extends TestCoderBase {
*/
protected void testCodingWithBadOutput(boolean usingDirectBuffer) {
this.usingDirectBuffer = usingDirectBuffer;
prepareCoders();
prepareCoders(true);
try {
performTestCoding(baseChunkSize, false, false, true, true);
@ -189,16 +190,23 @@ public abstract class TestRawCoderBase extends TestCoderBase {
protected void setAllowChangeInputs(boolean allowChangeInputs) {
this.allowChangeInputs = allowChangeInputs;
encoder.setCoderOption(CoderOption.ALLOW_CHANGE_INPUTS, allowChangeInputs);
decoder.setCoderOption(CoderOption.ALLOW_CHANGE_INPUTS, allowChangeInputs);
}
protected void prepareCoders() {
if (encoder == null) {
/**
* Set to true during setup if you want to dump test settings and coding
* data, useful in debugging.
* @param allowDump
*/
protected void setAllowDump(boolean allowDump) {
this.allowDump = allowDump;
}
protected void prepareCoders(boolean recreate) {
if (encoder == null || recreate) {
encoder = createEncoder();
}
if (decoder == null) {
if (decoder == null || recreate) {
decoder = createDecoder();
}
}
@ -222,18 +230,16 @@ public abstract class TestRawCoderBase extends TestCoderBase {
* @return
*/
protected RawErasureEncoder createEncoder() {
RawErasureEncoder encoder;
ErasureCoderOptions coderConf =
new ErasureCoderOptions(numDataUnits, numParityUnits,
allowChangeInputs, allowDump);
try {
Constructor<? extends RawErasureEncoder> constructor =
(Constructor<? extends RawErasureEncoder>)
encoderClass.getConstructor(int.class, int.class);
encoder = constructor.newInstance(numDataUnits, numParityUnits);
encoderClass.getConstructor(ErasureCoderOptions.class);
return constructor.newInstance(coderConf);
} catch (Exception e) {
throw new RuntimeException("Failed to create encoder", e);
}
encoder.setConf(getConf());
return encoder;
}
/**
@ -241,18 +247,16 @@ public abstract class TestRawCoderBase extends TestCoderBase {
* @return
*/
protected RawErasureDecoder createDecoder() {
RawErasureDecoder decoder;
ErasureCoderOptions coderConf =
new ErasureCoderOptions(numDataUnits, numParityUnits,
allowChangeInputs, allowDump);
try {
Constructor<? extends RawErasureDecoder> constructor =
(Constructor<? extends RawErasureDecoder>)
decoderClass.getConstructor(int.class, int.class);
decoder = constructor.newInstance(numDataUnits, numParityUnits);
decoderClass.getConstructor(ErasureCoderOptions.class);
return constructor.newInstance(coderConf);
} catch (Exception e) {
throw new RuntimeException("Failed to create decoder", e);
}
decoder.setConf(getConf());
return decoder;
}
/**
@ -261,7 +265,7 @@ public abstract class TestRawCoderBase extends TestCoderBase {
*/
protected void testInputPosition(boolean usingDirectBuffer) {
this.usingDirectBuffer = usingDirectBuffer;
prepareCoders();
prepareCoders(true);
prepareBufferAllocator(false);
// verify encode

View File

@ -38,6 +38,7 @@ import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResu
import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
import org.apache.hadoop.util.DirectBufferPool;
@ -184,8 +185,10 @@ public class DFSStripedInputStream extends DFSInputStream {
curStripeRange = new StripeRange(0, 0);
readingService =
new ExecutorCompletionService<>(dfsClient.getStripedReadsThreadPool());
decoder = CodecUtil.createRSRawDecoder(dfsClient.getConfiguration(),
dataBlkNum, parityBlkNum, ecPolicy.getCodecName());
ErasureCoderOptions coderOptions = new ErasureCoderOptions(
dataBlkNum, parityBlkNum);
decoder = CodecUtil.createRawDecoder(dfsClient.getConfiguration(),
ecPolicy.getCodecName(), coderOptions);
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Creating an striped input stream for file " + src);
}

View File

@ -59,6 +59,7 @@ import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable;
@ -286,8 +287,10 @@ public class DFSStripedOutputStream extends DFSOutputStream {
flushAllExecutorCompletionService = new
ExecutorCompletionService<>(flushAllExecutor);
encoder = CodecUtil.createRSRawEncoder(dfsClient.getConfiguration(),
numDataBlocks, numParityBlocks, ecPolicy.getCodecName());
ErasureCoderOptions coderOptions = new ErasureCoderOptions(
numDataBlocks, numParityBlocks);
encoder = CodecUtil.createRawEncoder(dfsClient.getConfiguration(),
ecPolicy.getCodecName(), coderOptions);
coordinator = new Coordinator(numAllBlocks);
try {

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.DataChecksum;
@ -215,8 +216,10 @@ class StripedReconstructor implements Runnable {
// Initialize decoder
private void initDecoderIfNecessary() {
if (decoder == null) {
decoder = CodecUtil.createRSRawDecoder(conf, ecPolicy.getNumDataUnits(),
ecPolicy.getNumParityUnits(), ecPolicy.getCodecName());
ErasureCoderOptions coderOptions = new ErasureCoderOptions(
ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits());
decoder = CodecUtil.createRawDecoder(conf, ecPolicy.getCodecName(),
coderOptions);
}
}

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem.WebHdfsInputStream;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
import org.junit.Assert;
@ -491,9 +492,12 @@ public class StripedFileTestUtil {
System.arraycopy(tmp, 0, dataBytes[i], 0, tmp.length);
}
}
ErasureCoderOptions coderOptions = new ErasureCoderOptions(
dataBytes.length, parityBytes.length);
final RawErasureEncoder encoder =
CodecUtil.createRSRawEncoder(conf, dataBytes.length, parityBytes.length,
TEST_EC_POLICY.getCodecName());
CodecUtil.createRawEncoder(conf, TEST_EC_POLICY.getCodecName(),
coderOptions);
encoder.encode(dataBytes, expectedParityBytes);
for (int i = 0; i < parityBytes.length; i++) {
if (checkSet.contains(i + dataBytes.length)){

View File

@ -20,30 +20,25 @@ package org.apache.hadoop.hdfs;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import java.io.IOException;
@ -51,6 +46,12 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class TestDFSStripedInputStream {
public static final Log LOG = LogFactory.getLog(TestDFSStripedInputStream.class);
@ -217,8 +218,10 @@ public class TestDFSStripedInputStream {
}
}
RawErasureDecoder rawDecoder = CodecUtil.createRSRawDecoder(conf,
DATA_BLK_NUM, PARITY_BLK_NUM, ecPolicy.getCodecName());
ErasureCoderOptions coderOptions = new ErasureCoderOptions(
DATA_BLK_NUM, PARITY_BLK_NUM);
RawErasureDecoder rawDecoder = CodecUtil.createRawDecoder(conf,
ecPolicy.getCodecName(), coderOptions);
// Update the expected content for decoded data
int[] missingBlkIdx = new int[PARITY_BLK_NUM];