diff --git a/hadoop-common-project/hadoop-common/CHANGES-HDFS-EC-7285.txt b/hadoop-common-project/hadoop-common/CHANGES-HDFS-EC-7285.txt index 505eabdbcd..9ccd3a7d34 100644 --- a/hadoop-common-project/hadoop-common/CHANGES-HDFS-EC-7285.txt +++ b/hadoop-common-project/hadoop-common/CHANGES-HDFS-EC-7285.txt @@ -66,4 +66,6 @@ HADOOP-12011. Allow to dump verbose information to ease debugging in raw erasure coders (Kai Zheng) - HADOOP-12065. Using more meaningful keys in EC schema. (Kai Zheng) \ No newline at end of file + HADOOP-12065. Using more meaningful keys in EC schema. (Kai Zheng) + + HDFS-8557. Allow to configure RS and XOR raw coders (Kai Zheng) \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java index 3f2871b023..9588254ed5 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java @@ -143,10 +143,14 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic { /** Supported erasure codec classes */ public static final String IO_ERASURECODE_CODECS_KEY = "io.erasurecode.codecs"; - /** Raw coder factory for the RS codec */ + /** Raw coder factory for the RS codec. */ public static final String IO_ERASURECODE_CODEC_RS_RAWCODER_KEY = "io.erasurecode.codec.rs.rawcoder"; + /** Raw coder factory for the XOR codec. */ + public static final String IO_ERASURECODE_CODEC_XOR_RAWCODER_KEY = + "io.erasurecode.codec.xor.rawcoder"; + /** * Service Authorization */
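The new XOR key mirrors the existing RS key: each names a RawErasureCoderFactory implementation that supplies the raw encoder/decoder pair for that codec, with the built-in coders used when the key is unset. A minimal wiring sketch (MyXORRawCoderFactory is a hypothetical plugin class, not part of this patch):

    Configuration conf = new Configuration();
    // Point the new key at a custom factory; the built-in coders remain the default.
    conf.setClass(CommonConfigurationKeys.IO_ERASURECODE_CODEC_XOR_RAWCODER_KEY,
        MyXORRawCoderFactory.class, RawErasureCoderFactory.class);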
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/CodecUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/CodecUtil.java new file mode 100644 index 0000000000..5d226248ef --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/CodecUtil.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.erasurecode; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.io.erasurecode.rawcoder.*; + +/** + * A codec utility. + */ +public final class CodecUtil { + + private CodecUtil() {} + + /** + * Create RS raw encoder according to configuration. + * @param conf configuration to read the raw coder factory from + * @param numDataUnits number of data units + * @param numParityUnits number of parity units + * @return raw encoder + */ + public static RawErasureEncoder createRSRawEncoder( + Configuration conf, int numDataUnits, int numParityUnits) { + RawErasureCoder rawCoder = createRawCoder(conf, + CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_RAWCODER_KEY, + true, numDataUnits, numParityUnits); + if (rawCoder == null) { + rawCoder = new RSRawEncoder(numDataUnits, numParityUnits); + } + + return (RawErasureEncoder) rawCoder; + } + + /** + * Create RS raw decoder according to configuration. + * @param conf configuration to read the raw coder factory from + * @param numDataUnits number of data units + * @param numParityUnits number of parity units + * @return raw decoder + */ + public static RawErasureDecoder createRSRawDecoder( + Configuration conf, int numDataUnits, int numParityUnits) { + RawErasureCoder rawCoder = createRawCoder(conf, + CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_RAWCODER_KEY, + false, numDataUnits, numParityUnits); + if (rawCoder == null) { + rawCoder = new RSRawDecoder(numDataUnits, numParityUnits); + } + + return (RawErasureDecoder) rawCoder; + } + + /** + * Create XOR raw encoder according to configuration. + * @param conf configuration to read the raw coder factory from + * @param numDataUnits number of data units + * @param numParityUnits number of parity units + * @return raw encoder + */ + public static RawErasureEncoder createXORRawEncoder( + Configuration conf, int numDataUnits, int numParityUnits) { + RawErasureCoder rawCoder = createRawCoder(conf, + CommonConfigurationKeys.IO_ERASURECODE_CODEC_XOR_RAWCODER_KEY, + true, numDataUnits, numParityUnits); + if (rawCoder == null) { + rawCoder = new XORRawEncoder(numDataUnits, numParityUnits); + } + + return (RawErasureEncoder) rawCoder; + } + + /** + * Create XOR raw decoder according to configuration. + * @param conf configuration to read the raw coder factory from + * @param numDataUnits number of data units + * @param numParityUnits number of parity units + * @return raw decoder + */ + public static RawErasureDecoder createXORRawDecoder( + Configuration conf, int numDataUnits, int numParityUnits) { + RawErasureCoder rawCoder = createRawCoder(conf, + CommonConfigurationKeys.IO_ERASURECODE_CODEC_XOR_RAWCODER_KEY, + false, numDataUnits, numParityUnits); + if (rawCoder == null) { + rawCoder = new XORRawDecoder(numDataUnits, numParityUnits); + } + + return (RawErasureDecoder) rawCoder; + } + + /** + * Create raw coder using specified conf and raw coder factory key. + * @param conf configuration to read the raw coder factory from + * @param rawCoderFactoryKey configuration key naming the raw coder factory + * @param isEncoder true to create an encoder, false to create a decoder + * @param numDataUnits number of data units + * @param numParityUnits number of parity units + * @return raw coder, or null if conf is null or no factory is configured + */ + public static RawErasureCoder createRawCoder(Configuration conf, + String rawCoderFactoryKey, boolean isEncoder, int numDataUnits, + int numParityUnits) { + + if (conf == null) { + return null; + } + + Class factClass = null; + factClass = conf.getClass(rawCoderFactoryKey, + factClass, RawErasureCoderFactory.class); + + if (factClass == null) { + return null; + } + + RawErasureCoderFactory fact; + try { + fact = factClass.newInstance(); + } catch (InstantiationException e) { + throw new RuntimeException("Failed to create raw coder", e); + } catch (IllegalAccessException e) { + throw new RuntimeException("Failed to create raw coder", e); + } + + return isEncoder ? + fact.createEncoder(numDataUnits, numParityUnits) : + fact.createDecoder(numDataUnits, numParityUnits); + } +}
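With the factory lookup and the built-in fallback in one place, callers obtain coders without naming a concrete class. A usage sketch (the 6 data + 3 parity layout is an illustrative choice, not mandated by the patch):

    Configuration conf = new Configuration();
    // No rawcoder key configured: these calls fall back to the built-in
    // RSRawEncoder and RSRawDecoder.
    RawErasureEncoder encoder = CodecUtil.createRSRawEncoder(conf, 6, 3);
    RawErasureDecoder decoder = CodecUtil.createRSRawDecoder(conf, 6, 3);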
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/AbstractErasureCoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/AbstractErasureCoder.java index c572badef2..5cd0ee8c18 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/AbstractErasureCoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/AbstractErasureCoder.java @@ -17,13 +17,8 @@ */ package org.apache.hadoop.io.erasurecode.coder; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.io.erasurecode.ECSchema; -import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureCoder; -import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureCoderFactory; -import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder; -import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder; /** * A common class of basic facilities to be shared by encoder and decoder @@ -36,73 +31,13 @@ public abstract class AbstractErasureCoder private final int numDataUnits; private final int numParityUnits; - /** - * Create raw decoder using the factory specified by rawCoderFactoryKey - * @param rawCoderFactoryKey - * @return raw decoder - */ - protected RawErasureDecoder createRawDecoder( - String rawCoderFactoryKey, int dataUnitsCount, int parityUnitsCount) { - RawErasureCoder rawCoder = createRawCoder(getConf(), - rawCoderFactoryKey, false, dataUnitsCount, parityUnitsCount); - return (RawErasureDecoder) rawCoder; - } - - /** - * Create raw encoder using the factory specified by rawCoderFactoryKey - * @param rawCoderFactoryKey - * @return raw encoder - */ - protected RawErasureEncoder createRawEncoder( - String rawCoderFactoryKey, int dataUnitsCount, int parityUnitsCount) { - RawErasureCoder rawCoder = createRawCoder(getConf(), - rawCoderFactoryKey, true, dataUnitsCount, parityUnitsCount); - return (RawErasureEncoder) rawCoder; - } - - /** - * Create raw coder using specified conf and raw coder factory key. - * @param conf - * @param rawCoderFactoryKey - * @param isEncoder - * @return raw coder - */ - public static RawErasureCoder createRawCoder(Configuration conf, - String rawCoderFactoryKey, boolean isEncoder, int numDataUnits, - int numParityUnits) { - - if (conf == null) { - return null; - } - - Class factClass = null; - factClass = conf.getClass(rawCoderFactoryKey, - factClass, RawErasureCoderFactory.class); - - if (factClass == null) { - return null; - } - - RawErasureCoderFactory fact; - try { - fact = factClass.newInstance(); - } catch (InstantiationException e) { - throw new RuntimeException("Failed to create raw coder", e); - } catch (IllegalAccessException e) { - throw new RuntimeException("Failed to create raw coder", e); - } - - return isEncoder ?
- fact.createEncoder(numDataUnits, numParityUnits) : - fact.createDecoder(numDataUnits, numParityUnits); - } - public AbstractErasureCoder(int numDataUnits, int numParityUnits) { this.numDataUnits = numDataUnits; this.numParityUnits = numParityUnits; } public AbstractErasureCoder(ECSchema schema) { - this(schema.getNumDataUnits(), schema.getNumParityUnits()); + this(schema.getNumDataUnits(), schema.getNumParityUnits()); } @Override diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureDecoder.java index 57f4373ac1..f56674d32c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureDecoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureDecoder.java @@ -17,11 +17,10 @@ */ package org.apache.hadoop.io.erasurecode.coder; -import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.io.erasurecode.CodecUtil; import org.apache.hadoop.io.erasurecode.ECBlock; import org.apache.hadoop.io.erasurecode.ECBlockGroup; import org.apache.hadoop.io.erasurecode.ECSchema; -import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder; import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder; /** @@ -53,12 +52,8 @@ protected ErasureCodingStep prepareDecodingStep(final ECBlockGroup blockGroup) { private RawErasureDecoder checkCreateRSRawDecoder() { if (rsRawDecoder == null) { - rsRawDecoder = createRawDecoder( - CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_RAWCODER_KEY, - getNumDataUnits(), getNumParityUnits()); - if (rsRawDecoder == null) { - rsRawDecoder = new RSRawDecoder(getNumDataUnits(), getNumParityUnits()); - } + rsRawDecoder = CodecUtil.createRSRawDecoder(getConf(), + getNumDataUnits(), getNumParityUnits()); } return rsRawDecoder; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureEncoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureEncoder.java index ab23474f54..3ed3e2091d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureEncoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureEncoder.java @@ -17,11 +17,10 @@ */ package org.apache.hadoop.io.erasurecode.coder; -import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.io.erasurecode.CodecUtil; import org.apache.hadoop.io.erasurecode.ECBlock; import org.apache.hadoop.io.erasurecode.ECBlockGroup; import org.apache.hadoop.io.erasurecode.ECSchema; -import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder; import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder; /** @@ -53,12 +52,8 @@ protected ErasureCodingStep prepareEncodingStep(final ECBlockGroup blockGroup) { private RawErasureEncoder checkCreateRSRawEncoder() { if (rawEncoder == null) { - rawEncoder = createRawEncoder( - CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_RAWCODER_KEY, + rawEncoder = CodecUtil.createRSRawEncoder(getConf(), getNumDataUnits(), getNumParityUnits()); - if (rawEncoder == null) { - rawEncoder = new RSRawEncoder(getNumDataUnits(), getNumParityUnits()); - } } return rawEncoder; }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureDecoder.java index 3fe8d1bf82..a84741824e 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureDecoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureDecoder.java @@ -17,11 +17,11 @@ */ package org.apache.hadoop.io.erasurecode.coder; +import org.apache.hadoop.io.erasurecode.CodecUtil; import org.apache.hadoop.io.erasurecode.ECBlock; import org.apache.hadoop.io.erasurecode.ECBlockGroup; import org.apache.hadoop.io.erasurecode.ECSchema; import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder; -import org.apache.hadoop.io.erasurecode.rawcoder.XORRawDecoder; /** * Xor erasure decoder that decodes a block group. @@ -39,10 +39,10 @@ public XORErasureDecoder(ECSchema schema) { } @Override - protected ErasureCodingStep prepareDecodingStep(final ECBlockGroup blockGroup) { - // May be configured - RawErasureDecoder rawDecoder = new XORRawDecoder( - getNumDataUnits(), getNumParityUnits()); + protected ErasureCodingStep prepareDecodingStep( + final ECBlockGroup blockGroup) { + RawErasureDecoder rawDecoder = CodecUtil.createXORRawDecoder(getConf(), + getNumDataUnits(), getNumParityUnits()); ECBlock[] inputBlocks = getInputBlocks(blockGroup); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureEncoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureEncoder.java index 5020896b66..5c4bcddb51 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureEncoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureEncoder.java @@ -17,11 +17,11 @@ */ package org.apache.hadoop.io.erasurecode.coder; +import org.apache.hadoop.io.erasurecode.CodecUtil; import org.apache.hadoop.io.erasurecode.ECBlock; import org.apache.hadoop.io.erasurecode.ECBlockGroup; import org.apache.hadoop.io.erasurecode.ECSchema; import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder; -import org.apache.hadoop.io.erasurecode.rawcoder.XORRawEncoder; /** * Xor erasure encoder that encodes a block group. @@ -39,10 +39,10 @@ public XORErasureEncoder(ECSchema schema) { } @Override - protected ErasureCodingStep prepareEncodingStep(final ECBlockGroup blockGroup) { - // May be configured - RawErasureEncoder rawEncoder = new XORRawEncoder( - getNumDataUnits(), getNumParityUnits()); + protected ErasureCodingStep prepareEncodingStep( + final ECBlockGroup blockGroup) { + RawErasureEncoder rawEncoder = CodecUtil.createXORRawEncoder(getConf(), + getNumDataUnits(), getNumParityUnits()); ECBlock[] inputBlocks = getInputBlocks(blockGroup);
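Both XOR code paths now honor the new configuration key, so a pluggable XOR implementation only has to supply a factory. A minimal sketch of the expected shape (the class name is hypothetical; the interface and the built-in coders come from org.apache.hadoop.io.erasurecode.rawcoder, and the createEncoder/createDecoder signatures match the calls in CodecUtil above):

    import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureCoderFactory;
    import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
    import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
    import org.apache.hadoop.io.erasurecode.rawcoder.XORRawDecoder;
    import org.apache.hadoop.io.erasurecode.rawcoder.XORRawEncoder;

    public class MyXORRawCoderFactory implements RawErasureCoderFactory {
      @Override
      public RawErasureEncoder createEncoder(int numDataUnits, int numParityUnits) {
        // Delegates to the built-in XOR encoder; a real plugin would return its own.
        return new XORRawEncoder(numDataUnits, numParityUnits);
      }

      @Override
      public RawErasureDecoder createDecoder(int numDataUnits, int numParityUnits) {
        return new XORRawDecoder(numDataUnits, numParityUnits);
      }
    }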
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java index bf99f17d7f..a7339b7d16 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java @@ -42,6 +42,7 @@ import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk; import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult; +import org.apache.hadoop.io.erasurecode.CodecUtil; import org.apache.hadoop.io.erasurecode.ECSchema; import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder; @@ -155,7 +156,8 @@ boolean include(long pos) { curStripeRange = new StripeRange(0, 0); readingService = new ExecutorCompletionService<>(dfsClient.getStripedReadsThreadPool()); - decoder = new RSRawDecoder(dataBlkNum, parityBlkNum); + decoder = CodecUtil.createRSRawDecoder(dfsClient.getConfiguration(), + dataBlkNum, parityBlkNum); if (DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("Creating an striped input stream for file " + src); } @@ -207,8 +209,8 @@ private synchronized void blockSeekTo(long target) throws IOException { // The purpose is to get start offset into each block.
long[] offsetsForInternalBlocks = getStartOffsetsForInternalBlocks(schema, cellSize, targetBlockGroup, offsetIntoBlockGroup); - Preconditions.checkState( - offsetsForInternalBlocks.length == dataBlkNum + parityBlkNum); + Preconditions.checkState(offsetsForInternalBlocks.length == + dataBlkNum + parityBlkNum); long minOffset = offsetsForInternalBlocks[dataBlkNum]; retry = new ReaderRetryPolicy(); @@ -726,8 +728,10 @@ void prepareDecodeInputs() { void prepareParityChunk() { for (int i = dataBlkNum; i < dataBlkNum + parityBlkNum; i++) { if (alignedStripe.chunks[i] == null) { - final int decodeIndex = convertIndex4Decode(i, dataBlkNum, parityBlkNum); - alignedStripe.chunks[i] = new StripingChunk(decodeInputs[decodeIndex]); + final int decodeIndex = convertIndex4Decode(i, + dataBlkNum, parityBlkNum); + alignedStripe.chunks[i] = + new StripingChunk(decodeInputs[decodeIndex]); alignedStripe.chunks[i].addByteArraySlice(0, (int) alignedStripe.getSpanInBlock()); break; @@ -807,7 +811,8 @@ void prepareParityChunk() throws IOException { parityBlkNum); decodeInputs[decodeIndex] = ByteBuffer.allocateDirect( (int) alignedStripe.range.spanInBlock); - alignedStripe.chunks[i] = new StripingChunk(decodeInputs[decodeIndex]); + alignedStripe.chunks[i] = + new StripingChunk(decodeInputs[decodeIndex]); if (blockReaders[i] == null) { prepareParityBlockReader(i); } @@ -839,7 +844,8 @@ void decode() { // decoders to work final int span = (int) alignedStripe.getSpanInBlock(); for (int i = 0; i < alignedStripe.chunks.length; i++) { - final int decodeIndex = convertIndex4Decode(i, dataBlkNum, parityBlkNum); + final int decodeIndex = convertIndex4Decode(i, + dataBlkNum, parityBlkNum); if (alignedStripe.chunks[i] != null && alignedStripe.chunks[i].state == StripingChunk.ALLZERO) { for (int j = 0; j < span; j++) { @@ -857,7 +863,8 @@ void decode() { for (int i = 0; i < alignedStripe.chunks.length; i++) { if (alignedStripe.chunks[i] != null && alignedStripe.chunks[i].state == StripingChunk.MISSING) { - decodeIndices[pos++] = convertIndex4Decode(i, dataBlkNum, parityBlkNum); + decodeIndices[pos++] = convertIndex4Decode(i, + dataBlkNum, parityBlkNum); } } decodeIndices = Arrays.copyOf(decodeIndices, pos); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java index 0935d5c634..bdd3352179 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.io.MultipleIOException; +import org.apache.hadoop.io.erasurecode.CodecUtil; import org.apache.hadoop.io.erasurecode.ECSchema; import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder; import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder; @@ -247,13 +248,16 @@ ExtendedBlock getBlock() { numDataBlocks = schema.getNumDataUnits(); numAllBlocks = numDataBlocks + numParityBlocks; - encoder = new RSRawEncoder(numDataBlocks, numParityBlocks); + encoder = CodecUtil.createRSRawEncoder(dfsClient.getConfiguration(), + numDataBlocks, numParityBlocks); - coordinator = new Coordinator(dfsClient.getConf(), numDataBlocks, numAllBlocks); + coordinator = new Coordinator(dfsClient.getConf(), + numDataBlocks, numAllBlocks); try {
cellBuffers = new CellBuffers(numParityBlocks); } catch (InterruptedException ie) { - throw DFSUtil.toInterruptedIOException("Failed to create cell buffers", ie); + throw DFSUtil.toInterruptedIOException( + "Failed to create cell buffers", ie); } List<StripedDataStreamer> s = new ArrayList<>(numAllBlocks); @@ -318,7 +322,8 @@ private void checkStreamers() throws IOException { } } - private void handleStreamerFailure(String err, Exception e) throws IOException { + private void handleStreamerFailure(String err, + Exception e) throws IOException { LOG.warn("Failed: " + err + ", " + this, e); getCurrentStreamer().setIsFailed(true); checkStreamers(); @@ -487,7 +492,8 @@ private void writeParityCellsForLastStripe() throws IOException { return; } - final int firstCellSize = (int)(getStripedDataStreamer(0).getBytesCurBlock() % cellSize); + final int firstCellSize = + (int)(getStripedDataStreamer(0).getBytesCurBlock() % cellSize); final int parityCellSize = firstCellSize > 0 && firstCellSize < cellSize? firstCellSize : cellSize; final ByteBuffer[] buffers = cellBuffers.getBuffers(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java index 6f3857feee..3c9adc449b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java @@ -68,6 +68,7 @@ import org.apache.hadoop.hdfs.util.StripedBlockUtil; import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.erasurecode.CodecUtil; import org.apache.hadoop.io.erasurecode.ECSchema; import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder; import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder; @@ -112,7 +113,7 @@ public ErasureCodingWorker(Configuration conf, DataNode datanode) { } private RawErasureDecoder newDecoder(int numDataUnits, int numParityUnits) { - return new RSRawDecoder(numDataUnits, numParityUnits); + return CodecUtil.createRSRawDecoder(conf, numDataUnits, numParityUnits); } private void initializeStripedReadThreadPool(int num) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java index b53983bc5a..b29d582384 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java @@ -37,8 +37,10 @@ import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; import org.apache.hadoop.hdfs.server.namenode.ErasureCodingSchemaManager; import org.apache.hadoop.hdfs.util.StripedBlockUtil; +import org.apache.hadoop.io.erasurecode.CodecUtil; import org.apache.hadoop.io.erasurecode.ECSchema; import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder; +import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -230,8 +232,9 @@ public void testPreadWithDNFailure() throws Exception { for (int m : missingBlkIdx) { decodeInputs[m] = null; } - RSRawDecoder rsRawDecoder = new RSRawDecoder(DATA_BLK_NUM, PARITY_BLK_NUM);
- rsRawDecoder.decode(decodeInputs, missingBlkIdx, decodeOutputs); + RawErasureDecoder rawDecoder = CodecUtil.createRSRawDecoder(conf, + DATA_BLK_NUM, PARITY_BLK_NUM); + rawDecoder.decode(decodeInputs, missingBlkIdx, decodeOutputs); int posInBuf = i * CELLSIZE * DATA_BLK_NUM + failedDNIdx * CELLSIZE; System.arraycopy(decodeOutputs[0], 0, expected, posInBuf, CELLSIZE); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java index e041dbe303..3f40deeae2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; import org.apache.hadoop.hdfs.util.StripedBlockUtil; +import org.apache.hadoop.io.erasurecode.CodecUtil; import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder; import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder; import org.apache.hadoop.test.GenericTestUtils; @@ -43,7 +44,8 @@ import org.junit.Test; public class TestDFSStripedOutputStream { - public static final Log LOG = LogFactory.getLog(TestDFSStripedOutputStream.class); + public static final Log LOG = LogFactory.getLog( + TestDFSStripedOutputStream.class); static { GenericTestUtils.setLogLevel(DFSOutputStream.LOG, Level.ALL); @@ -55,6 +57,7 @@ public class TestDFSStripedOutputStream { private MiniDFSCluster cluster; private DistributedFileSystem fs; + private Configuration conf; private final int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE; private final int stripesPerBlock = 4; private final int blockSize = cellSize * stripesPerBlock; @@ -62,7 +65,7 @@ public class TestDFSStripedOutputStream { @Before public void setup() throws IOException { int numDNs = dataBlocks + parityBlocks + 2; - Configuration conf = new Configuration(); + conf = new Configuration(); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build(); cluster.getFileSystem().getClient().createErasureCodingZone("/", null, 0); @@ -140,7 +143,8 @@ public void testFileMoreThanABlockGroup1() throws IOException { @Test public void testFileMoreThanABlockGroup2() throws IOException { - testOneFile("/MoreThanABlockGroup2", blockSize * dataBlocks + cellSize+ 123); + testOneFile("/MoreThanABlockGroup2", + blockSize * dataBlocks + cellSize + 123); } @@ -251,13 +255,14 @@ void checkData(String src, int writeBytes) throws IOException { } } - static void verifyParity(final long size, final int cellSize, + void verifyParity(final long size, final int cellSize, byte[][] dataBytes, byte[][] parityBytes) { - verifyParity(size, cellSize, dataBytes, parityBytes, -1); + verifyParity(conf, size, cellSize, dataBytes, parityBytes, -1); } - static void verifyParity(final long size, final int cellSize, - byte[][] dataBytes, byte[][] parityBytes, int killedDnIndex) { + static void verifyParity(Configuration conf, final long size, + final int cellSize, byte[][] dataBytes, + byte[][] parityBytes, int killedDnIndex) { // verify the parity blocks int parityBlkSize = (int) StripedBlockUtil.getInternalBlockLength( size, cellSize, dataBytes.length, dataBytes.length); @@ -275,7 +280,8 @@ static void
verifyParity(final long size, final int cellSize, } } final RawErasureEncoder encoder = - new RSRawEncoder(dataBytes.length, parityBytes.length); + CodecUtil.createRSRawEncoder(conf, + dataBytes.length, parityBytes.length); encoder.encode(dataBytes, expectedParityBytes); for (int i = 0; i < parityBytes.length; i++) { if (i != killedDnIndex) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java index c232e13610..d2e0458271 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java @@ -335,7 +335,7 @@ static void checkData(DistributedFileSystem dfs, String src, int length, } // check parity - TestDFSStripedOutputStream.verifyParity( + TestDFSStripedOutputStream.verifyParity(dfs.getConf(), lbs.getLocatedBlocks().get(group).getBlockSize(), CELL_SIZE, dataBlockBytes, parityBlockBytes, killedDnIndex - dataBlockBytes.length);