From 77202fa1035a54496d11d07472fbc399148ff630 Mon Sep 17 00:00:00 2001
From: Kai Zheng
Date: Fri, 27 May 2016 13:23:34 +0800
Subject: [PATCH] HADOOP-13010. Refactor raw erasure coders. Contributed by Kai Zheng

---
 .../hadoop/io/erasurecode/CodecUtil.java      | 119 +++------
 .../io/erasurecode/ErasureCoderOptions.java   |  89 +++++++
 .../coder/HHXORErasureDecoder.java            |  18 +-
 .../coder/HHXORErasureEncoder.java            |  15 +-
 .../erasurecode/coder/RSErasureDecoder.java   |   6 +-
 .../erasurecode/coder/RSErasureEncoder.java   |   6 +-
 .../erasurecode/coder/XORErasureDecoder.java  |   6 +-
 .../erasurecode/coder/XORErasureEncoder.java  |   6 +-
 .../rawcoder/AbstractRawErasureCoder.java     | 220 ------------------
 .../rawcoder/AbstractRawErasureDecoder.java   | 181 --------------
 .../rawcoder/AbstractRawErasureEncoder.java   | 146 ------------
 .../rawcoder/ByteArrayDecodingState.java      | 111 +++++++++
 .../rawcoder/ByteArrayEncodingState.java      |  81 +++++++
 .../rawcoder/ByteBufferDecodingState.java     | 134 +++++++++++
 .../rawcoder/ByteBufferEncodingState.java     |  98 ++++++++
 .../io/erasurecode/rawcoder/CoderUtil.java    | 199 ++++++++++++++++
 .../erasurecode/rawcoder/DecodingState.java   |  55 +++++
 .../erasurecode/rawcoder/DummyRawDecoder.java |  16 +-
 .../erasurecode/rawcoder/DummyRawEncoder.java |  15 +-
 .../rawcoder/DummyRawErasureCoderFactory.java |  10 +-
 .../erasurecode/rawcoder/EncodingState.java   |  44 ++++
 .../io/erasurecode/rawcoder/RSRawDecoder.java |  48 ++--
 .../rawcoder/RSRawDecoderLegacy.java          |  66 +++---
 .../io/erasurecode/rawcoder/RSRawEncoder.java |  45 ++--
 .../rawcoder/RSRawEncoderLegacy.java          |  82 ++++---
 .../rawcoder/RSRawErasureCoderFactory.java    |   9 +-
 .../RSRawErasureCoderFactoryLegacy.java       |   9 +-
 .../erasurecode/rawcoder/RawErasureCoder.java |  73 ------
 .../rawcoder/RawErasureCoderFactory.java      |  11 +-
 .../rawcoder/RawErasureDecoder.java           | 137 ++++++++++-
 .../rawcoder/RawErasureEncoder.java           | 135 ++++++++++-
 .../erasurecode/rawcoder/XORRawDecoder.java   |  51 ++--
 .../erasurecode/rawcoder/XORRawEncoder.java   |  57 +++--
 .../rawcoder/XORRawErasureCoderFactory.java   |   9 +-
 .../{CoderOption.java => package-info.java}   |  39 ++--
 .../erasurecode/rawcoder/util/CoderUtil.java  |  83 -------
 .../rawcoder/util/GaloisField.java            |   4 +-
 .../erasurecode/TestCodecRawCoderMapping.java |  29 ++-
 .../hadoop/io/erasurecode/TestCoderBase.java  |  14 +-
 .../rawcoder/TestDummyRawCoder.java           |   2 +-
 .../rawcoder/TestRawCoderBase.java            |  50 ++--
 .../hadoop/hdfs/DFSStripedInputStream.java    |   7 +-
 .../hadoop/hdfs/DFSStripedOutputStream.java   |   7 +-
 .../erasurecode/StripedReconstructor.java     |   7 +-
 .../hadoop/hdfs/StripedFileTestUtil.java      |   8 +-
 .../hdfs/TestDFSStripedInputStream.java       |  23 +-
 46 files changed, 1478 insertions(+), 1102 deletions(-)
 create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ErasureCoderOptions.java
 delete mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureCoder.java
 delete mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureDecoder.java
 delete mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureEncoder.java
 create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/ByteArrayDecodingState.java
 create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/ByteArrayEncodingState.java
 create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/ByteBufferDecodingState.java
 create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/ByteBufferEncodingState.java
 create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/CoderUtil.java
 create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DecodingState.java
 create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/EncodingState.java
 delete mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureCoder.java
 rename hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/{CoderOption.java => package-info.java} (53%)
 delete mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/util/CoderUtil.java

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/CodecUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/CodecUtil.java
index fcce071b50..9cd9561c8f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/CodecUtil.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/CodecUtil.java
@@ -21,7 +21,6 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureCoder;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureCoderFactory;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
@@ -36,115 +35,61 @@ public final class CodecUtil {
   /**
    * Create RS raw encoder according to configuration.
-   * @param conf configuration possibly with some items to configure the coder
-   * @param numDataUnits number of data units in a coding group
-   * @param numParityUnits number of parity units in a coding group
+   * @param conf configuration
+   * @param coderOptions coder options that's used to create the coder
    * @param codec the codec to use. If null, will use the default codec
    * @return raw encoder
    */
-  public static RawErasureEncoder createRSRawEncoder(
-      Configuration conf, int numDataUnits, int numParityUnits, String codec) {
+  public static RawErasureEncoder createRawEncoder(
+      Configuration conf, String codec, ErasureCoderOptions coderOptions) {
     Preconditions.checkNotNull(conf);
-    if (codec == null) {
-      codec = ErasureCodeConstants.RS_DEFAULT_CODEC_NAME;
-    }
-    RawErasureCoder rawCoder = createRawCoder(conf,
-        getFactNameFromCodec(conf, codec), true, numDataUnits, numParityUnits);
-    return (RawErasureEncoder) rawCoder;
-  }
+    Preconditions.checkNotNull(codec);
-  /**
-   * Create RS raw encoder using the default codec.
- */ - public static RawErasureEncoder createRSRawEncoder( - Configuration conf, int numDataUnits, int numParityUnits) { - return createRSRawEncoder(conf, numDataUnits, numParityUnits, null); + String rawCoderFactoryKey = getFactNameFromCodec(conf, codec); + + RawErasureCoderFactory fact = createRawCoderFactory(conf, + rawCoderFactoryKey); + + return fact.createEncoder(coderOptions); } /** * Create RS raw decoder according to configuration. - * @param conf configuration possibly with some items to configure the coder - * @param numDataUnits number of data units in a coding group - * @param numParityUnits number of parity units in a coding group + * @param conf configuration + * @param coderOptions coder options that's used to create the coder * @param codec the codec to use. If null, will use the default codec * @return raw decoder */ - public static RawErasureDecoder createRSRawDecoder( - Configuration conf, int numDataUnits, int numParityUnits, String codec) { + public static RawErasureDecoder createRawDecoder( + Configuration conf, String codec, ErasureCoderOptions coderOptions) { Preconditions.checkNotNull(conf); - if (codec == null) { - codec = ErasureCodeConstants.RS_DEFAULT_CODEC_NAME; - } - RawErasureCoder rawCoder = createRawCoder(conf, - getFactNameFromCodec(conf, codec), false, numDataUnits, numParityUnits); - return (RawErasureDecoder) rawCoder; + Preconditions.checkNotNull(codec); + + String rawCoderFactoryKey = getFactNameFromCodec(conf, codec); + + RawErasureCoderFactory fact = createRawCoderFactory(conf, + rawCoderFactoryKey); + + return fact.createDecoder(coderOptions); } - /** - * Create RS raw decoder using the default codec. - */ - public static RawErasureDecoder createRSRawDecoder( - Configuration conf, int numDataUnits, int numParityUnits) { - return createRSRawDecoder(conf, numDataUnits, numParityUnits, null); - } - - /** - * Create XOR raw encoder according to configuration. - * @param conf configuration possibly with some items to configure the coder - * @param numDataUnits number of data units in a coding group - * @param numParityUnits number of parity units in a coding group - * @return raw encoder - */ - public static RawErasureEncoder createXORRawEncoder( - Configuration conf, int numDataUnits, int numParityUnits) { - Preconditions.checkNotNull(conf); - RawErasureCoder rawCoder = createRawCoder(conf, - getFactNameFromCodec(conf, ErasureCodeConstants.XOR_CODEC_NAME), - true, numDataUnits, numParityUnits); - return (RawErasureEncoder) rawCoder; - } - - /** - * Create XOR raw decoder according to configuration. - * @param conf configuration possibly with some items to configure the coder - * @param numDataUnits number of data units in a coding group - * @param numParityUnits number of parity units in a coding group - * @return raw decoder - */ - public static RawErasureDecoder createXORRawDecoder( - Configuration conf, int numDataUnits, int numParityUnits) { - Preconditions.checkNotNull(conf); - RawErasureCoder rawCoder = createRawCoder(conf, - getFactNameFromCodec(conf, ErasureCodeConstants.XOR_CODEC_NAME), - false, numDataUnits, numParityUnits); - return (RawErasureDecoder) rawCoder; - } - - /** - * Create raw coder using specified conf and raw coder factory key. 
- * @param conf configuration possibly with some items to configure the coder - * @param rawCoderFactory name of the raw coder factory - * @param isEncoder is encoder or not we're going to create - * @param numDataUnits number of data units in a coding group - * @param numParityUnits number of parity units in a coding group - * @return raw coder - */ - public static RawErasureCoder createRawCoder(Configuration conf, - String rawCoderFactory, boolean isEncoder, int numDataUnits, - int numParityUnits) { - + private static RawErasureCoderFactory createRawCoderFactory( + Configuration conf, String rawCoderFactoryKey) { RawErasureCoderFactory fact; try { Class factClass = conf.getClassByName( - rawCoderFactory).asSubclass(RawErasureCoderFactory.class); + rawCoderFactoryKey).asSubclass(RawErasureCoderFactory.class); fact = factClass.newInstance(); } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) { - throw new RuntimeException("Failed to create raw coder", e); + throw new RuntimeException("Failed to create raw coder factory", e); } - return isEncoder ? fact.createEncoder(numDataUnits, numParityUnits) : - fact.createDecoder(numDataUnits, numParityUnits); + if (fact == null) { + throw new RuntimeException("Failed to create raw coder factory"); + } + + return fact; } private static String getFactNameFromCodec(Configuration conf, String codec) { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ErasureCoderOptions.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ErasureCoderOptions.java new file mode 100644 index 0000000000..106a36c9b0 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ErasureCoderOptions.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.erasurecode; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Erasure coder configuration that maintains schema info and coder options. 
+ */ +@InterfaceAudience.Private +public final class ErasureCoderOptions { + + private final int numDataUnits; + private final int numParityUnits; + private final int numAllUnits; + private final boolean allowChangeInputs; + private final boolean allowVerboseDump; + + public ErasureCoderOptions(int numDataUnits, int numParityUnits) { + this(numDataUnits, numParityUnits, false, false); + } + + public ErasureCoderOptions(int numDataUnits, int numParityUnits, + boolean allowChangeInputs, boolean allowVerboseDump) { + this.numDataUnits = numDataUnits; + this.numParityUnits = numParityUnits; + this.numAllUnits = numDataUnits + numParityUnits; + this.allowChangeInputs = allowChangeInputs; + this.allowVerboseDump = allowVerboseDump; + } + + /** + * The number of data input units for the coding. A unit can be a byte, + * chunk or buffer or even a block. + * @return count of data input units + */ + public int getNumDataUnits() { + return numDataUnits; + } + + /** + * The number of parity output units for the coding. A unit can be a byte, + * chunk, buffer or even a block. + * @return count of parity output units + */ + public int getNumParityUnits() { + return numParityUnits; + } + + /** + * The number of all the involved units in the coding. + * @return count of all the data units and parity units + */ + public int getNumAllUnits() { + return numAllUnits; + } + + /** + * Allow changing input buffer content (not positions). Maybe better + * performance if not allowed. + * @return true if allowing input content to be changed, false otherwise + */ + public boolean allowChangeInputs() { + return allowChangeInputs; + } + + /** + * Allow dump verbose debug info or not. + * @return true if verbose debug info is desired, false otherwise + */ + public boolean allowVerboseDump() { + return allowVerboseDump; + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/HHXORErasureDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/HHXORErasureDecoder.java index ac4df16376..94487d82e6 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/HHXORErasureDecoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/HHXORErasureDecoder.java @@ -22,7 +22,10 @@ import org.apache.hadoop.io.erasurecode.CodecUtil; import org.apache.hadoop.io.erasurecode.ECBlock; import org.apache.hadoop.io.erasurecode.ECBlockGroup; import org.apache.hadoop.io.erasurecode.ECSchema; -import org.apache.hadoop.io.erasurecode.rawcoder.*; +import org.apache.hadoop.io.erasurecode.ErasureCodeConstants; +import org.apache.hadoop.io.erasurecode.ErasureCoderOptions; +import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder; +import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder; /** * Hitchhiker is a new erasure coding algorithm developed as a research project @@ -68,17 +71,20 @@ public class HHXORErasureDecoder extends AbstractErasureDecoder { private RawErasureDecoder checkCreateRSRawDecoder() { if (rsRawDecoder == null) { - rsRawDecoder = CodecUtil.createRSRawDecoder(getConf(), - getNumDataUnits(), getNumParityUnits()); + ErasureCoderOptions coderOptions = new ErasureCoderOptions( + getNumDataUnits(), getNumParityUnits()); + rsRawDecoder = CodecUtil.createRawDecoder(getConf(), + ErasureCodeConstants.RS_DEFAULT_CODEC_NAME, coderOptions); } return rsRawDecoder; } private RawErasureEncoder checkCreateXorRawEncoder() { if 
(xorRawEncoder == null) { - xorRawEncoder = CodecUtil.createXORRawEncoder(getConf(), - getNumDataUnits(), getNumParityUnits()); - xorRawEncoder.setCoderOption(CoderOption.ALLOW_CHANGE_INPUTS, false); + ErasureCoderOptions coderOptions = new ErasureCoderOptions( + getNumDataUnits(), getNumParityUnits()); + xorRawEncoder = CodecUtil.createRawEncoder(getConf(), + ErasureCodeConstants.XOR_CODEC_NAME, coderOptions); } return xorRawEncoder; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/HHXORErasureEncoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/HHXORErasureEncoder.java index a402469c84..219f25cfed 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/HHXORErasureEncoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/HHXORErasureEncoder.java @@ -22,7 +22,8 @@ import org.apache.hadoop.io.erasurecode.CodecUtil; import org.apache.hadoop.io.erasurecode.ECBlock; import org.apache.hadoop.io.erasurecode.ECBlockGroup; import org.apache.hadoop.io.erasurecode.ECSchema; -import org.apache.hadoop.io.erasurecode.rawcoder.CoderOption; +import org.apache.hadoop.io.erasurecode.ErasureCodeConstants; +import org.apache.hadoop.io.erasurecode.ErasureCoderOptions; import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder; /** @@ -64,17 +65,21 @@ public class HHXORErasureEncoder extends AbstractErasureEncoder { private RawErasureEncoder checkCreateRSRawEncoder() { if (rsRawEncoder == null) { - rsRawEncoder = CodecUtil.createRSRawEncoder(getConf(), + ErasureCoderOptions coderOptions = new ErasureCoderOptions( getNumDataUnits(), getNumParityUnits()); + rsRawEncoder = CodecUtil.createRawEncoder(getConf(), + ErasureCodeConstants.RS_DEFAULT_CODEC_NAME, coderOptions); } return rsRawEncoder; } private RawErasureEncoder checkCreateXorRawEncoder() { if (xorRawEncoder == null) { - xorRawEncoder = CodecUtil.createXORRawEncoder(getConf(), - getNumDataUnits(), getNumParityUnits()); - xorRawEncoder.setCoderOption(CoderOption.ALLOW_CHANGE_INPUTS, false); + ErasureCoderOptions erasureCoderOptions = new ErasureCoderOptions( + getNumDataUnits(), getNumParityUnits()); + xorRawEncoder = CodecUtil.createRawEncoder(getConf(), + ErasureCodeConstants.XOR_CODEC_NAME, + erasureCoderOptions); } return xorRawEncoder; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureDecoder.java index 47efd297bf..afaaf24c14 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureDecoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureDecoder.java @@ -22,6 +22,8 @@ import org.apache.hadoop.io.erasurecode.CodecUtil; import org.apache.hadoop.io.erasurecode.ECBlock; import org.apache.hadoop.io.erasurecode.ECBlockGroup; import org.apache.hadoop.io.erasurecode.ECSchema; +import org.apache.hadoop.io.erasurecode.ErasureCodeConstants; +import org.apache.hadoop.io.erasurecode.ErasureCoderOptions; import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder; /** @@ -55,8 +57,10 @@ public class RSErasureDecoder extends AbstractErasureDecoder { private RawErasureDecoder checkCreateRSRawDecoder() { if (rsRawDecoder == null) { // TODO: we should create the raw 
coder according to codec. - rsRawDecoder = CodecUtil.createRSRawDecoder(getConf(), + ErasureCoderOptions coderOptions = new ErasureCoderOptions( getNumDataUnits(), getNumParityUnits()); + rsRawDecoder = CodecUtil.createRawDecoder(getConf(), + ErasureCodeConstants.RS_DEFAULT_CODEC_NAME, coderOptions); } return rsRawDecoder; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureEncoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureEncoder.java index 4806d9e811..213911378b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureEncoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureEncoder.java @@ -22,6 +22,8 @@ import org.apache.hadoop.io.erasurecode.CodecUtil; import org.apache.hadoop.io.erasurecode.ECBlock; import org.apache.hadoop.io.erasurecode.ECBlockGroup; import org.apache.hadoop.io.erasurecode.ECSchema; +import org.apache.hadoop.io.erasurecode.ErasureCodeConstants; +import org.apache.hadoop.io.erasurecode.ErasureCoderOptions; import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder; /** @@ -55,8 +57,10 @@ public class RSErasureEncoder extends AbstractErasureEncoder { private RawErasureEncoder checkCreateRSRawEncoder() { if (rawEncoder == null) { // TODO: we should create the raw coder according to codec. - rawEncoder = CodecUtil.createRSRawEncoder(getConf(), + ErasureCoderOptions coderOptions = new ErasureCoderOptions( getNumDataUnits(), getNumParityUnits()); + rawEncoder = CodecUtil.createRawEncoder(getConf(), + ErasureCodeConstants.RS_DEFAULT_CODEC_NAME, coderOptions); } return rawEncoder; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureDecoder.java index a61bafd5c3..47fb8daab8 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureDecoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureDecoder.java @@ -22,6 +22,8 @@ import org.apache.hadoop.io.erasurecode.CodecUtil; import org.apache.hadoop.io.erasurecode.ECBlock; import org.apache.hadoop.io.erasurecode.ECBlockGroup; import org.apache.hadoop.io.erasurecode.ECSchema; +import org.apache.hadoop.io.erasurecode.ErasureCodeConstants; +import org.apache.hadoop.io.erasurecode.ErasureCoderOptions; import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder; /** @@ -43,8 +45,10 @@ public class XORErasureDecoder extends AbstractErasureDecoder { @Override protected ErasureCodingStep prepareDecodingStep( final ECBlockGroup blockGroup) { - RawErasureDecoder rawDecoder = CodecUtil.createXORRawDecoder(getConf(), + ErasureCoderOptions coderOptions = new ErasureCoderOptions( getNumDataUnits(), getNumParityUnits()); + RawErasureDecoder rawDecoder = CodecUtil.createRawDecoder(getConf(), + ErasureCodeConstants.XOR_CODEC_NAME, coderOptions); ECBlock[] inputBlocks = getInputBlocks(blockGroup); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureEncoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureEncoder.java index 3f222477bf..1735179c94 100644 --- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureEncoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureEncoder.java @@ -22,6 +22,8 @@ import org.apache.hadoop.io.erasurecode.CodecUtil; import org.apache.hadoop.io.erasurecode.ECBlock; import org.apache.hadoop.io.erasurecode.ECBlockGroup; import org.apache.hadoop.io.erasurecode.ECSchema; +import org.apache.hadoop.io.erasurecode.ErasureCodeConstants; +import org.apache.hadoop.io.erasurecode.ErasureCoderOptions; import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder; /** @@ -43,8 +45,10 @@ public class XORErasureEncoder extends AbstractErasureEncoder { @Override protected ErasureCodingStep prepareEncodingStep( final ECBlockGroup blockGroup) { - RawErasureEncoder rawEncoder = CodecUtil.createXORRawEncoder(getConf(), + ErasureCoderOptions coderOptions = new ErasureCoderOptions( getNumDataUnits(), getNumParityUnits()); + RawErasureEncoder rawEncoder = CodecUtil.createRawEncoder(getConf(), + ErasureCodeConstants.XOR_CODEC_NAME, coderOptions); ECBlock[] inputBlocks = getInputBlocks(blockGroup); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureCoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureCoder.java deleted file mode 100644 index b195216801..0000000000 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureCoder.java +++ /dev/null @@ -1,220 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.io.erasurecode.rawcoder; - -import org.apache.hadoop.HadoopIllegalArgumentException; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configured; - -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.Map; - -/** - * A common class of basic facilities to be shared by encoder and decoder - * - * It implements the {@link RawErasureCoder} interface. 
- */ -@InterfaceAudience.Private -public abstract class AbstractRawErasureCoder - extends Configured implements RawErasureCoder { - - private static byte[] emptyChunk = new byte[4096]; - private final int numDataUnits; - private final int numParityUnits; - private final int numAllUnits; - private final Map coderOptions; - - public AbstractRawErasureCoder(int numDataUnits, int numParityUnits) { - this.numDataUnits = numDataUnits; - this.numParityUnits = numParityUnits; - this.numAllUnits = numDataUnits + numParityUnits; - this.coderOptions = new HashMap<>(3); - - coderOptions.put(CoderOption.PREFER_DIRECT_BUFFER, preferDirectBuffer()); - coderOptions.put(CoderOption.ALLOW_CHANGE_INPUTS, false); - coderOptions.put(CoderOption.ALLOW_VERBOSE_DUMP, false); - } - - @Override - public Object getCoderOption(CoderOption option) { - if (option == null) { - throw new HadoopIllegalArgumentException("Invalid option"); - } - return coderOptions.get(option); - } - - @Override - public void setCoderOption(CoderOption option, Object value) { - if (option == null || value == null) { - throw new HadoopIllegalArgumentException( - "Invalid option or option value"); - } - if (option.isReadOnly()) { - throw new HadoopIllegalArgumentException( - "The option is read-only: " + option.name()); - } - - coderOptions.put(option, value); - } - - /** - * Make sure to return an empty chunk buffer for the desired length. - * @param leastLength - * @return empty chunk of zero bytes - */ - protected static byte[] getEmptyChunk(int leastLength) { - if (emptyChunk.length >= leastLength) { - return emptyChunk; // In most time - } - - synchronized (AbstractRawErasureCoder.class) { - emptyChunk = new byte[leastLength]; - } - - return emptyChunk; - } - - @Override - public int getNumDataUnits() { - return numDataUnits; - } - - @Override - public int getNumParityUnits() { - return numParityUnits; - } - - protected int getNumAllUnits() { - return numAllUnits; - } - - @Override - public void release() { - // Nothing to do by default - } - - /** - * Tell if direct buffer is preferred or not. It's for callers to - * decide how to allocate coding chunk buffers, using DirectByteBuffer or - * bytes array. It will return false by default. - * @return true if native buffer is preferred for performance consideration, - * otherwise false. - */ - protected boolean preferDirectBuffer() { - return false; - } - - protected boolean isAllowingChangeInputs() { - Object value = getCoderOption(CoderOption.ALLOW_CHANGE_INPUTS); - if (value != null && value instanceof Boolean) { - return (boolean) value; - } - return false; - } - - protected boolean isAllowingVerboseDump() { - Object value = getCoderOption(CoderOption.ALLOW_VERBOSE_DUMP); - if (value != null && value instanceof Boolean) { - return (boolean) value; - } - return false; - } - - /** - * Ensure a buffer filled with ZERO bytes from current readable/writable - * position. - * @param buffer a buffer ready to read / write certain size bytes - * @return the buffer itself, with ZERO bytes written, the position and limit - * are not changed after the call - */ - protected ByteBuffer resetBuffer(ByteBuffer buffer, int len) { - int pos = buffer.position(); - buffer.put(getEmptyChunk(len), 0, len); - buffer.position(pos); - - return buffer; - } - - /** - * Ensure the buffer (either input or output) ready to read or write with ZERO - * bytes fully in specified length of len. 
- * @param buffer bytes array buffer - * @return the buffer itself - */ - protected byte[] resetBuffer(byte[] buffer, int offset, int len) { - byte[] empty = getEmptyChunk(len); - System.arraycopy(empty, 0, buffer, offset, len); - - return buffer; - } - - /** - * Check and ensure the buffers are of the length specified by dataLen, also - * ensure the buffers are direct buffers or not according to isDirectBuffer. - * @param buffers the buffers to check - * @param allowNull whether to allow any element to be null or not - * @param dataLen the length of data available in the buffer to ensure with - * @param isDirectBuffer is direct buffer or not to ensure with - * @param isOutputs is output buffer or not - */ - protected void checkParameterBuffers(ByteBuffer[] buffers, boolean - allowNull, int dataLen, boolean isDirectBuffer, boolean isOutputs) { - for (ByteBuffer buffer : buffers) { - if (buffer == null && !allowNull) { - throw new HadoopIllegalArgumentException( - "Invalid buffer found, not allowing null"); - } else if (buffer != null) { - if (buffer.remaining() != dataLen) { - throw new HadoopIllegalArgumentException( - "Invalid buffer, not of length " + dataLen); - } - if (buffer.isDirect() != isDirectBuffer) { - throw new HadoopIllegalArgumentException( - "Invalid buffer, isDirect should be " + isDirectBuffer); - } - if (isOutputs) { - resetBuffer(buffer, dataLen); - } - } - } - } - - /** - * Check and ensure the buffers are of the length specified by dataLen. If is - * output buffers, ensure they will be ZEROed. - * @param buffers the buffers to check - * @param allowNull whether to allow any element to be null or not - * @param dataLen the length of data available in the buffer to ensure with - * @param isOutputs is output buffer or not - */ - protected void checkParameterBuffers(byte[][] buffers, boolean allowNull, - int dataLen, boolean isOutputs) { - for (byte[] buffer : buffers) { - if (buffer == null && !allowNull) { - throw new HadoopIllegalArgumentException( - "Invalid buffer found, not allowing null"); - } else if (buffer != null && buffer.length != dataLen) { - throw new HadoopIllegalArgumentException( - "Invalid buffer not of length " + dataLen); - } else if (isOutputs) { - resetBuffer(buffer, 0, dataLen); - } - } - } -} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureDecoder.java deleted file mode 100644 index cf2b7389b5..0000000000 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureDecoder.java +++ /dev/null @@ -1,181 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.io.erasurecode.rawcoder; - -import org.apache.hadoop.HadoopIllegalArgumentException; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.io.erasurecode.ECChunk; -import org.apache.hadoop.io.erasurecode.rawcoder.util.CoderUtil; - -import java.nio.ByteBuffer; - -/** - * An abstract raw erasure decoder that's to be inherited by new decoders. - * - * It implements the {@link RawErasureDecoder} interface. - */ -@InterfaceAudience.Private -public abstract class AbstractRawErasureDecoder extends AbstractRawErasureCoder - implements RawErasureDecoder { - - public AbstractRawErasureDecoder(int numDataUnits, int numParityUnits) { - super(numDataUnits, numParityUnits); - } - - @Override - public void decode(ByteBuffer[] inputs, int[] erasedIndexes, - ByteBuffer[] outputs) { - checkParameters(inputs, erasedIndexes, outputs); - - ByteBuffer validInput = CoderUtil.findFirstValidInput(inputs); - boolean usingDirectBuffer = validInput.isDirect(); - int dataLen = validInput.remaining(); - if (dataLen == 0) { - return; - } - checkParameterBuffers(inputs, true, dataLen, usingDirectBuffer, false); - checkParameterBuffers(outputs, false, dataLen, usingDirectBuffer, true); - - int[] inputPositions = new int[inputs.length]; - for (int i = 0; i < inputPositions.length; i++) { - if (inputs[i] != null) { - inputPositions[i] = inputs[i].position(); - } - } - - if (usingDirectBuffer) { - doDecode(inputs, erasedIndexes, outputs); - } else { - int[] inputOffsets = new int[inputs.length]; - int[] outputOffsets = new int[outputs.length]; - byte[][] newInputs = new byte[inputs.length][]; - byte[][] newOutputs = new byte[outputs.length][]; - - ByteBuffer buffer; - for (int i = 0; i < inputs.length; ++i) { - buffer = inputs[i]; - if (buffer != null) { - inputOffsets[i] = buffer.arrayOffset() + buffer.position(); - newInputs[i] = buffer.array(); - } - } - - for (int i = 0; i < outputs.length; ++i) { - buffer = outputs[i]; - outputOffsets[i] = buffer.arrayOffset() + buffer.position(); - newOutputs[i] = buffer.array(); - } - - doDecode(newInputs, inputOffsets, dataLen, - erasedIndexes, newOutputs, outputOffsets); - } - - for (int i = 0; i < inputs.length; i++) { - if (inputs[i] != null) { - // dataLen bytes consumed - inputs[i].position(inputPositions[i] + dataLen); - } - } - } - - /** - * Perform the real decoding using Direct ByteBuffer. - * @param inputs Direct ByteBuffers expected - * @param erasedIndexes indexes of erased units in the inputs array - * @param outputs Direct ByteBuffers expected - */ - protected abstract void doDecode(ByteBuffer[] inputs, int[] erasedIndexes, - ByteBuffer[] outputs); - - @Override - public void decode(byte[][] inputs, int[] erasedIndexes, byte[][] outputs) { - checkParameters(inputs, erasedIndexes, outputs); - - byte[] validInput = CoderUtil.findFirstValidInput(inputs); - int dataLen = validInput.length; - if (dataLen == 0) { - return; - } - checkParameterBuffers(inputs, true, dataLen, false); - checkParameterBuffers(outputs, false, dataLen, true); - - int[] inputOffsets = new int[inputs.length]; // ALL ZERO - int[] outputOffsets = new int[outputs.length]; // ALL ZERO - - doDecode(inputs, inputOffsets, dataLen, erasedIndexes, outputs, - outputOffsets); - } - - /** - * Perform the real decoding using bytes array, supporting offsets and - * lengths. 
- * @param inputs the input byte arrays to read data from - * @param inputOffsets offsets for the input byte arrays to read data from - * @param dataLen how much data are to be read from - * @param erasedIndexes indexes of erased units in the inputs array - * @param outputs the output byte arrays to write resultant data into - * @param outputOffsets offsets from which to write resultant data into - */ - protected abstract void doDecode(byte[][] inputs, int[] inputOffsets, - int dataLen, int[] erasedIndexes, - byte[][] outputs, int[] outputOffsets); - - @Override - public void decode(ECChunk[] inputs, int[] erasedIndexes, - ECChunk[] outputs) { - ByteBuffer[] newInputs = ECChunk.toBuffers(inputs); - ByteBuffer[] newOutputs = ECChunk.toBuffers(outputs); - decode(newInputs, erasedIndexes, newOutputs); - } - - /** - * Check and validate decoding parameters, throw exception accordingly. The - * checking assumes it's a MDS code. Other code can override this. - * @param inputs input buffers to check - * @param erasedIndexes indexes of erased units in the inputs array - * @param outputs output buffers to check - */ - protected void checkParameters(T[] inputs, int[] erasedIndexes, - T[] outputs) { - if (inputs.length != getNumParityUnits() + getNumDataUnits()) { - throw new IllegalArgumentException("Invalid inputs length"); - } - - if (erasedIndexes.length != outputs.length) { - throw new HadoopIllegalArgumentException( - "erasedIndexes and outputs mismatch in length"); - } - - if (erasedIndexes.length > getNumParityUnits()) { - throw new HadoopIllegalArgumentException( - "Too many erased, not recoverable"); - } - - int validInputs = 0; - for (T input : inputs) { - if (input != null) { - validInputs += 1; - } - } - - if (validInputs < getNumDataUnits()) { - throw new HadoopIllegalArgumentException( - "No enough valid inputs are provided, not recoverable"); - } - } -} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureEncoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureEncoder.java deleted file mode 100644 index 49cc2c4dfa..0000000000 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureEncoder.java +++ /dev/null @@ -1,146 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.io.erasurecode.rawcoder; - -import org.apache.hadoop.HadoopIllegalArgumentException; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.io.erasurecode.ECChunk; - -import java.nio.ByteBuffer; - -/** - * An abstract raw erasure encoder that's to be inherited by new encoders. 
- * - * It implements the {@link RawErasureEncoder} interface. - */ -@InterfaceAudience.Private -public abstract class AbstractRawErasureEncoder extends AbstractRawErasureCoder - implements RawErasureEncoder { - - public AbstractRawErasureEncoder(int numDataUnits, int numParityUnits) { - super(numDataUnits, numParityUnits); - } - - @Override - public void encode(ByteBuffer[] inputs, ByteBuffer[] outputs) { - checkParameters(inputs, outputs); - - boolean usingDirectBuffer = inputs[0].isDirect(); - int dataLen = inputs[0].remaining(); - if (dataLen == 0) { - return; - } - checkParameterBuffers(inputs, false, dataLen, usingDirectBuffer, false); - checkParameterBuffers(outputs, false, dataLen, usingDirectBuffer, true); - - int[] inputPositions = new int[inputs.length]; - for (int i = 0; i < inputPositions.length; i++) { - if (inputs[i] != null) { - inputPositions[i] = inputs[i].position(); - } - } - - if (usingDirectBuffer) { - doEncode(inputs, outputs); - } else { - int[] inputOffsets = new int[inputs.length]; - int[] outputOffsets = new int[outputs.length]; - byte[][] newInputs = new byte[inputs.length][]; - byte[][] newOutputs = new byte[outputs.length][]; - - ByteBuffer buffer; - for (int i = 0; i < inputs.length; ++i) { - buffer = inputs[i]; - inputOffsets[i] = buffer.arrayOffset() + buffer.position(); - newInputs[i] = buffer.array(); - } - - for (int i = 0; i < outputs.length; ++i) { - buffer = outputs[i]; - outputOffsets[i] = buffer.arrayOffset() + buffer.position(); - newOutputs[i] = buffer.array(); - } - - doEncode(newInputs, inputOffsets, dataLen, newOutputs, outputOffsets); - } - - for (int i = 0; i < inputs.length; i++) { - if (inputs[i] != null) { - // dataLen bytes consumed - inputs[i].position(inputPositions[i] + dataLen); - } - } - } - - /** - * Perform the real encoding work using direct ByteBuffer - * @param inputs Direct ByteBuffers expected - * @param outputs Direct ByteBuffers expected - */ - protected abstract void doEncode(ByteBuffer[] inputs, ByteBuffer[] outputs); - - @Override - public void encode(byte[][] inputs, byte[][] outputs) { - checkParameters(inputs, outputs); - int dataLen = inputs[0].length; - if (dataLen == 0) { - return; - } - checkParameterBuffers(inputs, false, dataLen, false); - checkParameterBuffers(outputs, false, dataLen, true); - - int[] inputOffsets = new int[inputs.length]; // ALL ZERO - int[] outputOffsets = new int[outputs.length]; // ALL ZERO - - doEncode(inputs, inputOffsets, dataLen, outputs, outputOffsets); - } - - /** - * Perform the real encoding work using bytes array, supporting offsets - * and lengths. - * @param inputs the input byte arrays to read data from - * @param inputOffsets offsets for the input byte arrays to read data from - * @param dataLen how much data are to be read from - * @param outputs the output byte arrays to write resultant data into - * @param outputOffsets offsets from which to write resultant data into - */ - protected abstract void doEncode(byte[][] inputs, int[] inputOffsets, - int dataLen, byte[][] outputs, - int[] outputOffsets); - - @Override - public void encode(ECChunk[] inputs, ECChunk[] outputs) { - ByteBuffer[] newInputs = ECChunk.toBuffers(inputs); - ByteBuffer[] newOutputs = ECChunk.toBuffers(outputs); - encode(newInputs, newOutputs); - } - - /** - * Check and validate decoding parameters, throw exception accordingly. 
- * @param inputs input buffers to check - * @param outputs output buffers to check - */ - protected void checkParameters(T[] inputs, T[] outputs) { - if (inputs.length != getNumDataUnits()) { - throw new HadoopIllegalArgumentException("Invalid inputs length"); - } - if (outputs.length != getNumParityUnits()) { - throw new HadoopIllegalArgumentException("Invalid outputs length"); - } - } -} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/ByteArrayDecodingState.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/ByteArrayDecodingState.java new file mode 100644 index 0000000000..69c084d455 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/ByteArrayDecodingState.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.erasurecode.rawcoder; + +import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * A utility class that maintains decoding state during a decode call using + * byte array inputs. + */ +@InterfaceAudience.Private +class ByteArrayDecodingState extends DecodingState { + byte[][] inputs; + int[] inputOffsets; + int[] erasedIndexes; + byte[][] outputs; + int[] outputOffsets; + + ByteArrayDecodingState(RawErasureDecoder decoder, byte[][] inputs, + int[] erasedIndexes, byte[][] outputs) { + this.decoder = decoder; + this.inputs = inputs; + this.outputs = outputs; + this.erasedIndexes = erasedIndexes; + byte[] validInput = CoderUtil.findFirstValidInput(inputs); + this.decodeLength = validInput.length; + + checkParameters(inputs, erasedIndexes, outputs); + checkInputBuffers(inputs); + checkOutputBuffers(outputs); + + this.inputOffsets = new int[inputs.length]; // ALL ZERO + this.outputOffsets = new int[outputs.length]; // ALL ZERO + } + + ByteArrayDecodingState(RawErasureDecoder decoder, + int decodeLength, + int[] erasedIndexes, + byte[][] inputs, + int[] inputOffsets, + byte[][] outputs, + int[] outputOffsets) { + this.decoder = decoder; + this.decodeLength = decodeLength; + this.erasedIndexes = erasedIndexes; + this.inputs = inputs; + this.outputs = outputs; + this.inputOffsets = inputOffsets; + this.outputOffsets = outputOffsets; + } + + /** + * Check and ensure the buffers are of the desired length. 
+ * @param buffers the buffers to check + */ + void checkInputBuffers(byte[][] buffers) { + int validInputs = 0; + + for (byte[] buffer : buffers) { + if (buffer == null) { + continue; + } + + if (buffer.length != decodeLength) { + throw new HadoopIllegalArgumentException( + "Invalid buffer, not of length " + decodeLength); + } + + validInputs++; + } + + if (validInputs < decoder.getNumDataUnits()) { + throw new HadoopIllegalArgumentException( + "No enough valid inputs are provided, not recoverable"); + } + } + + /** + * Check and ensure the buffers are of the desired length. + * @param buffers the buffers to check + */ + void checkOutputBuffers(byte[][] buffers) { + for (byte[] buffer : buffers) { + if (buffer == null) { + throw new HadoopIllegalArgumentException( + "Invalid buffer found, not allowing null"); + } + + if (buffer.length != decodeLength) { + throw new HadoopIllegalArgumentException( + "Invalid buffer not of length " + decodeLength); + } + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/ByteArrayEncodingState.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/ByteArrayEncodingState.java new file mode 100644 index 0000000000..9d861d4e47 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/ByteArrayEncodingState.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.erasurecode.rawcoder; + +import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * A utility class that maintains encoding state during an encode call using + * byte array inputs. 
+ */ +@InterfaceAudience.Private +class ByteArrayEncodingState extends EncodingState { + byte[][] inputs; + byte[][] outputs; + int[] inputOffsets; + int[] outputOffsets; + + ByteArrayEncodingState(RawErasureEncoder encoder, + byte[][] inputs, byte[][] outputs) { + this.encoder = encoder; + byte[] validInput = CoderUtil.findFirstValidInput(inputs); + this.encodeLength = validInput.length; + this.inputs = inputs; + this.outputs = outputs; + + checkParameters(inputs, outputs); + checkBuffers(inputs); + checkBuffers(outputs); + + this.inputOffsets = new int[inputs.length]; // ALL ZERO + this.outputOffsets = new int[outputs.length]; // ALL ZERO + } + + ByteArrayEncodingState(RawErasureEncoder encoder, + int encodeLength, + byte[][] inputs, + int[] inputOffsets, + byte[][] outputs, + int[] outputOffsets) { + this.encoder = encoder; + this.encodeLength = encodeLength; + this.inputs = inputs; + this.outputs = outputs; + this.inputOffsets = inputOffsets; + this.outputOffsets = outputOffsets; + } + + /** + * Check and ensure the buffers are of the desired length. + * @param buffers the buffers to check + */ + void checkBuffers(byte[][] buffers) { + for (byte[] buffer : buffers) { + if (buffer == null) { + throw new HadoopIllegalArgumentException( + "Invalid buffer found, not allowing null"); + } + + if (buffer.length != encodeLength) { + throw new HadoopIllegalArgumentException( + "Invalid buffer not of length " + encodeLength); + } + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/ByteBufferDecodingState.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/ByteBufferDecodingState.java new file mode 100644 index 0000000000..5c5b0f6c32 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/ByteBufferDecodingState.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.erasurecode.rawcoder; + +import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.classification.InterfaceAudience; + +import java.nio.ByteBuffer; + +/** + * A utility class that maintains decoding state during a decode call using + * ByteBuffer inputs. 
+ */ +@InterfaceAudience.Private +class ByteBufferDecodingState extends DecodingState { + ByteBuffer[] inputs; + ByteBuffer[] outputs; + int[] erasedIndexes; + boolean usingDirectBuffer; + + ByteBufferDecodingState(RawErasureDecoder decoder, ByteBuffer[] inputs, + int[] erasedIndexes, ByteBuffer[] outputs) { + this.decoder = decoder; + this.inputs = inputs; + this.outputs = outputs; + this.erasedIndexes = erasedIndexes; + ByteBuffer validInput = CoderUtil.findFirstValidInput(inputs); + this.decodeLength = validInput.remaining(); + this.usingDirectBuffer = validInput.isDirect(); + + checkParameters(inputs, erasedIndexes, outputs); + checkInputBuffers(inputs); + checkOutputBuffers(outputs); + } + + /** + * Convert to a ByteArrayEncodingState when it's backed by on-heap arrays. + */ + ByteArrayDecodingState convertToByteArrayState() { + int[] inputOffsets = new int[inputs.length]; + int[] outputOffsets = new int[outputs.length]; + byte[][] newInputs = new byte[inputs.length][]; + byte[][] newOutputs = new byte[outputs.length][]; + + ByteBuffer buffer; + for (int i = 0; i < inputs.length; ++i) { + buffer = inputs[i]; + if (buffer != null) { + inputOffsets[i] = buffer.arrayOffset() + buffer.position(); + newInputs[i] = buffer.array(); + } + } + + for (int i = 0; i < outputs.length; ++i) { + buffer = outputs[i]; + outputOffsets[i] = buffer.arrayOffset() + buffer.position(); + newOutputs[i] = buffer.array(); + } + + ByteArrayDecodingState baeState = new ByteArrayDecodingState(decoder, + decodeLength, erasedIndexes, newInputs, + inputOffsets, newOutputs, outputOffsets); + return baeState; + } + + /** + * Check and ensure the buffers are of the desired length and type, direct + * buffers or not. + * @param buffers the buffers to check + */ + void checkInputBuffers(ByteBuffer[] buffers) { + int validInputs = 0; + + for (ByteBuffer buffer : buffers) { + if (buffer == null) { + continue; + } + + if (buffer.remaining() != decodeLength) { + throw new HadoopIllegalArgumentException( + "Invalid buffer, not of length " + decodeLength); + } + if (buffer.isDirect() != usingDirectBuffer) { + throw new HadoopIllegalArgumentException( + "Invalid buffer, isDirect should be " + usingDirectBuffer); + } + + validInputs++; + } + + if (validInputs < decoder.getNumDataUnits()) { + throw new HadoopIllegalArgumentException( + "No enough valid inputs are provided, not recoverable"); + } + } + + /** + * Check and ensure the buffers are of the desired length and type, direct + * buffers or not. 
+ * @param buffers the buffers to check + */ + void checkOutputBuffers(ByteBuffer[] buffers) { + for (ByteBuffer buffer : buffers) { + if (buffer == null) { + throw new HadoopIllegalArgumentException( + "Invalid buffer found, not allowing null"); + } + + if (buffer.remaining() != decodeLength) { + throw new HadoopIllegalArgumentException( + "Invalid buffer, not of length " + decodeLength); + } + if (buffer.isDirect() != usingDirectBuffer) { + throw new HadoopIllegalArgumentException( + "Invalid buffer, isDirect should be " + usingDirectBuffer); + } + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/ByteBufferEncodingState.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/ByteBufferEncodingState.java new file mode 100644 index 0000000000..7a10ac21bd --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/ByteBufferEncodingState.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.erasurecode.rawcoder; + +import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.classification.InterfaceAudience; + +import java.nio.ByteBuffer; + +/** + * A utility class that maintains encoding state during an encode call using + * ByteBuffer inputs. + */ +@InterfaceAudience.Private +class ByteBufferEncodingState extends EncodingState { + ByteBuffer[] inputs; + ByteBuffer[] outputs; + boolean usingDirectBuffer; + + ByteBufferEncodingState(RawErasureEncoder encoder, + ByteBuffer[] inputs, ByteBuffer[] outputs) { + this.encoder = encoder; + ByteBuffer validInput = CoderUtil.findFirstValidInput(inputs); + this.encodeLength = validInput.remaining(); + this.usingDirectBuffer = validInput.isDirect(); + this.inputs = inputs; + this.outputs = outputs; + + checkParameters(inputs, outputs); + checkBuffers(inputs); + checkBuffers(outputs); + } + + /** + * Convert to a ByteArrayEncodingState when it's backed by on-heap arrays. 
+ */ + ByteArrayEncodingState convertToByteArrayState() { + int[] inputOffsets = new int[inputs.length]; + int[] outputOffsets = new int[outputs.length]; + byte[][] newInputs = new byte[inputs.length][]; + byte[][] newOutputs = new byte[outputs.length][]; + + ByteBuffer buffer; + for (int i = 0; i < inputs.length; ++i) { + buffer = inputs[i]; + inputOffsets[i] = buffer.arrayOffset() + buffer.position(); + newInputs[i] = buffer.array(); + } + + for (int i = 0; i < outputs.length; ++i) { + buffer = outputs[i]; + outputOffsets[i] = buffer.arrayOffset() + buffer.position(); + newOutputs[i] = buffer.array(); + } + + ByteArrayEncodingState baeState = new ByteArrayEncodingState(encoder, + encodeLength, newInputs, inputOffsets, newOutputs, outputOffsets); + return baeState; + } + + /** + * Check and ensure the buffers are of the desired length and type, direct + * buffers or not. + * @param buffers the buffers to check + */ + void checkBuffers(ByteBuffer[] buffers) { + for (ByteBuffer buffer : buffers) { + if (buffer == null) { + throw new HadoopIllegalArgumentException( + "Invalid buffer found, not allowing null"); + } + + if (buffer.remaining() != encodeLength) { + throw new HadoopIllegalArgumentException( + "Invalid buffer, not of length " + encodeLength); + } + if (buffer.isDirect() != usingDirectBuffer) { + throw new HadoopIllegalArgumentException( + "Invalid buffer, isDirect should be " + usingDirectBuffer); + } + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/CoderUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/CoderUtil.java new file mode 100644 index 0000000000..aceb3c6511 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/CoderUtil.java @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.erasurecode.rawcoder; + +import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.io.erasurecode.ECChunk; + +import java.nio.ByteBuffer; +import java.util.Arrays; + +/** + * Helpful utilities for implementing some raw erasure coders. + */ +@InterfaceAudience.Private +final class CoderUtil { + + private CoderUtil() { + // No called + } + + private static byte[] emptyChunk = new byte[4096]; + + /** + * Make sure to return an empty chunk buffer for the desired length. 
+ * @param leastLength + * @return empty chunk of zero bytes + */ + static byte[] getEmptyChunk(int leastLength) { + if (emptyChunk.length >= leastLength) { + return emptyChunk; // In most time + } + + synchronized (CoderUtil.class) { + emptyChunk = new byte[leastLength]; + } + + return emptyChunk; + } + + /** + * Ensure a buffer filled with ZERO bytes from current readable/writable + * position. + * @param buffer a buffer ready to read / write certain size bytes + * @return the buffer itself, with ZERO bytes written, the position and limit + * are not changed after the call + */ + static ByteBuffer resetBuffer(ByteBuffer buffer, int len) { + int pos = buffer.position(); + buffer.put(getEmptyChunk(len), 0, len); + buffer.position(pos); + + return buffer; + } + + /** + * Ensure the buffer (either input or output) ready to read or write with ZERO + * bytes fully in specified length of len. + * @param buffer bytes array buffer + * @return the buffer itself + */ + static byte[] resetBuffer(byte[] buffer, int offset, int len) { + byte[] empty = getEmptyChunk(len); + System.arraycopy(empty, 0, buffer, offset, len); + + return buffer; + } + + /** + * Initialize the output buffers with ZERO bytes. + * @param buffers + * @param dataLen + */ + static void resetOutputBuffers(ByteBuffer[] buffers, int dataLen) { + for (ByteBuffer buffer : buffers) { + resetBuffer(buffer, dataLen); + } + } + + /** + * Initialize the output buffers with ZERO bytes. + * @param buffers + * @param dataLen + */ + static void resetOutputBuffers(byte[][] buffers, int[] offsets, + int dataLen) { + for (int i = 0; i < buffers.length; i++) { + resetBuffer(buffers[i], offsets[i], dataLen); + } + } + + /** + * Convert an array of this chunks to an array of ByteBuffers + * @param chunks chunks to convertToByteArrayState into buffers + * @return an array of ByteBuffers + */ + static ByteBuffer[] toBuffers(ECChunk[] chunks) { + ByteBuffer[] buffers = new ByteBuffer[chunks.length]; + + ECChunk chunk; + for (int i = 0; i < chunks.length; i++) { + chunk = chunks[i]; + if (chunk == null) { + buffers[i] = null; + } else { + buffers[i] = chunk.getBuffer(); + } + } + + return buffers; + } + + /** + * Clone an input bytes array as direct ByteBuffer. + * @param input + * @param len + * @param offset + * @return direct ByteBuffer + */ + static ByteBuffer cloneAsDirectByteBuffer(byte[] input, int offset, int len) { + if (input == null) { // an input can be null, if erased or not to read + return null; + } + + ByteBuffer directBuffer = ByteBuffer.allocateDirect(len); + directBuffer.put(input, offset, len); + directBuffer.flip(); + return directBuffer; + } + + /** + * Get indexes array for items marked as null, either erased or + * not to read. + * @return indexes array + */ + static int[] getNullIndexes(T[] inputs) { + int[] nullIndexes = new int[inputs.length]; + int idx = 0; + for (int i = 0; i < inputs.length; i++) { + if (inputs[i] == null) { + nullIndexes[idx++] = i; + } + } + + return Arrays.copyOf(nullIndexes, idx); + } + + /** + * Find the valid input from all the inputs. + * @param inputs input buffers to look for valid input + * @return the first valid input + */ + static T findFirstValidInput(T[] inputs) { + if (inputs.length > 0 && inputs[0] != null) { + return inputs[0]; + } + + for (T input : inputs) { + if (input != null) { + return input; + } + } + + throw new HadoopIllegalArgumentException( + "Invalid inputs are found, all being null"); + } + + /** + * Picking up indexes of valid inputs. 
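To make the index bookkeeping of these helpers concrete, here is a small standalone sketch (toy code, not the package-private CoderUtil itself) of how the null/valid index helpers partition a decode input array in which erased or not-to-read units are passed as null:

import java.util.Arrays;

public class IndexPartitionDemo {
  public static void main(String[] args) {
    // 6 data + 3 parity units; units 1 and 7 are erased or not read.
    Integer[] inputs = {0, null, 2, 3, 4, 5, 6, null, 8};

    int[] nulls = new int[inputs.length];
    int[] valids = new int[inputs.length];
    int n = 0, v = 0;
    for (int i = 0; i < inputs.length; i++) {
      if (inputs[i] == null) {
        nulls[n++] = i;     // what getNullIndexes collects
      } else {
        valids[v++] = i;    // what getValidIndexes collects
      }
    }
    System.out.println(Arrays.toString(Arrays.copyOf(nulls, n)));  // [1, 7]
    System.out.println(Arrays.toString(Arrays.copyOf(valids, v))); // [0, 2, 3, 4, 5, 6, 8]
  }
}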
+ * @param inputs decoding input buffers + * @param <T> the type of the input buffers + */ + static <T> int[] getValidIndexes(T[] inputs) { + int[] validIndexes = new int[inputs.length]; + int idx = 0; + for (int i = 0; i < inputs.length; i++) { + if (inputs[i] != null) { + validIndexes[idx++] = i; + } + } + + return Arrays.copyOf(validIndexes, idx); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DecodingState.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DecodingState.java new file mode 100644 index 0000000000..4b693a4a3f --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DecodingState.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.erasurecode.rawcoder; + +import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * A utility class that maintains decoding state during a decode call. + */ +@InterfaceAudience.Private +class DecodingState { + RawErasureDecoder decoder; + int decodeLength; + + /** + * Check and validate decoding parameters, throw exception accordingly. The + * checking assumes it's an MDS code. Other code can override this.
+ * @param inputs input buffers to check + * @param erasedIndexes indexes of erased units in the inputs array + * @param outputs output buffers to check + */ + void checkParameters(T[] inputs, int[] erasedIndexes, + T[] outputs) { + if (inputs.length != decoder.getNumParityUnits() + + decoder.getNumDataUnits()) { + throw new IllegalArgumentException("Invalid inputs length"); + } + + if (erasedIndexes.length != outputs.length) { + throw new HadoopIllegalArgumentException( + "erasedIndexes and outputs mismatch in length"); + } + + if (erasedIndexes.length > decoder.getNumParityUnits()) { + throw new HadoopIllegalArgumentException( + "Too many erased, not recoverable"); + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DummyRawDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DummyRawDecoder.java index 25dfa573cd..256a725795 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DummyRawDecoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DummyRawDecoder.java @@ -18,8 +18,7 @@ package org.apache.hadoop.io.erasurecode.rawcoder; import org.apache.hadoop.classification.InterfaceAudience; - -import java.nio.ByteBuffer; +import org.apache.hadoop.io.erasurecode.ErasureCoderOptions; /** * A dummy raw decoder that does no real computation. @@ -28,20 +27,19 @@ import java.nio.ByteBuffer; * instead of codec, and is intended for test only. */ @InterfaceAudience.Private -public class DummyRawDecoder extends AbstractRawErasureDecoder { - public DummyRawDecoder(int numDataUnits, int numParityUnits) { - super(numDataUnits, numParityUnits); +public class DummyRawDecoder extends RawErasureDecoder { + + public DummyRawDecoder(ErasureCoderOptions coderOptions) { + super(coderOptions); } @Override - protected void doDecode(ByteBuffer[] inputs, int[] erasedIndexes, - ByteBuffer[] outputs) { + protected void doDecode(ByteBufferDecodingState decodingState) { // Nothing to do. Output buffers have already been reset } @Override - protected void doDecode(byte[][] inputs, int[] inputOffsets, int dataLen, - int[] erasedIndexes, byte[][] outputs, int[] outputOffsets) { + protected void doDecode(ByteArrayDecodingState decodingState) { // Nothing to do. Output buffers have already been reset } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DummyRawEncoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DummyRawEncoder.java index 33e026db10..558e35014c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DummyRawEncoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DummyRawEncoder.java @@ -18,8 +18,7 @@ package org.apache.hadoop.io.erasurecode.rawcoder; import org.apache.hadoop.classification.InterfaceAudience; - -import java.nio.ByteBuffer; +import org.apache.hadoop.io.erasurecode.ErasureCoderOptions; /** * A dummy raw encoder that does no real computation. @@ -28,19 +27,19 @@ import java.nio.ByteBuffer; * instead of codec, and is intended for test only. 
*/ @InterfaceAudience.Private -public class DummyRawEncoder extends AbstractRawErasureEncoder { - public DummyRawEncoder(int numDataUnits, int numParityUnits) { - super(numDataUnits, numParityUnits); +public class DummyRawEncoder extends RawErasureEncoder { + + public DummyRawEncoder(ErasureCoderOptions coderOptions) { + super(coderOptions); } @Override - protected void doEncode(ByteBuffer[] inputs, ByteBuffer[] outputs) { + protected void doEncode(ByteArrayEncodingState encodingState) { // Nothing to do. Output buffers have already been reset } @Override - protected void doEncode(byte[][] inputs, int[] inputOffsets, int dataLen, - byte[][] outputs, int[] outputOffsets) { + protected void doEncode(ByteBufferEncodingState encodingState) { // Nothing to do. Output buffers have already been reset } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DummyRawErasureCoderFactory.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DummyRawErasureCoderFactory.java index 73457c2ea0..31ba4efc04 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DummyRawErasureCoderFactory.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DummyRawErasureCoderFactory.java @@ -18,19 +18,21 @@ package org.apache.hadoop.io.erasurecode.rawcoder; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.io.erasurecode.ErasureCoderOptions; /** * A raw erasure coder factory for dummy raw coders. */ @InterfaceAudience.Private public class DummyRawErasureCoderFactory implements RawErasureCoderFactory { + @Override - public RawErasureEncoder createEncoder(int numDataUnits, int numParityUnits) { - return new DummyRawEncoder(numDataUnits, numParityUnits); + public RawErasureEncoder createEncoder(ErasureCoderOptions coderOptions) { + return new DummyRawEncoder(coderOptions); } @Override - public RawErasureDecoder createDecoder(int numDataUnits, int numParityUnits) { - return new DummyRawDecoder(numDataUnits, numParityUnits); + public RawErasureDecoder createDecoder(ErasureCoderOptions coderOptions) { + return new DummyRawDecoder(coderOptions); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/EncodingState.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/EncodingState.java new file mode 100644 index 0000000000..a8946d25c3 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/EncodingState.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.io.erasurecode.rawcoder; + +import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * A utility class that maintains encoding state during an encode call. + */ +@InterfaceAudience.Private +abstract class EncodingState { + RawErasureEncoder encoder; + int encodeLength; + + /** + * Check and validate encoding parameters, throw exception accordingly. + * @param inputs input buffers to check + * @param outputs output buffers to check + * @param <T> the type of the input and output buffers + */ + <T> void checkParameters(T[] inputs, T[] outputs) { + if (inputs.length != encoder.getNumDataUnits()) { + throw new HadoopIllegalArgumentException("Invalid inputs length"); + } + if (outputs.length != encoder.getNumParityUnits()) { + throw new HadoopIllegalArgumentException("Invalid outputs length"); + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawDecoder.java index 5b9e0e9ece..d7f78abc05 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawDecoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawDecoder.java @@ -19,7 +19,7 @@ package org.apache.hadoop.io.erasurecode.rawcoder; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.io.erasurecode.rawcoder.util.CoderUtil; +import org.apache.hadoop.io.erasurecode.ErasureCoderOptions; import org.apache.hadoop.io.erasurecode.rawcoder.util.DumpUtil; import org.apache.hadoop.io.erasurecode.rawcoder.util.GF256; import org.apache.hadoop.io.erasurecode.rawcoder.util.RSUtil; @@ -34,7 +34,7 @@ import java.util.Arrays; * from HDFS-RAID, and also compatible with the native/ISA-L coder.
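Taken together, DecodingState and EncodingState pin down the array shapes a caller must supply. A minimal sketch of that contract, assuming a hypothetical RS(6, 3) layout (6 data units, 3 parity units); this is illustrative only, not code from the patch:

public class CoderContractSketch {
  public static void main(String[] args) {
    int numDataUnits = 6;
    int numParityUnits = 3;

    // Encoding contract: numDataUnits inputs, numParityUnits outputs.
    byte[][] encodeInputs = new byte[numDataUnits][1024];
    byte[][] encodeOutputs = new byte[numParityUnits][1024];

    // Decoding contract: inputs span the whole group (data + parity) with
    // null for erased or unread units, at most numParityUnits erasures,
    // and one output buffer per entry in erasedIndexes.
    byte[][] decodeInputs = new byte[numDataUnits + numParityUnits][];
    int[] erasedIndexes = {1, 7};
    byte[][] decodeOutputs = new byte[erasedIndexes.length][1024];

    System.out.println(encodeInputs.length == numDataUnits
        && encodeOutputs.length == numParityUnits
        && decodeInputs.length == numDataUnits + numParityUnits
        && decodeOutputs.length == erasedIndexes.length
        && erasedIndexes.length <= numParityUnits); // true
  }
}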
*/ @InterfaceAudience.Private -public class RSRawDecoder extends AbstractRawErasureDecoder { +public class RSRawDecoder extends RawErasureDecoder { //relevant to schema and won't change during decode calls private byte[] encodeMatrix; @@ -54,52 +54,54 @@ public class RSRawDecoder extends AbstractRawErasureDecoder { private int numErasedDataUnits; private boolean[] erasureFlags; - public RSRawDecoder(int numDataUnits, int numParityUnits) { - super(numDataUnits, numParityUnits); - if (numDataUnits + numParityUnits >= RSUtil.GF.getFieldSize()) { + public RSRawDecoder(ErasureCoderOptions coderOptions) { + super(coderOptions); + + int numAllUnits = getNumAllUnits(); + if (getNumAllUnits() >= RSUtil.GF.getFieldSize()) { throw new HadoopIllegalArgumentException( "Invalid getNumDataUnits() and numParityUnits"); } - int numAllUnits = getNumDataUnits() + numParityUnits; encodeMatrix = new byte[numAllUnits * getNumDataUnits()]; RSUtil.genCauchyMatrix(encodeMatrix, numAllUnits, getNumDataUnits()); - if (isAllowingVerboseDump()) { - DumpUtil.dumpMatrix(encodeMatrix, numDataUnits, numAllUnits); + if (allowVerboseDump()) { + DumpUtil.dumpMatrix(encodeMatrix, getNumDataUnits(), numAllUnits); } } @Override - protected void doDecode(ByteBuffer[] inputs, int[] erasedIndexes, - ByteBuffer[] outputs) { - prepareDecoding(inputs, erasedIndexes); + protected void doDecode(ByteBufferDecodingState decodingState) { + CoderUtil.resetOutputBuffers(decodingState.outputs, + decodingState.decodeLength); + prepareDecoding(decodingState.inputs, decodingState.erasedIndexes); ByteBuffer[] realInputs = new ByteBuffer[getNumDataUnits()]; for (int i = 0; i < getNumDataUnits(); i++) { - realInputs[i] = inputs[validIndexes[i]]; + realInputs[i] = decodingState.inputs[validIndexes[i]]; } - RSUtil.encodeData(gfTables, realInputs, outputs); + RSUtil.encodeData(gfTables, realInputs, decodingState.outputs); } @Override - protected void doDecode(byte[][] inputs, int[] inputOffsets, - int dataLen, int[] erasedIndexes, - byte[][] outputs, int[] outputOffsets) { - prepareDecoding(inputs, erasedIndexes); + protected void doDecode(ByteArrayDecodingState decodingState) { + int dataLen = decodingState.decodeLength; + CoderUtil.resetOutputBuffers(decodingState.outputs, + decodingState.outputOffsets, dataLen); + prepareDecoding(decodingState.inputs, decodingState.erasedIndexes); byte[][] realInputs = new byte[getNumDataUnits()][]; int[] realInputOffsets = new int[getNumDataUnits()]; for (int i = 0; i < getNumDataUnits(); i++) { - realInputs[i] = inputs[validIndexes[i]]; - realInputOffsets[i] = inputOffsets[validIndexes[i]]; + realInputs[i] = decodingState.inputs[validIndexes[i]]; + realInputOffsets[i] = decodingState.inputOffsets[validIndexes[i]]; } RSUtil.encodeData(gfTables, dataLen, realInputs, realInputOffsets, - outputs, outputOffsets); + decodingState.outputs, decodingState.outputOffsets); } private void prepareDecoding(T[] inputs, int[] erasedIndexes) { - int[] tmpValidIndexes = new int[getNumDataUnits()]; - CoderUtil.makeValidIndexes(inputs, tmpValidIndexes); + int[] tmpValidIndexes = CoderUtil.getValidIndexes(inputs); if (Arrays.equals(this.cachedErasedIndexes, erasedIndexes) && Arrays.equals(this.validIndexes, tmpValidIndexes)) { return; // Optimization. 
Nothing to do @@ -132,7 +134,7 @@ public class RSRawDecoder extends AbstractRawErasureDecoder { RSUtil.initTables(getNumDataUnits(), erasedIndexes.length, decodeMatrix, 0, gfTables); - if (isAllowingVerboseDump()) { + if (allowVerboseDump()) { System.out.println(DumpUtil.bytesToHex(gfTables, -1)); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawDecoderLegacy.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawDecoderLegacy.java index 7c8fa59dcf..01837604d9 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawDecoderLegacy.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawDecoderLegacy.java @@ -19,7 +19,7 @@ package org.apache.hadoop.io.erasurecode.rawcoder; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.io.erasurecode.rawcoder.util.CoderUtil; +import org.apache.hadoop.io.erasurecode.ErasureCoderOptions; import org.apache.hadoop.io.erasurecode.rawcoder.util.RSUtil; import java.nio.ByteBuffer; @@ -34,7 +34,7 @@ import java.nio.ByteBuffer; * addressed in HADOOP-11871. */ @InterfaceAudience.Private -public class RSRawDecoderLegacy extends AbstractRawErasureDecoder { +public class RSRawDecoderLegacy extends RawErasureDecoder { // To describe and calculate the needed Vandermonde matrix private int[] errSignature; private int[] primitivePower; @@ -61,16 +61,16 @@ public class RSRawDecoderLegacy extends AbstractRawErasureDecoder { private ByteBuffer[] adjustedDirectBufferOutputsParameter = new ByteBuffer[getNumParityUnits()]; - public RSRawDecoderLegacy(int numDataUnits, int numParityUnits) { - super(numDataUnits, numParityUnits); - if (numDataUnits + numParityUnits >= RSUtil.GF.getFieldSize()) { + public RSRawDecoderLegacy(ErasureCoderOptions coderOptions) { + super(coderOptions); + if (getNumAllUnits() >= RSUtil.GF.getFieldSize()) { throw new HadoopIllegalArgumentException( "Invalid numDataUnits and numParityUnits"); } - this.errSignature = new int[numParityUnits]; - this.primitivePower = RSUtil.getPrimitivePower(numDataUnits, - numParityUnits); + this.errSignature = new int[getNumParityUnits()]; + this.primitivePower = RSUtil.getPrimitivePower(getNumDataUnits(), + getNumParityUnits()); } @Override @@ -129,16 +129,18 @@ public class RSRawDecoderLegacy extends AbstractRawErasureDecoder { } @Override - protected void doDecode(byte[][] inputs, int[] inputOffsets, - int dataLen, int[] erasedIndexes, - byte[][] outputs, int[] outputOffsets) { + protected void doDecode(ByteArrayDecodingState decodingState) { + int dataLen = decodingState.decodeLength; + CoderUtil.resetOutputBuffers(decodingState.outputs, + decodingState.outputOffsets, dataLen); + /** * As passed parameters are friendly to callers but not to the underlying * implementations, so we have to adjust them before calling doDecodeImpl. 
*/ int[] erasedOrNotToReadIndexes = - CoderUtil.getErasedOrNotToReadIndexes(inputs); + CoderUtil.getNullIndexes(decodingState.inputs); // Prepare for adjustedOutputsParameter @@ -148,16 +150,18 @@ public class RSRawDecoderLegacy extends AbstractRawErasureDecoder { adjustedOutputOffsets[i] = 0; } // Use the caller passed buffers in erasedIndexes positions - for (int outputIdx = 0, i = 0; i < erasedIndexes.length; i++) { + for (int outputIdx = 0, i = 0; + i < decodingState.erasedIndexes.length; i++) { boolean found = false; for (int j = 0; j < erasedOrNotToReadIndexes.length; j++) { // If this index is one requested by the caller via erasedIndexes, then // we use the passed output buffer to avoid copying data thereafter. - if (erasedIndexes[i] == erasedOrNotToReadIndexes[j]) { + if (decodingState.erasedIndexes[i] == erasedOrNotToReadIndexes[j]) { found = true; - adjustedByteArrayOutputsParameter[j] = resetBuffer( - outputs[outputIdx], outputOffsets[outputIdx], dataLen); - adjustedOutputOffsets[j] = outputOffsets[outputIdx]; + adjustedByteArrayOutputsParameter[j] = CoderUtil.resetBuffer( + decodingState.outputs[outputIdx], + decodingState.outputOffsets[outputIdx], dataLen); + adjustedOutputOffsets[j] = decodingState.outputOffsets[outputIdx]; outputIdx++; } } @@ -169,22 +173,22 @@ public class RSRawDecoderLegacy extends AbstractRawErasureDecoder { // Use shared buffers for other positions (not set yet) for (int bufferIdx = 0, i = 0; i < erasedOrNotToReadIndexes.length; i++) { if (adjustedByteArrayOutputsParameter[i] == null) { - adjustedByteArrayOutputsParameter[i] = resetBuffer( + adjustedByteArrayOutputsParameter[i] = CoderUtil.resetBuffer( checkGetBytesArrayBuffer(bufferIdx, dataLen), 0, dataLen); adjustedOutputOffsets[i] = 0; // Always 0 for such temp output bufferIdx++; } } - doDecodeImpl(inputs, inputOffsets, dataLen, erasedOrNotToReadIndexes, + doDecodeImpl(decodingState.inputs, decodingState.inputOffsets, + dataLen, erasedOrNotToReadIndexes, adjustedByteArrayOutputsParameter, adjustedOutputOffsets); } @Override - protected void doDecode(ByteBuffer[] inputs, int[] erasedIndexes, - ByteBuffer[] outputs) { - ByteBuffer validInput = CoderUtil.findFirstValidInput(inputs); - int dataLen = validInput.remaining(); + protected void doDecode(ByteBufferDecodingState decodingState) { + int dataLen = decodingState.decodeLength; + CoderUtil.resetOutputBuffers(decodingState.outputs, dataLen); /** * As passed parameters are friendly to callers but not to the underlying @@ -192,7 +196,7 @@ public class RSRawDecoderLegacy extends AbstractRawErasureDecoder { */ int[] erasedOrNotToReadIndexes = - CoderUtil.getErasedOrNotToReadIndexes(inputs); + CoderUtil.getNullIndexes(decodingState.inputs); // Prepare for adjustedDirectBufferOutputsParameter @@ -201,15 +205,16 @@ public class RSRawDecoderLegacy extends AbstractRawErasureDecoder { adjustedDirectBufferOutputsParameter[i] = null; } // Use the caller passed buffers in erasedIndexes positions - for (int outputIdx = 0, i = 0; i < erasedIndexes.length; i++) { + for (int outputIdx = 0, i = 0; + i < decodingState.erasedIndexes.length; i++) { boolean found = false; for (int j = 0; j < erasedOrNotToReadIndexes.length; j++) { // If this index is one requested by the caller via erasedIndexes, then // we use the passed output buffer to avoid copying data thereafter. 
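To illustrate the adjustment being described: suppose a (6, 3) group where units 1, 7 and 8 arrive as null but the caller only asks for unit 1 via erasedIndexes. The legacy decoder still decodes all three null positions, so it maps the caller's single output buffer to position 1 and backs the other two with internal scratch buffers. A toy sketch of that mapping, using plain arrays rather than the decoder's own fields:

import java.util.Arrays;

public class OutputAdjustmentDemo {
  public static void main(String[] args) {
    int[] erasedOrNotToRead = {1, 7, 8};  // every null input position
    int[] erasedIndexes = {1};            // what the caller wants back
    String[] callerOutputs = {"callerBuffer#0"};

    String[] adjustedOutputs = new String[erasedOrNotToRead.length];
    int outputIdx = 0;
    for (int i = 0; i < erasedIndexes.length; i++) {
      for (int j = 0; j < erasedOrNotToRead.length; j++) {
        if (erasedIndexes[i] == erasedOrNotToRead[j]) {
          adjustedOutputs[j] = callerOutputs[outputIdx++]; // caller buffer, no copy later
        }
      }
    }
    for (int j = 0; j < adjustedOutputs.length; j++) {
      if (adjustedOutputs[j] == null) {
        adjustedOutputs[j] = "scratchBuffer";              // internal temp buffer
      }
    }
    System.out.println(Arrays.toString(adjustedOutputs));
    // [callerBuffer#0, scratchBuffer, scratchBuffer]
  }
}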
- if (erasedIndexes[i] == erasedOrNotToReadIndexes[j]) { + if (decodingState.erasedIndexes[i] == erasedOrNotToReadIndexes[j]) { found = true; - adjustedDirectBufferOutputsParameter[j] = - resetBuffer(outputs[outputIdx++], dataLen); + adjustedDirectBufferOutputsParameter[j] = CoderUtil.resetBuffer( + decodingState.outputs[outputIdx++], dataLen); } } if (!found) { @@ -223,12 +228,13 @@ public class RSRawDecoderLegacy extends AbstractRawErasureDecoder { ByteBuffer buffer = checkGetDirectBuffer(bufferIdx, dataLen); buffer.position(0); buffer.limit(dataLen); - adjustedDirectBufferOutputsParameter[i] = resetBuffer(buffer, dataLen); + adjustedDirectBufferOutputsParameter[i] = + CoderUtil.resetBuffer(buffer, dataLen); bufferIdx++; } } - doDecodeImpl(inputs, erasedOrNotToReadIndexes, + doDecodeImpl(decodingState.inputs, erasedOrNotToReadIndexes, adjustedDirectBufferOutputsParameter); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawEncoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawEncoder.java index cee65746c4..fd82363026 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawEncoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawEncoder.java @@ -19,11 +19,10 @@ package org.apache.hadoop.io.erasurecode.rawcoder; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.io.erasurecode.ErasureCoderOptions; import org.apache.hadoop.io.erasurecode.rawcoder.util.DumpUtil; import org.apache.hadoop.io.erasurecode.rawcoder.util.RSUtil; -import java.nio.ByteBuffer; - /** * A raw erasure encoder in RS code scheme in pure Java in case native one * isn't available in some environment. Please always use native implementations @@ -31,7 +30,7 @@ import java.nio.ByteBuffer; * from HDFS-RAID, and also compatible with the native/ISA-L coder. */ @InterfaceAudience.Private -public class RSRawEncoder extends AbstractRawErasureEncoder { +public class RSRawEncoder extends RawErasureEncoder { // relevant to schema and won't change during encode calls. 
private byte[] encodeMatrix; /** @@ -40,36 +39,42 @@ public class RSRawEncoder extends AbstractRawErasureEncoder { */ private byte[] gfTables; - public RSRawEncoder(int numDataUnits, int numParityUnits) { - super(numDataUnits, numParityUnits); + public RSRawEncoder(ErasureCoderOptions coderOptions) { + super(coderOptions); - if (numDataUnits + numParityUnits >= RSUtil.GF.getFieldSize()) { + if (getNumAllUnits() >= RSUtil.GF.getFieldSize()) { throw new HadoopIllegalArgumentException( "Invalid numDataUnits and numParityUnits"); } - encodeMatrix = new byte[getNumAllUnits() * numDataUnits]; - RSUtil.genCauchyMatrix(encodeMatrix, getNumAllUnits(), numDataUnits); - if (isAllowingVerboseDump()) { - DumpUtil.dumpMatrix(encodeMatrix, numDataUnits, getNumAllUnits()); + encodeMatrix = new byte[getNumAllUnits() * getNumDataUnits()]; + RSUtil.genCauchyMatrix(encodeMatrix, getNumAllUnits(), getNumDataUnits()); + if (allowVerboseDump()) { + DumpUtil.dumpMatrix(encodeMatrix, getNumDataUnits(), getNumAllUnits()); } - gfTables = new byte[getNumAllUnits() * numDataUnits * 32]; - RSUtil.initTables(numDataUnits, numParityUnits, encodeMatrix, - numDataUnits * numDataUnits, gfTables); - if (isAllowingVerboseDump()) { + gfTables = new byte[getNumAllUnits() * getNumDataUnits() * 32]; + RSUtil.initTables(getNumDataUnits(), getNumParityUnits(), encodeMatrix, + getNumDataUnits() * getNumDataUnits(), gfTables); + if (allowVerboseDump()) { System.out.println(DumpUtil.bytesToHex(gfTables, -1)); } } @Override - protected void doEncode(ByteBuffer[] inputs, ByteBuffer[] outputs) { - RSUtil.encodeData(gfTables, inputs, outputs); + protected void doEncode(ByteBufferEncodingState encodingState) { + CoderUtil.resetOutputBuffers(encodingState.outputs, + encodingState.encodeLength); + RSUtil.encodeData(gfTables, encodingState.inputs, encodingState.outputs); } @Override - protected void doEncode(byte[][] inputs, int[] inputOffsets, - int dataLen, byte[][] outputs, int[] outputOffsets) { - RSUtil.encodeData(gfTables, dataLen, inputs, inputOffsets, outputs, - outputOffsets); + protected void doEncode(ByteArrayEncodingState encodingState) { + CoderUtil.resetOutputBuffers(encodingState.outputs, + encodingState.outputOffsets, + encodingState.encodeLength); + RSUtil.encodeData(gfTables, encodingState.encodeLength, + encodingState.inputs, + encodingState.inputOffsets, encodingState.outputs, + encodingState.outputOffsets); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawEncoderLegacy.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawEncoderLegacy.java index 805772087e..ed1c83b428 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawEncoderLegacy.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawEncoderLegacy.java @@ -18,6 +18,7 @@ package org.apache.hadoop.io.erasurecode.rawcoder; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.io.erasurecode.ErasureCoderOptions; import org.apache.hadoop.io.erasurecode.rawcoder.util.RSUtil; import java.nio.ByteBuffer; @@ -29,20 +30,20 @@ import java.util.Arrays; * when possible. 
*/ @InterfaceAudience.Private -public class RSRawEncoderLegacy extends AbstractRawErasureEncoder { +public class RSRawEncoderLegacy extends RawErasureEncoder { private int[] generatingPolynomial; - public RSRawEncoderLegacy(int numDataUnits, int numParityUnits) { - super(numDataUnits, numParityUnits); + public RSRawEncoderLegacy(ErasureCoderOptions coderOptions) { + super(coderOptions); assert (getNumDataUnits() + getNumParityUnits() < RSUtil.GF.getFieldSize()); - int[] primitivePower = RSUtil.getPrimitivePower(numDataUnits, - numParityUnits); + int[] primitivePower = RSUtil.getPrimitivePower(getNumDataUnits(), + getNumParityUnits()); // compute generating polynomial int[] gen = {1}; int[] poly = new int[2]; - for (int i = 0; i < numParityUnits; i++) { + for (int i = 0; i < getNumParityUnits(); i++) { poly[0] = primitivePower[i]; poly[1] = 1; gen = RSUtil.GF.multiply(gen, poly); @@ -52,15 +53,21 @@ public class RSRawEncoderLegacy extends AbstractRawErasureEncoder { } @Override - protected void doEncode(ByteBuffer[] inputs, ByteBuffer[] outputs) { + protected void doEncode(ByteBufferEncodingState encodingState) { + CoderUtil.resetOutputBuffers(encodingState.outputs, + encodingState.encodeLength); // parity units + data units - ByteBuffer[] all = new ByteBuffer[outputs.length + inputs.length]; + ByteBuffer[] all = new ByteBuffer[encodingState.outputs.length + + encodingState.inputs.length]; - if (isAllowingChangeInputs()) { - System.arraycopy(outputs, 0, all, 0, outputs.length); - System.arraycopy(inputs, 0, all, outputs.length, inputs.length); + if (allowChangeInputs()) { + System.arraycopy(encodingState.outputs, 0, all, 0, + encodingState.outputs.length); + System.arraycopy(encodingState.inputs, 0, all, + encodingState.outputs.length, encodingState.inputs.length); } else { - System.arraycopy(outputs, 0, all, 0, outputs.length); + System.arraycopy(encodingState.outputs, 0, all, 0, + encodingState.outputs.length); /** * Note when this coder would be really (rarely) used in a production @@ -68,11 +75,11 @@ public class RSRawEncoderLegacy extends AbstractRawErasureEncoder { * buffers avoiding reallocating. 
*/ ByteBuffer tmp; - for (int i = 0; i < inputs.length; i++) { - tmp = ByteBuffer.allocate(inputs[i].remaining()); - tmp.put(inputs[i]); + for (int i = 0; i < encodingState.inputs.length; i++) { + tmp = ByteBuffer.allocate(encodingState.inputs[i].remaining()); + tmp.put(encodingState.inputs[i]); tmp.flip(); - all[outputs.length + i] = tmp; + all[encodingState.outputs.length + i] = tmp; } } @@ -81,27 +88,38 @@ public class RSRawEncoderLegacy extends AbstractRawErasureEncoder { } @Override - protected void doEncode(byte[][] inputs, int[] inputOffsets, - int dataLen, byte[][] outputs, - int[] outputOffsets) { + protected void doEncode(ByteArrayEncodingState encodingState) { + int dataLen = encodingState.encodeLength; + CoderUtil.resetOutputBuffers(encodingState.outputs, + encodingState.outputOffsets, dataLen); // parity units + data units - byte[][] all = new byte[outputs.length + inputs.length][]; - int[] allOffsets = new int[outputOffsets.length + inputOffsets.length]; + byte[][] all = new byte[encodingState.outputs.length + + encodingState.inputs.length][]; + int[] allOffsets = new int[encodingState.outputOffsets.length + + encodingState.inputOffsets.length]; - if (isAllowingChangeInputs()) { - System.arraycopy(outputs, 0, all, 0, outputs.length); - System.arraycopy(inputs, 0, all, outputs.length, inputs.length); + if (allowChangeInputs()) { + System.arraycopy(encodingState.outputs, 0, all, 0, + encodingState.outputs.length); + System.arraycopy(encodingState.inputs, 0, all, + encodingState.outputs.length, encodingState.inputs.length); - System.arraycopy(outputOffsets, 0, allOffsets, 0, outputOffsets.length); - System.arraycopy(inputOffsets, 0, allOffsets, - outputOffsets.length, inputOffsets.length); + System.arraycopy(encodingState.outputOffsets, 0, allOffsets, 0, + encodingState.outputOffsets.length); + System.arraycopy(encodingState.inputOffsets, 0, allOffsets, + encodingState.outputOffsets.length, + encodingState.inputOffsets.length); } else { - System.arraycopy(outputs, 0, all, 0, outputs.length); - System.arraycopy(outputOffsets, 0, allOffsets, 0, outputOffsets.length); + System.arraycopy(encodingState.outputs, 0, all, 0, + encodingState.outputs.length); + System.arraycopy(encodingState.outputOffsets, 0, allOffsets, 0, + encodingState.outputOffsets.length); - for (int i = 0; i < inputs.length; i++) { - all[outputs.length + i] = Arrays.copyOfRange(inputs[i], - inputOffsets[i], inputOffsets[i] + dataLen); + for (int i = 0; i < encodingState.inputs.length; i++) { + all[encodingState.outputs.length + i] = + Arrays.copyOfRange(encodingState.inputs[i], + encodingState.inputOffsets[i], + encodingState.inputOffsets[i] + dataLen); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawErasureCoderFactory.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawErasureCoderFactory.java index b38db4b9a8..8d954d59ca 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawErasureCoderFactory.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawErasureCoderFactory.java @@ -18,6 +18,7 @@ package org.apache.hadoop.io.erasurecode.rawcoder; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.io.erasurecode.ErasureCoderOptions; /** * A raw coder factory for the new raw Reed-Solomon coder in Java. 
@@ -26,12 +27,12 @@ import org.apache.hadoop.classification.InterfaceAudience; public class RSRawErasureCoderFactory implements RawErasureCoderFactory { @Override - public RawErasureEncoder createEncoder(int numDataUnits, int numParityUnits) { - return new RSRawEncoder(numDataUnits, numParityUnits); + public RawErasureEncoder createEncoder(ErasureCoderOptions coderOptions) { + return new RSRawEncoder(coderOptions); } @Override - public RawErasureDecoder createDecoder(int numDataUnits, int numParityUnits) { - return new RSRawDecoder(numDataUnits, numParityUnits); + public RawErasureDecoder createDecoder(ErasureCoderOptions coderOptions) { + return new RSRawDecoder(coderOptions); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawErasureCoderFactoryLegacy.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawErasureCoderFactoryLegacy.java index 5aa75e4e16..f0ebb3b3b7 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawErasureCoderFactoryLegacy.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawErasureCoderFactoryLegacy.java @@ -18,6 +18,7 @@ package org.apache.hadoop.io.erasurecode.rawcoder; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.io.erasurecode.ErasureCoderOptions; /** * A raw coder factory for the legacy raw Reed-Solomon coder in Java. @@ -26,12 +27,12 @@ import org.apache.hadoop.classification.InterfaceAudience; public class RSRawErasureCoderFactoryLegacy implements RawErasureCoderFactory { @Override - public RawErasureEncoder createEncoder(int numDataUnits, int numParityUnits) { - return new RSRawEncoderLegacy(numDataUnits, numParityUnits); + public RawErasureEncoder createEncoder(ErasureCoderOptions coderOptions) { + return new RSRawEncoderLegacy(coderOptions); } @Override - public RawErasureDecoder createDecoder(int numDataUnits, int numParityUnits) { - return new RSRawDecoderLegacy(numDataUnits, numParityUnits); + public RawErasureDecoder createDecoder(ErasureCoderOptions coderOptions) { + return new RSRawDecoderLegacy(coderOptions); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureCoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureCoder.java deleted file mode 100644 index 20a1a690e1..0000000000 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureCoder.java +++ /dev/null @@ -1,73 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.io.erasurecode.rawcoder; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configurable; - -/** - * RawErasureCoder is a common interface for {@link RawErasureEncoder} and - * {@link RawErasureDecoder} as both encoder and decoder share some properties. - * - * RawErasureCoder is part of ErasureCodec framework, where ErasureCoder is - * used to encode/decode a group of blocks (BlockGroup) according to the codec - * specific BlockGroup layout and logic. An ErasureCoder extracts chunks of - * data from the blocks and can employ various low level RawErasureCoders to - * perform encoding/decoding against the chunks. - * - * To distinguish from ErasureCoder, here RawErasureCoder is used to mean the - * low level constructs, since it only takes care of the math calculation with - * a group of byte buffers. - */ -@InterfaceAudience.Private -public interface RawErasureCoder extends Configurable { - - /** - * Get a coder option value. - * @param option - * @return option value - */ - public Object getCoderOption(CoderOption option); - - /** - * Set a coder option value. - * @param option - * @param value - */ - public void setCoderOption(CoderOption option, Object value); - - /** - * The number of data input units for the coding. A unit can be a byte, - * chunk or buffer or even a block. - * @return count of data input units - */ - public int getNumDataUnits(); - - /** - * The number of parity output units for the coding. A unit can be a byte, - * chunk, buffer or even a block. - * @return count of parity output units - */ - public int getNumParityUnits(); - - /** - * Should be called when release this coder. Good chance to release encoding - * or decoding buffers - */ - public void release(); -} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureCoderFactory.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureCoderFactory.java index 8a12106c99..6d94f00d8c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureCoderFactory.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureCoderFactory.java @@ -18,6 +18,7 @@ package org.apache.hadoop.io.erasurecode.rawcoder; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.io.erasurecode.ErasureCoderOptions; /** * Raw erasure coder factory that can be used to create raw encoder and decoder. @@ -29,17 +30,15 @@ public interface RawErasureCoderFactory { /** * Create raw erasure encoder. - * @param numDataUnits number of data units in a coding group - * @param numParityUnits number of parity units in a coding group + * @param conf the configuration used to create the encoder * @return raw erasure encoder */ - public RawErasureEncoder createEncoder(int numDataUnits, int numParityUnits); + RawErasureEncoder createEncoder(ErasureCoderOptions coderOptions); /** * Create raw erasure decoder. 
- * @param numDataUnits number of data units in a coding group - * @param numParityUnits number of parity units in a coding group + * @param conf the configuration used to create the encoder * @return raw erasure decoder */ - public RawErasureDecoder createDecoder(int numDataUnits, int numParityUnits); + RawErasureDecoder createDecoder(ErasureCoderOptions coderOptions); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureDecoder.java index 17076505c8..a29b47286f 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureDecoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureDecoder.java @@ -19,18 +19,34 @@ package org.apache.hadoop.io.erasurecode.rawcoder; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.io.erasurecode.ECChunk; +import org.apache.hadoop.io.erasurecode.ErasureCoderOptions; import java.nio.ByteBuffer; /** - * RawErasureDecoder performs decoding given chunks of input data and generates - * missing data that corresponds to an erasure code scheme, like XOR and - * Reed-Solomon. + * An abstract raw erasure decoder that's to be inherited by new decoders. * - * It extends the {@link RawErasureCoder} interface. + * Raw erasure coder is part of erasure codec framework, where erasure coder is + * used to encode/decode a group of blocks (BlockGroup) according to the codec + * specific BlockGroup layout and logic. An erasure coder extracts chunks of + * data from the blocks and can employ various low level raw erasure coders to + * perform encoding/decoding against the chunks. + * + * To distinguish from erasure coder, here raw erasure coder is used to mean the + * low level constructs, since it only takes care of the math calculation with + * a group of byte buffers. + * + * Note it mainly provides decode() calls, which should be stateless and may be + * made thread-safe in future. */ @InterfaceAudience.Private -public interface RawErasureDecoder extends RawErasureCoder { +public abstract class RawErasureDecoder { + + private final ErasureCoderOptions coderOptions; + + public RawErasureDecoder(ErasureCoderOptions coderOptions) { + this.coderOptions = coderOptions; + } /** * Decode with inputs and erasedIndexes, generates outputs. 
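For orientation, a short usage sketch of the reworked factory contract. It assumes ErasureCoderOptions exposes a (numDataUnits, numParityUnits) constructor, which is not shown in this hunk, so treat that signature as illustrative:

import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import org.apache.hadoop.io.erasurecode.rawcoder.RSRawErasureCoderFactory;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureCoderFactory;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;

public class FactoryUsageSketch {
  public static void main(String[] args) {
    // Assumed constructor: (numDataUnits, numParityUnits).
    ErasureCoderOptions options = new ErasureCoderOptions(6, 3);

    RawErasureCoderFactory factory = new RSRawErasureCoderFactory();
    RawErasureEncoder encoder = factory.createEncoder(options);
    RawErasureDecoder decoder = factory.createDecoder(options);

    System.out.println(encoder.getNumDataUnits() + " data / "
        + decoder.getNumParityUnits() + " parity units");

    encoder.release();
    decoder.release();
  }
}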
@@ -64,8 +80,44 @@ public interface RawErasureDecoder extends RawErasureCoder { * @param outputs output buffers to put decoded data into according to * erasedIndexes, ready for read after the call */ - void decode(ByteBuffer[] inputs, int[] erasedIndexes, - ByteBuffer[] outputs); + public void decode(ByteBuffer[] inputs, int[] erasedIndexes, + ByteBuffer[] outputs) { + ByteBufferDecodingState decodingState = new ByteBufferDecodingState(this, + inputs, erasedIndexes, outputs); + + boolean usingDirectBuffer = decodingState.usingDirectBuffer; + int dataLen = decodingState.decodeLength; + if (dataLen == 0) { + return; + } + + int[] inputPositions = new int[inputs.length]; + for (int i = 0; i < inputPositions.length; i++) { + if (inputs[i] != null) { + inputPositions[i] = inputs[i].position(); + } + } + + if (usingDirectBuffer) { + doDecode(decodingState); + } else { + ByteArrayDecodingState badState = decodingState.convertToByteArrayState(); + doDecode(badState); + } + + for (int i = 0; i < inputs.length; i++) { + if (inputs[i] != null) { + // dataLen bytes consumed + inputs[i].position(inputPositions[i] + dataLen); + } + } + } + + /** + * Perform the real decoding using Direct ByteBuffer. + * @param decodingState the decoding state + */ + protected abstract void doDecode(ByteBufferDecodingState decodingState); /** * Decode with inputs and erasedIndexes, generates outputs. More see above. @@ -75,7 +127,23 @@ public interface RawErasureDecoder extends RawErasureCoder { * @param outputs output buffers to put decoded data into according to * erasedIndexes, ready for read after the call */ - void decode(byte[][] inputs, int[] erasedIndexes, byte[][] outputs); + public void decode(byte[][] inputs, int[] erasedIndexes, byte[][] outputs) { + ByteArrayDecodingState decodingState = new ByteArrayDecodingState(this, + inputs, erasedIndexes, outputs); + + if (decodingState.decodeLength == 0) { + return; + } + + doDecode(decodingState); + } + + /** + * Perform the real decoding using bytes array, supporting offsets and + * lengths. + * @param decodingState the decoding state + */ + protected abstract void doDecode(ByteArrayDecodingState decodingState); /** * Decode with inputs and erasedIndexes, generates outputs. More see above. @@ -88,6 +156,57 @@ public interface RawErasureDecoder extends RawErasureCoder { * @param outputs output buffers to put decoded data into according to * erasedIndexes, ready for read after the call */ - void decode(ECChunk[] inputs, int[] erasedIndexes, ECChunk[] outputs); + public void decode(ECChunk[] inputs, int[] erasedIndexes, + ECChunk[] outputs) { + ByteBuffer[] newInputs = CoderUtil.toBuffers(inputs); + ByteBuffer[] newOutputs = CoderUtil.toBuffers(outputs); + decode(newInputs, erasedIndexes, newOutputs); + } + public int getNumDataUnits() { + return coderOptions.getNumDataUnits(); + } + + public int getNumParityUnits() { + return coderOptions.getNumParityUnits(); + } + + protected int getNumAllUnits() { + return coderOptions.getNumAllUnits(); + } + + /** + * Tell if direct buffer is preferred or not. It's for callers to + * decide how to allocate coding chunk buffers, using DirectByteBuffer or + * bytes array. It will return false by default. + * @return true if native buffer is preferred for performance consideration, + * otherwise false. + */ + public boolean preferDirectBuffer() { + return false; + } + + /** + * Allow change into input buffers or not while perform encoding/decoding. 
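A concrete end-to-end decode sketch may help here. It uses the XOR coder from this same patch with a 2-data / 1-parity layout chosen for illustration; the ErasureCoderOptions constructor is again assumed to take (numDataUnits, numParityUnits):

import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
import org.apache.hadoop.io.erasurecode.rawcoder.XORRawDecoder;

public class DecodeSketch {
  public static void main(String[] args) {
    byte[] d0 = {1, 2, 3, 4};
    byte[] d1 = {5, 6, 7, 8};
    byte[] parity = new byte[4];
    for (int i = 0; i < parity.length; i++) {
      parity[i] = (byte) (d0[i] ^ d1[i]);   // XOR parity computed by hand
    }

    // Unit 0 is lost: pass null in its slot, name it in erasedIndexes,
    // and supply one output buffer per erased index.
    byte[][] inputs = {null, d1, parity};
    int[] erasedIndexes = {0};
    byte[][] outputs = new byte[1][4];

    // Assumed ErasureCoderOptions constructor: (numDataUnits, numParityUnits).
    RawErasureDecoder decoder = new XORRawDecoder(new ErasureCoderOptions(2, 1));
    decoder.decode(inputs, erasedIndexes, outputs);
    System.out.println(java.util.Arrays.toString(outputs[0])); // [1, 2, 3, 4]
  }
}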
+ * @return true if it's allowed to change inputs, false otherwise + */ + public boolean allowChangeInputs() { + return coderOptions.allowChangeInputs(); + } + + /** + * Allow to dump verbose info during encoding/decoding. + * @return true if it's allowed to do verbose dump, false otherwise. + */ + public boolean allowVerboseDump() { + return coderOptions.allowVerboseDump(); + } + + /** + * Should be called when release this coder. Good chance to release encoding + * or decoding buffers + */ + public void release() { + // Nothing to do here. + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureEncoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureEncoder.java index 6303d82167..36d68f48fb 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureEncoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureEncoder.java @@ -19,18 +19,34 @@ package org.apache.hadoop.io.erasurecode.rawcoder; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.io.erasurecode.ECChunk; +import org.apache.hadoop.io.erasurecode.ErasureCoderOptions; import java.nio.ByteBuffer; /** - * RawErasureEncoder performs encoding given chunks of input data and generates - * parity outputs that corresponds to an erasure code scheme, like XOR and - * Reed-Solomon. + * An abstract raw erasure encoder that's to be inherited by new encoders. * - * It extends the {@link RawErasureCoder} interface. + * Raw erasure coder is part of erasure codec framework, where erasure coder is + * used to encode/decode a group of blocks (BlockGroup) according to the codec + * specific BlockGroup layout and logic. An erasure coder extracts chunks of + * data from the blocks and can employ various low level raw erasure coders to + * perform encoding/decoding against the chunks. + * + * To distinguish from erasure coder, here raw erasure coder is used to mean the + * low level constructs, since it only takes care of the math calculation with + * a group of byte buffers. + * + * Note it mainly provides encode() calls, which should be stateless and may be + * made thread-safe in future. */ @InterfaceAudience.Private -public interface RawErasureEncoder extends RawErasureCoder { +public abstract class RawErasureEncoder { + + private final ErasureCoderOptions coderOptions; + + public RawErasureEncoder(ErasureCoderOptions coderOptions) { + this.coderOptions = coderOptions; + } /** * Encode with inputs and generates outputs. 
@@ -47,7 +63,43 @@ public interface RawErasureEncoder extends RawErasureCoder { * @param outputs output buffers to put the encoded data into, ready to read * after the call */ - void encode(ByteBuffer[] inputs, ByteBuffer[] outputs); + public void encode(ByteBuffer[] inputs, ByteBuffer[] outputs) { + ByteBufferEncodingState bbeState = new ByteBufferEncodingState( + this, inputs, outputs); + + boolean usingDirectBuffer = bbeState.usingDirectBuffer; + int dataLen = bbeState.encodeLength; + if (dataLen == 0) { + return; + } + + int[] inputPositions = new int[inputs.length]; + for (int i = 0; i < inputPositions.length; i++) { + if (inputs[i] != null) { + inputPositions[i] = inputs[i].position(); + } + } + + if (usingDirectBuffer) { + doEncode(bbeState); + } else { + ByteArrayEncodingState baeState = bbeState.convertToByteArrayState(); + doEncode(baeState); + } + + for (int i = 0; i < inputs.length; i++) { + if (inputs[i] != null) { + // dataLen bytes consumed + inputs[i].position(inputPositions[i] + dataLen); + } + } + } + + /** + * Perform the real encoding work using direct ByteBuffer. + * @param encodingState the encoding state + */ + protected abstract void doEncode(ByteBufferEncodingState encodingState); /** * Encode with inputs and generates outputs. More see above. @@ -56,7 +108,24 @@ public interface RawErasureEncoder extends RawErasureCoder { * @param outputs output buffers to put the encoded data into, read to read * after the call */ - void encode(byte[][] inputs, byte[][] outputs); + public void encode(byte[][] inputs, byte[][] outputs) { + ByteArrayEncodingState baeState = new ByteArrayEncodingState( + this, inputs, outputs); + + int dataLen = baeState.encodeLength; + if (dataLen == 0) { + return; + } + + doEncode(baeState); + } + + /** + * Perform the real encoding work using bytes array, supporting offsets + * and lengths. + * @param encodingState the encoding state + */ + protected abstract void doEncode(ByteArrayEncodingState encodingState); /** * Encode with inputs and generates outputs. More see above. @@ -65,6 +134,56 @@ public interface RawErasureEncoder extends RawErasureCoder { * @param outputs output buffers to put the encoded data into, read to read * after the call */ - void encode(ECChunk[] inputs, ECChunk[] outputs); + public void encode(ECChunk[] inputs, ECChunk[] outputs) { + ByteBuffer[] newInputs = ECChunk.toBuffers(inputs); + ByteBuffer[] newOutputs = ECChunk.toBuffers(outputs); + encode(newInputs, newOutputs); + } + public int getNumDataUnits() { + return coderOptions.getNumDataUnits(); + } + + public int getNumParityUnits() { + return coderOptions.getNumParityUnits(); + } + + public int getNumAllUnits() { + return coderOptions.getNumAllUnits(); + } + + /** + * Tell if direct buffer is preferred or not. It's for callers to + * decide how to allocate coding chunk buffers, using DirectByteBuffer or + * bytes array. It will return false by default. + * @return true if native buffer is preferred for performance consideration, + * otherwise false. + */ + public boolean preferDirectBuffer() { + return false; + } + + /** + * Allow change into input buffers or not while perform encoding/decoding. + * @return true if it's allowed to change inputs, false otherwise + */ + public boolean allowChangeInputs() { + return coderOptions.allowChangeInputs(); + } + + /** + * Allow to dump verbose info during encoding/decoding. + * @return true if it's allowed to do verbose dump, false otherwise. 
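And the ByteBuffer-based encode path, sketched with the XOR encoder from this patch; the ErasureCoderOptions constructor is once more an assumption. Note how the input positions advance by the encode length while the parity buffer is left ready for reading:

import java.nio.ByteBuffer;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
import org.apache.hadoop.io.erasurecode.rawcoder.XORRawEncoder;

public class EncodeSketch {
  public static void main(String[] args) {
    // Assumed ErasureCoderOptions constructor: (numDataUnits, numParityUnits).
    RawErasureEncoder encoder = new XORRawEncoder(new ErasureCoderOptions(2, 1));

    ByteBuffer d0 = ByteBuffer.wrap(new byte[]{1, 2, 3, 4});
    ByteBuffer d1 = ByteBuffer.wrap(new byte[]{5, 6, 7, 8});
    ByteBuffer parity = ByteBuffer.allocate(4);

    encoder.encode(new ByteBuffer[]{d0, d1}, new ByteBuffer[]{parity});

    // Inputs were consumed: their positions moved forward by encodeLength.
    System.out.println(d0.remaining() + " bytes left in d0");  // 0
    // The parity buffer is ready to read from its original position.
    System.out.println(parity.get(0));                         // 1 ^ 5 = 4
  }
}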
+ */ + public boolean allowVerboseDump() { + return coderOptions.allowVerboseDump(); + } + + /** + * Should be called when release this coder. Good chance to release encoding + * or decoding buffers + */ + public void release() { + // Nothing to do here. + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XORRawDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XORRawDecoder.java index 61017dd8ad..ef7b17237d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XORRawDecoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XORRawDecoder.java @@ -17,9 +17,10 @@ */ package org.apache.hadoop.io.erasurecode.rawcoder; -import java.nio.ByteBuffer; - import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.io.erasurecode.ErasureCoderOptions; + +import java.nio.ByteBuffer; /** * A raw decoder in XOR code scheme in pure Java, adapted from HDFS-RAID. @@ -29,55 +30,57 @@ import org.apache.hadoop.classification.InterfaceAudience; * deployed independently. */ @InterfaceAudience.Private -public class XORRawDecoder extends AbstractRawErasureDecoder { +public class XORRawDecoder extends RawErasureDecoder { - public XORRawDecoder(int numDataUnits, int numParityUnits) { - super(numDataUnits, numParityUnits); + public XORRawDecoder(ErasureCoderOptions coderOptions) { + super(coderOptions); } @Override - protected void doDecode(ByteBuffer[] inputs, int[] erasedIndexes, - ByteBuffer[] outputs) { - ByteBuffer output = outputs[0]; + protected void doDecode(ByteBufferDecodingState decodingState) { + CoderUtil.resetOutputBuffers(decodingState.outputs, + decodingState.decodeLength); + ByteBuffer output = decodingState.outputs[0]; - int erasedIdx = erasedIndexes[0]; + int erasedIdx = decodingState.erasedIndexes[0]; // Process the inputs. int iIdx, oIdx; - for (int i = 0; i < inputs.length; i++) { + for (int i = 0; i < decodingState.inputs.length; i++) { // Skip the erased location. if (i == erasedIdx) { continue; } - for (iIdx = inputs[i].position(), oIdx = output.position(); - iIdx < inputs[i].limit(); + for (iIdx = decodingState.inputs[i].position(), oIdx = output.position(); + iIdx < decodingState.inputs[i].limit(); iIdx++, oIdx++) { - output.put(oIdx, (byte) (output.get(oIdx) ^ inputs[i].get(iIdx))); + output.put(oIdx, (byte) (output.get(oIdx) ^ + decodingState.inputs[i].get(iIdx))); } } } @Override - protected void doDecode(byte[][] inputs, int[] inputOffsets, int dataLen, - int[] erasedIndexes, byte[][] outputs, - int[] outputOffsets) { - byte[] output = outputs[0]; - resetBuffer(output, outputOffsets[0], dataLen); - - int erasedIdx = erasedIndexes[0]; + protected void doDecode(ByteArrayDecodingState decodingState) { + byte[] output = decodingState.outputs[0]; + int dataLen = decodingState.decodeLength; + CoderUtil.resetOutputBuffers(decodingState.outputs, + decodingState.outputOffsets, dataLen); + int erasedIdx = decodingState.erasedIndexes[0]; // Process the inputs. int iIdx, oIdx; - for (int i = 0; i < inputs.length; i++) { + for (int i = 0; i < decodingState.inputs.length; i++) { // Skip the erased location. 
if (i == erasedIdx) { continue; } - for (iIdx = inputOffsets[i], oIdx = outputOffsets[0]; - iIdx < inputOffsets[i] + dataLen; iIdx++, oIdx++) { - output[oIdx] ^= inputs[i][iIdx]; + for (iIdx = decodingState.inputOffsets[i], + oIdx = decodingState.outputOffsets[0]; + iIdx < decodingState.inputOffsets[i] + dataLen; iIdx++, oIdx++) { + output[oIdx] ^= decodingState.inputs[i][iIdx]; } } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XORRawEncoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XORRawEncoder.java index 646fc17739..409ba9dc5f 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XORRawEncoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XORRawEncoder.java @@ -17,9 +17,10 @@ */ package org.apache.hadoop.io.erasurecode.rawcoder; -import java.nio.ByteBuffer; - import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.io.erasurecode.ErasureCoderOptions; + +import java.nio.ByteBuffer; /** * A raw encoder in XOR code scheme in pure Java, adapted from HDFS-RAID. @@ -29,50 +30,56 @@ import org.apache.hadoop.classification.InterfaceAudience; * deployed independently. */ @InterfaceAudience.Private -public class XORRawEncoder extends AbstractRawErasureEncoder { +public class XORRawEncoder extends RawErasureEncoder { - public XORRawEncoder(int numDataUnits, int numParityUnits) { - super(numDataUnits, numParityUnits); + public XORRawEncoder(ErasureCoderOptions coderOptions) { + super(coderOptions); } - protected void doEncode(ByteBuffer[] inputs, ByteBuffer[] outputs) { - ByteBuffer output = outputs[0]; + protected void doEncode(ByteBufferEncodingState encodingState) { + CoderUtil.resetOutputBuffers(encodingState.outputs, + encodingState.encodeLength); + ByteBuffer output = encodingState.outputs[0]; // Get the first buffer's data. int iIdx, oIdx; - for (iIdx = inputs[0].position(), oIdx = output.position(); - iIdx < inputs[0].limit(); iIdx++, oIdx++) { - output.put(oIdx, inputs[0].get(iIdx)); + for (iIdx = encodingState.inputs[0].position(), oIdx = output.position(); + iIdx < encodingState.inputs[0].limit(); iIdx++, oIdx++) { + output.put(oIdx, encodingState.inputs[0].get(iIdx)); } // XOR with everything else. - for (int i = 1; i < inputs.length; i++) { - for (iIdx = inputs[i].position(), oIdx = output.position(); - iIdx < inputs[i].limit(); + for (int i = 1; i < encodingState.inputs.length; i++) { + for (iIdx = encodingState.inputs[i].position(), oIdx = output.position(); + iIdx < encodingState.inputs[i].limit(); iIdx++, oIdx++) { - output.put(oIdx, (byte) (output.get(oIdx) ^ inputs[i].get(iIdx))); + output.put(oIdx, (byte) (output.get(oIdx) ^ + encodingState.inputs[i].get(iIdx))); } } } @Override - protected void doEncode(byte[][] inputs, int[] inputOffsets, int dataLen, - byte[][] outputs, int[] outputOffsets) { - byte[] output = outputs[0]; - resetBuffer(output, outputOffsets[0], dataLen); + protected void doEncode(ByteArrayEncodingState encodingState) { + int dataLen = encodingState.encodeLength; + CoderUtil.resetOutputBuffers(encodingState.outputs, + encodingState.outputOffsets, dataLen); + byte[] output = encodingState.outputs[0]; // Get the first buffer's data. 
int iIdx, oIdx; - for (iIdx = inputOffsets[0], oIdx = outputOffsets[0]; - iIdx < inputOffsets[0] + dataLen; iIdx++, oIdx++) { - output[oIdx] = inputs[0][iIdx]; + for (iIdx = encodingState.inputOffsets[0], + oIdx = encodingState.outputOffsets[0]; + iIdx < encodingState.inputOffsets[0] + dataLen; iIdx++, oIdx++) { + output[oIdx] = encodingState.inputs[0][iIdx]; } // XOR with everything else. - for (int i = 1; i < inputs.length; i++) { - for (iIdx = inputOffsets[i], oIdx = outputOffsets[0]; - iIdx < inputOffsets[i] + dataLen; iIdx++, oIdx++) { - output[oIdx] ^= inputs[i][iIdx]; + for (int i = 1; i < encodingState.inputs.length; i++) { + for (iIdx = encodingState.inputOffsets[i], + oIdx = encodingState.outputOffsets[0]; + iIdx < encodingState.inputOffsets[i] + dataLen; iIdx++, oIdx++) { + output[oIdx] ^= encodingState.inputs[i][iIdx]; } } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XORRawErasureCoderFactory.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XORRawErasureCoderFactory.java index 312369034d..571fe12245 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XORRawErasureCoderFactory.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XORRawErasureCoderFactory.java @@ -18,6 +18,7 @@ package org.apache.hadoop.io.erasurecode.rawcoder; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.io.erasurecode.ErasureCoderOptions; /** * A raw coder factory for raw XOR coder. @@ -26,12 +27,12 @@ import org.apache.hadoop.classification.InterfaceAudience; public class XORRawErasureCoderFactory implements RawErasureCoderFactory { @Override - public RawErasureEncoder createEncoder(int numDataUnits, int numParityUnits) { - return new XORRawEncoder(numDataUnits, numParityUnits); + public RawErasureEncoder createEncoder(ErasureCoderOptions coderOptions) { + return new XORRawEncoder(coderOptions); } @Override - public RawErasureDecoder createDecoder(int numDataUnits, int numParityUnits) { - return new XORRawDecoder(numDataUnits, numParityUnits); + public RawErasureDecoder createDecoder(ErasureCoderOptions coderOptions) { + return new XORRawDecoder(coderOptions); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/CoderOption.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/package-info.java similarity index 53% rename from hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/CoderOption.java rename to hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/package-info.java index e4d97ca394..034cdf2e70 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/CoderOption.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/package-info.java @@ -15,29 +15,24 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.io.erasurecode.rawcoder; /** - * Supported erasure coder options. + * + * Raw erasure coders. + * + * Raw erasure coder is part of erasure codec framework, where erasure coder is + * used to encode/decode a group of blocks (BlockGroup) according to the codec + * specific BlockGroup layout and logic. 
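
[Editorial example, not part of the patch.] The arithmetic the XOR raw coder performs is unchanged by this refactoring: the single parity unit is the byte-wise XOR of all data units, and one erased unit is rebuilt by XOR-ing the parity with the surviving units. A small self-contained illustration in plain Java, independent of the coder classes above:

    // Standalone illustration of the XOR scheme: parity = d0 ^ d1 ^ ... and
    // any single erased unit is recovered from the parity plus the rest.
    public final class XorSchemeDemo {
      public static void main(String[] args) {
        byte[][] data = {
            {1, 2, 3, 4},
            {5, 6, 7, 8},
            {9, 10, 11, 12}
        };

        // Encode: compute the single parity unit.
        byte[] parity = new byte[4];
        for (byte[] unit : data) {
          for (int i = 0; i < parity.length; i++) {
            parity[i] ^= unit[i];
          }
        }

        // Decode: pretend data[1] was erased and rebuild it from the rest,
        // skipping the erased location the way erasedIndexes[0] marks it.
        byte[] recovered = parity.clone();
        for (int u = 0; u < data.length; u++) {
          if (u == 1) {
            continue;
          }
          for (int i = 0; i < recovered.length; i++) {
            recovered[i] ^= data[u][i];
          }
        }
        System.out.println(java.util.Arrays.equals(recovered, data[1])); // true
      }
    }
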
An erasure coder extracts chunks of + * data from the blocks and can employ various low level raw erasure coders to + * perform encoding/decoding against the chunks. + * + * To distinguish from erasure coder, here raw erasure coder is used to mean the + * low level constructs, since it only takes care of the math calculation with + * a group of byte buffers. */ -public enum CoderOption { - /* If direct buffer is preferred, for perf consideration */ - PREFER_DIRECT_BUFFER(true), // READ-ONLY - /** - * Allow changing input buffer content (not positions). - * Maybe better perf if allowed - */ - ALLOW_CHANGE_INPUTS(false), // READ-WRITE - /* Allow dump verbose debug info or not */ - ALLOW_VERBOSE_DUMP(false); // READ-WRITE +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.io.erasurecode.rawcoder; - private boolean isReadOnly = false; - - CoderOption(boolean isReadOnly) { - this.isReadOnly = isReadOnly; - } - - public boolean isReadOnly() { - return isReadOnly; - } -}; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/util/CoderUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/util/CoderUtil.java deleted file mode 100644 index 07d15bed9d..0000000000 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/util/CoderUtil.java +++ /dev/null @@ -1,83 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.io.erasurecode.rawcoder.util; - -import org.apache.hadoop.HadoopIllegalArgumentException; -import org.apache.hadoop.classification.InterfaceAudience; - -import java.util.Arrays; - -/** - * Helpful utilities for implementing some raw erasure coders. - */ -@InterfaceAudience.Private -public final class CoderUtil { - - private CoderUtil() { - // No called - } - - - /** - * Get indexes into inputs array for items marked as null, either erased or - * not to read. - * @return indexes into inputs array - */ - public static int[] getErasedOrNotToReadIndexes(T[] inputs) { - int[] invalidIndexes = new int[inputs.length]; - int idx = 0; - for (int i = 0; i < inputs.length; i++) { - if (inputs[i] == null) { - invalidIndexes[idx++] = i; - } - } - - return Arrays.copyOf(invalidIndexes, idx); - } - - /** - * Find the valid input from all the inputs. 
- * @param inputs input buffers to look for valid input - * @return the first valid input - */ - public static T findFirstValidInput(T[] inputs) { - for (T input : inputs) { - if (input != null) { - return input; - } - } - - throw new HadoopIllegalArgumentException( - "Invalid inputs are found, all being null"); - } - - /** - * Picking up indexes of valid inputs. - * @param inputs actually decoding input buffers - * @param validIndexes an array to be filled and returned - * @param - */ - public static void makeValidIndexes(T[] inputs, int[] validIndexes) { - int idx = 0; - for (int i = 0; i < inputs.length && idx < validIndexes.length; i++) { - if (inputs[i] != null) { - validIndexes[idx++] = i; - } - } - } -} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/util/GaloisField.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/util/GaloisField.java index fdb47be9c9..96a6408823 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/util/GaloisField.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/util/GaloisField.java @@ -17,12 +17,12 @@ */ package org.apache.hadoop.io.erasurecode.rawcoder.util; +import org.apache.hadoop.classification.InterfaceAudience; + import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; -import org.apache.hadoop.classification.InterfaceAudience; - /** * Implementation of Galois field arithmetic with 2^p elements. The input must * be unsigned integers. It's ported from HDFS-RAID, slightly adapted. diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCodecRawCoderMapping.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCodecRawCoderMapping.java index 5075966221..81dc45817c 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCodecRawCoderMapping.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCodecRawCoderMapping.java @@ -46,37 +46,42 @@ public class TestCodecRawCoderMapping { @Test public void testRSDefaultRawCoder() { + ErasureCoderOptions coderOptions = new ErasureCoderOptions( + numDataUnit, numParityUnit); // should return default raw coder of rs-default codec - RawErasureEncoder encoder = CodecUtil.createRSRawEncoder( - conf, numDataUnit, numParityUnit); + RawErasureEncoder encoder = CodecUtil.createRawEncoder( + conf, ErasureCodeConstants.RS_DEFAULT_CODEC_NAME, coderOptions); Assert.assertTrue(encoder instanceof RSRawEncoder); - RawErasureDecoder decoder = CodecUtil.createRSRawDecoder( - conf, numDataUnit, numParityUnit); + RawErasureDecoder decoder = CodecUtil.createRawDecoder( + conf, ErasureCodeConstants.RS_DEFAULT_CODEC_NAME, coderOptions); Assert.assertTrue(decoder instanceof RSRawDecoder); // should return default raw coder of rs-legacy codec - encoder = CodecUtil.createRSRawEncoder(conf, numDataUnit, numParityUnit, - ErasureCodeConstants.RS_LEGACY_CODEC_NAME); + encoder = CodecUtil.createRawEncoder(conf, + ErasureCodeConstants.RS_LEGACY_CODEC_NAME, coderOptions); Assert.assertTrue(encoder instanceof RSRawEncoderLegacy); - decoder = CodecUtil.createRSRawDecoder(conf, numDataUnit, numParityUnit, - ErasureCodeConstants.RS_LEGACY_CODEC_NAME); + decoder = CodecUtil.createRawDecoder(conf, + ErasureCodeConstants.RS_LEGACY_CODEC_NAME, coderOptions); 
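
[Editorial example, not part of the patch.] As the test above shows, callers now build an ErasureCoderOptions once and pass it together with the codec name to the unified factory methods. A reduced sketch of that call pattern outside the test harness; the 6+3 schema and the fresh Configuration are placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.erasurecode.CodecUtil;
    import org.apache.hadoop.io.erasurecode.ErasureCodeConstants;
    import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
    import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
    import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;

    public final class RawCoderCreationExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Geometry lives in one object now instead of being threaded through
        // as separate (numDataUnits, numParityUnits) arguments.
        ErasureCoderOptions options = new ErasureCoderOptions(6, 3);

        RawErasureEncoder encoder = CodecUtil.createRawEncoder(
            conf, ErasureCodeConstants.RS_DEFAULT_CODEC_NAME, options);
        RawErasureDecoder decoder = CodecUtil.createRawDecoder(
            conf, ErasureCodeConstants.RS_DEFAULT_CODEC_NAME, options);

        System.out.println(encoder.getNumDataUnits() + " data + "
            + encoder.getNumParityUnits() + " parity units, decoder "
            + decoder.getClass().getSimpleName());
      }
    }
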
Assert.assertTrue(decoder instanceof RSRawDecoderLegacy); } @Test public void testDedicatedRawCoderKey() { + ErasureCoderOptions coderOptions = new ErasureCoderOptions( + numDataUnit, numParityUnit); + String dummyFactName = "DummyNoneExistingFactory"; // set the dummy factory to rs-legacy and create a raw coder // with rs-default, which is OK as the raw coder key is not used conf.set(CommonConfigurationKeys. IO_ERASURECODE_CODEC_RS_LEGACY_RAWCODER_KEY, dummyFactName); - RawErasureEncoder encoder = CodecUtil.createRSRawEncoder(conf, numDataUnit, - numParityUnit, ErasureCodeConstants.RS_DEFAULT_CODEC_NAME); + RawErasureEncoder encoder = CodecUtil.createRawEncoder(conf, + ErasureCodeConstants.RS_DEFAULT_CODEC_NAME, coderOptions); Assert.assertTrue(encoder instanceof RSRawEncoder); // now create the raw coder with rs-legacy, which should throw exception try { - CodecUtil.createRSRawEncoder(conf, numDataUnit, numParityUnit, - ErasureCodeConstants.RS_LEGACY_CODEC_NAME); + CodecUtil.createRawEncoder(conf, + ErasureCodeConstants.RS_LEGACY_CODEC_NAME, coderOptions); Assert.fail(); } catch (Exception e) { GenericTestUtils.assertExceptionContains("Failed to create raw coder", e); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCoderBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCoderBase.java index 633e064176..6d14de8a52 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCoderBase.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCoderBase.java @@ -35,7 +35,7 @@ import static org.junit.Assert.assertTrue; public abstract class TestCoderBase { protected static Random RAND = new Random(); - private boolean allowDump = true; + protected boolean allowDump = true; private Configuration conf; protected int numDataUnits; @@ -90,13 +90,8 @@ public abstract class TestCoderBase { } } - /** - * Set true during setup if want to dump test settings and coding data, - * useful in debugging. - * @param allowDump - */ - protected void setAllowDump(boolean allowDump) { - this.allowDump = allowDump; + protected boolean isAllowDump() { + return allowDump; } /** @@ -502,7 +497,8 @@ public abstract class TestCoderBase { sb.append(" erasedParityIndexes="). 
append(Arrays.toString(erasedParityIndexes)); sb.append(" usingDirectBuffer=").append(usingDirectBuffer); - sb.append(" isAllowingChangeInputs=").append(allowChangeInputs); + sb.append(" allowChangeInputs=").append(allowChangeInputs); + sb.append(" allowVerboseDump=").append(allowDump); sb.append("\n"); System.out.println(sb.toString()); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestDummyRawCoder.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestDummyRawCoder.java index 63a2ac873f..5be9b4e9fe 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestDummyRawCoder.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestDummyRawCoder.java @@ -50,7 +50,7 @@ public class TestDummyRawCoder extends TestRawCoderBase { @Override protected void testCoding(boolean usingDirectBuffer) { this.usingDirectBuffer = usingDirectBuffer; - prepareCoders(); + prepareCoders(true); prepareBufferAllocator(true); setAllowChangeInputs(false); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java index cf77539325..32f0e00c8d 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java @@ -18,6 +18,7 @@ package org.apache.hadoop.io.erasurecode.rawcoder; import org.apache.hadoop.io.erasurecode.ECChunk; +import org.apache.hadoop.io.erasurecode.ErasureCoderOptions; import org.apache.hadoop.io.erasurecode.TestCoderBase; import org.junit.Assert; import org.junit.Test; @@ -62,7 +63,7 @@ public abstract class TestRawCoderBase extends TestCoderBase { */ protected void testCoding(boolean usingDirectBuffer) { this.usingDirectBuffer = usingDirectBuffer; - prepareCoders(); + prepareCoders(true); /** * The following runs will use 3 different chunkSize for inputs and outputs, @@ -79,7 +80,7 @@ public abstract class TestRawCoderBase extends TestCoderBase { */ protected void testCodingWithBadInput(boolean usingDirectBuffer) { this.usingDirectBuffer = usingDirectBuffer; - prepareCoders(); + prepareCoders(true); try { performTestCoding(baseChunkSize, false, true, false, true); @@ -95,7 +96,7 @@ public abstract class TestRawCoderBase extends TestCoderBase { */ protected void testCodingWithBadOutput(boolean usingDirectBuffer) { this.usingDirectBuffer = usingDirectBuffer; - prepareCoders(); + prepareCoders(true); try { performTestCoding(baseChunkSize, false, false, true, true); @@ -189,16 +190,23 @@ public abstract class TestRawCoderBase extends TestCoderBase { protected void setAllowChangeInputs(boolean allowChangeInputs) { this.allowChangeInputs = allowChangeInputs; - encoder.setCoderOption(CoderOption.ALLOW_CHANGE_INPUTS, allowChangeInputs); - decoder.setCoderOption(CoderOption.ALLOW_CHANGE_INPUTS, allowChangeInputs); } - protected void prepareCoders() { - if (encoder == null) { + /** + * Set true during setup if want to dump test settings and coding data, + * useful in debugging. 
+ * @param allowDump + */ + protected void setAllowDump(boolean allowDump) { + this.allowDump = allowDump; + } + + protected void prepareCoders(boolean recreate) { + if (encoder == null || recreate) { encoder = createEncoder(); } - if (decoder == null) { + if (decoder == null || recreate) { decoder = createDecoder(); } } @@ -222,18 +230,16 @@ public abstract class TestRawCoderBase extends TestCoderBase { * @return */ protected RawErasureEncoder createEncoder() { - RawErasureEncoder encoder; + ErasureCoderOptions coderConf = + new ErasureCoderOptions(numDataUnits, numParityUnits, + allowChangeInputs, allowDump); try { Constructor constructor = - (Constructor) - encoderClass.getConstructor(int.class, int.class); - encoder = constructor.newInstance(numDataUnits, numParityUnits); + encoderClass.getConstructor(ErasureCoderOptions.class); + return constructor.newInstance(coderConf); } catch (Exception e) { throw new RuntimeException("Failed to create encoder", e); } - - encoder.setConf(getConf()); - return encoder; } /** @@ -241,18 +247,16 @@ public abstract class TestRawCoderBase extends TestCoderBase { * @return */ protected RawErasureDecoder createDecoder() { - RawErasureDecoder decoder; + ErasureCoderOptions coderConf = + new ErasureCoderOptions(numDataUnits, numParityUnits, + allowChangeInputs, allowDump); try { Constructor constructor = - (Constructor) - decoderClass.getConstructor(int.class, int.class); - decoder = constructor.newInstance(numDataUnits, numParityUnits); + decoderClass.getConstructor(ErasureCoderOptions.class); + return constructor.newInstance(coderConf); } catch (Exception e) { throw new RuntimeException("Failed to create decoder", e); } - - decoder.setConf(getConf()); - return decoder; } /** @@ -261,7 +265,7 @@ public abstract class TestRawCoderBase extends TestCoderBase { */ protected void testInputPosition(boolean usingDirectBuffer) { this.usingDirectBuffer = usingDirectBuffer; - prepareCoders(); + prepareCoders(true); prepareBufferAllocator(false); // verify encode diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java index 1944782a15..1bdbc32dd6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java @@ -38,6 +38,7 @@ import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResu import org.apache.hadoop.io.erasurecode.CodecUtil; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.apache.hadoop.io.erasurecode.ErasureCoderOptions; import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder; import org.apache.hadoop.util.DirectBufferPool; @@ -184,8 +185,10 @@ public class DFSStripedInputStream extends DFSInputStream { curStripeRange = new StripeRange(0, 0); readingService = new ExecutorCompletionService<>(dfsClient.getStripedReadsThreadPool()); - decoder = CodecUtil.createRSRawDecoder(dfsClient.getConfiguration(), - dataBlkNum, parityBlkNum, ecPolicy.getCodecName()); + ErasureCoderOptions coderOptions = new ErasureCoderOptions( + dataBlkNum, parityBlkNum); + decoder = CodecUtil.createRawDecoder(dfsClient.getConfiguration(), + ecPolicy.getCodecName(), coderOptions); if (DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("Creating an striped input stream for file " + src); } diff --git 
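
[Editorial example, not part of the patch.] The test base above no longer looks up an (int, int) constructor; it resolves the single ErasureCoderOptions constructor and instantiates the coder directly, with no setConf() or setCoderOption() follow-up. A trimmed sketch of that reflective pattern; XORRawEncoder and the 2+1 schema stand in for whichever coder class a test configures:

    import java.lang.reflect.Constructor;

    import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
    import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
    import org.apache.hadoop.io.erasurecode.rawcoder.XORRawEncoder;

    public final class ReflectiveCoderCreation {
      public static void main(String[] args) {
        // Same argument order as the test helper: data units, parity units,
        // allowChangeInputs, allowVerboseDump.
        ErasureCoderOptions options =
            new ErasureCoderOptions(2, 1, false, false);
        Class<? extends RawErasureEncoder> encoderClass = XORRawEncoder.class;
        try {
          Constructor<? extends RawErasureEncoder> constructor =
              encoderClass.getConstructor(ErasureCoderOptions.class);
          RawErasureEncoder encoder = constructor.newInstance(options);
          System.out.println("Created " + encoder.getClass().getSimpleName());
        } catch (Exception e) {
          throw new RuntimeException("Failed to create encoder", e);
        }
      }
    }
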
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java index 403e50fa94..85dc74925e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java @@ -59,6 +59,7 @@ import org.apache.hadoop.hdfs.util.StripedBlockUtil; import org.apache.hadoop.io.MultipleIOException; import org.apache.hadoop.io.erasurecode.CodecUtil; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.apache.hadoop.io.erasurecode.ErasureCoderOptions; import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.Progressable; @@ -286,8 +287,10 @@ public class DFSStripedOutputStream extends DFSOutputStream { flushAllExecutorCompletionService = new ExecutorCompletionService<>(flushAllExecutor); - encoder = CodecUtil.createRSRawEncoder(dfsClient.getConfiguration(), - numDataBlocks, numParityBlocks, ecPolicy.getCodecName()); + ErasureCoderOptions coderOptions = new ErasureCoderOptions( + numDataBlocks, numParityBlocks); + encoder = CodecUtil.createRawEncoder(dfsClient.getConfiguration(), + ecPolicy.getCodecName(), coderOptions); coordinator = new Coordinator(numAllBlocks); try { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java index c80bf96143..47a69794a1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo; import org.apache.hadoop.hdfs.util.StripedBlockUtil; import org.apache.hadoop.io.erasurecode.CodecUtil; +import org.apache.hadoop.io.erasurecode.ErasureCoderOptions; import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.DataChecksum; @@ -215,8 +216,10 @@ class StripedReconstructor implements Runnable { // Initialize decoder private void initDecoderIfNecessary() { if (decoder == null) { - decoder = CodecUtil.createRSRawDecoder(conf, ecPolicy.getNumDataUnits(), - ecPolicy.getNumParityUnits(), ecPolicy.getCodecName()); + ErasureCoderOptions coderOptions = new ErasureCoderOptions( + ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits()); + decoder = CodecUtil.createRawDecoder(conf, ecPolicy.getCodecName(), + coderOptions); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java index 6dcccc3cdc..4c3afdd7a5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.util.StripedBlockUtil; import 
org.apache.hadoop.hdfs.web.WebHdfsFileSystem.WebHdfsInputStream; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.erasurecode.CodecUtil; +import org.apache.hadoop.io.erasurecode.ErasureCoderOptions; import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder; import org.junit.Assert; @@ -491,9 +492,12 @@ public class StripedFileTestUtil { System.arraycopy(tmp, 0, dataBytes[i], 0, tmp.length); } } + + ErasureCoderOptions coderOptions = new ErasureCoderOptions( + dataBytes.length, parityBytes.length); final RawErasureEncoder encoder = - CodecUtil.createRSRawEncoder(conf, dataBytes.length, parityBytes.length, - TEST_EC_POLICY.getCodecName()); + CodecUtil.createRawEncoder(conf, TEST_EC_POLICY.getCodecName(), + coderOptions); encoder.encode(dataBytes, expectedParityBytes); for (int i = 0; i < parityBytes.length; i++) { if (checkSet.contains(i + dataBytes.length)){ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java index e4f7ac049c..a02a8d699c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java @@ -20,30 +20,25 @@ package org.apache.hadoop.hdfs; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT; -import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager; import org.apache.hadoop.hdfs.util.StripedBlockUtil; import org.apache.hadoop.io.erasurecode.CodecUtil; -import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.apache.hadoop.io.erasurecode.ErasureCoderOptions; import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder; import org.junit.After; import org.junit.Assert; import org.junit.Before; -import org.junit.Test; import org.junit.Rule; +import org.junit.Test; import org.junit.rules.Timeout; import java.io.IOException; @@ -51,6 +46,12 @@ import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + public class TestDFSStripedInputStream { public static final Log LOG = LogFactory.getLog(TestDFSStripedInputStream.class); @@ -217,8 +218,10 @@ public class TestDFSStripedInputStream { } } - 
RawErasureDecoder rawDecoder = CodecUtil.createRSRawDecoder(conf, - DATA_BLK_NUM, PARITY_BLK_NUM, ecPolicy.getCodecName()); + ErasureCoderOptions coderOptions = new ErasureCoderOptions( + DATA_BLK_NUM, PARITY_BLK_NUM); + RawErasureDecoder rawDecoder = CodecUtil.createRawDecoder(conf, + ecPolicy.getCodecName(), coderOptions); // Update the expected content for decoded data int[] missingBlkIdx = new int[PARITY_BLK_NUM];
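
[Editorial example, not part of the patch.] Taken together, the HDFS-side changes in this patch follow one pattern: derive the coder geometry from the erasure coding policy, then let the policy's codec name select the raw coder implementation. A condensed sketch of that pattern; the helper class and method names are hypothetical, and conf and ecPolicy are assumed to be supplied by the enclosing stream, reconstructor, or test as in the changed call sites.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
    import org.apache.hadoop.io.erasurecode.CodecUtil;
    import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
    import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
    import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;

    // Mirrors the call sites changed in DFSStripedInputStream,
    // DFSStripedOutputStream and StripedReconstructor.
    final class StripedCoderSetup {

      static RawErasureEncoder newEncoder(Configuration conf,
          ErasureCodingPolicy ecPolicy) {
        ErasureCoderOptions coderOptions = new ErasureCoderOptions(
            ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits());
        return CodecUtil.createRawEncoder(conf, ecPolicy.getCodecName(),
            coderOptions);
      }

      static RawErasureDecoder newDecoder(Configuration conf,
          ErasureCodingPolicy ecPolicy) {
        ErasureCoderOptions coderOptions = new ErasureCoderOptions(
            ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits());
        return CodecUtil.createRawDecoder(conf, ecPolicy.getCodecName(),
            coderOptions);
      }
    }
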