diff --git a/hadoop-common-project/hadoop-common/CHANGES-HDFS-EC-7285.txt b/hadoop-common-project/hadoop-common/CHANGES-HDFS-EC-7285.txt
index 2124800e6d..9728f977bb 100644
--- a/hadoop-common-project/hadoop-common/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES-HDFS-EC-7285.txt
@@ -4,4 +4,7 @@
     (Kai Zheng via umamahesh)
 
     HADOOP-11534. Minor improvements for raw erasure coders
-    ( Kai Zheng via vinayakumarb )
\ No newline at end of file
+    ( Kai Zheng via vinayakumarb )
+
+    HADOOP-11541. Raw XOR coder
+    ( Kai Zheng )
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XorRawDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XorRawDecoder.java
new file mode 100644
index 0000000000..98307a7b3c
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XorRawDecoder.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.erasurecode.rawcoder;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A raw decoder for the XOR code scheme in pure Java, adapted from HDFS-RAID.
+ */
+public class XorRawDecoder extends AbstractRawErasureDecoder {
+
+  @Override
+  protected void doDecode(ByteBuffer[] inputs, int[] erasedIndexes,
+                          ByteBuffer[] outputs) {
+    assert(erasedIndexes.length == outputs.length);
+    assert(erasedIndexes.length <= 1);
+
+    int bufSize = inputs[0].remaining();
+    int erasedIdx = erasedIndexes[0];
+
+    // Set the output to zeros.
+    for (int j = 0; j < bufSize; j++) {
+      outputs[0].put(j, (byte) 0);
+    }
+
+    // Process the inputs.
+    for (int i = 0; i < inputs.length; i++) {
+      // Skip the erased location.
+      if (i == erasedIdx) {
+        continue;
+      }
+
+      for (int j = 0; j < bufSize; j++) {
+        outputs[0].put(j, (byte) (outputs[0].get(j) ^ inputs[i].get(j)));
+      }
+    }
+  }
+
+  @Override
+  protected void doDecode(byte[][] inputs, int[] erasedIndexes,
+                          byte[][] outputs) {
+    assert(erasedIndexes.length == outputs.length);
+    assert(erasedIndexes.length <= 1);
+
+    int bufSize = inputs[0].length;
+    int erasedIdx = erasedIndexes[0];
+
+    // Set the output to zeros.
+    for (int j = 0; j < bufSize; j++) {
+      outputs[0][j] = 0;
+    }
+
+    // Process the inputs.
+    for (int i = 0; i < inputs.length; i++) {
+      // Skip the erased location.
+      if (i == erasedIdx) {
+        continue;
+      }
+
+      for (int j = 0; j < bufSize; j++) {
+        outputs[0][j] ^= inputs[i][j];
+      }
+    }
+  }
+
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XorRawEncoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XorRawEncoder.java
new file mode 100644
index 0000000000..99b20b92e7
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XorRawEncoder.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.erasurecode.rawcoder;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A raw encoder for the XOR code scheme in pure Java, adapted from HDFS-RAID.
+ */
+public class XorRawEncoder extends AbstractRawErasureEncoder {
+
+  @Override
+  protected void doEncode(ByteBuffer[] inputs, ByteBuffer[] outputs) {
+    int bufSize = inputs[0].remaining();
+
+    // Get the first buffer's data.
+    for (int j = 0; j < bufSize; j++) {
+      outputs[0].put(j, inputs[0].get(j));
+    }
+
+    // XOR with everything else.
+    for (int i = 1; i < inputs.length; i++) {
+      for (int j = 0; j < bufSize; j++) {
+        outputs[0].put(j, (byte) (outputs[0].get(j) ^ inputs[i].get(j)));
+      }
+    }
+  }
+
+  @Override
+  protected void doEncode(byte[][] inputs, byte[][] outputs) {
+    int bufSize = inputs[0].length;
+
+    // Get the first buffer's data.
+    for (int j = 0; j < bufSize; j++) {
+      outputs[0][j] = inputs[0][j];
+    }
+
+    // XOR with everything else.
+    for (int i = 1; i < inputs.length; i++) {
+      for (int j = 0; j < bufSize; j++) {
+        outputs[0][j] ^= inputs[i][j];
+      }
+    }
+  }
+
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCoderBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCoderBase.java
new file mode 100644
index 0000000000..9482b43482
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCoderBase.java
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.erasurecode;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import static org.junit.Assert.assertArrayEquals;
+
+/**
+ * Test base with common utilities for testing not only raw coders but also
+ * block coders.
+ */
+public abstract class TestCoderBase {
+  protected static Random RAND = new Random();
+
+  protected int numDataUnits;
+  protected int numParityUnits;
+  protected int chunkSize = 16 * 1024;
+
+  // Indexes of erased data units. Will also support testing the erasure of
+  // parity units.
+  protected int[] erasedDataIndexes = new int[] {0};
+
+  // Data buffers are either direct or on-heap; for performance the two cases
+  // may go to different coding implementations.
+  protected boolean usingDirectBuffer = true;
+
+  /**
+   * Compare and verify that erased chunks are equal to recovered chunks.
+   * @param erasedChunks the chunks that were erased
+   * @param recoveredChunks the chunks recovered by decoding
+   */
+  protected void compareAndVerify(ECChunk[] erasedChunks,
+                                  ECChunk[] recoveredChunks) {
+    byte[][] erased = ECChunk.toArray(erasedChunks);
+    byte[][] recovered = ECChunk.toArray(recoveredChunks);
+    for (int i = 0; i < erasedChunks.length; ++i) {
+      assertArrayEquals("Decoding and comparing failed.", erased[i],
+          recovered[i]);
+    }
+  }
+
+  /**
+   * Adjust and return erased indexes relative to the array of input chunks
+   * (parity chunks + data chunks).
+   * @return erased indexes offset by the number of parity units
+   */
+  protected int[] getErasedIndexesForDecoding() {
+    int[] erasedIndexesForDecoding = new int[erasedDataIndexes.length];
+    for (int i = 0; i < erasedDataIndexes.length; ++i) {
+      erasedIndexesForDecoding[i] = erasedDataIndexes[i] + numParityUnits;
+    }
+    return erasedIndexesForDecoding;
+  }
+
+  /**
+   * Return input chunks for decoding, which are parityChunks + dataChunks.
+   * @param dataChunks the data chunks
+   * @param parityChunks the parity chunks
+   * @return the combined input chunks
+   */
+  protected ECChunk[] prepareInputChunksForDecoding(ECChunk[] dataChunks,
+                                                    ECChunk[] parityChunks) {
+    ECChunk[] inputChunks = new ECChunk[numParityUnits + numDataUnits];
+
+    int idx = 0;
+    for (int i = 0; i < numParityUnits; i++) {
+      inputChunks[idx++] = parityChunks[i];
+    }
+    for (int i = 0; i < numDataUnits; i++) {
+      inputChunks[idx++] = dataChunks[i];
+    }
+
+    return inputChunks;
+  }
+
+  /**
+   * Make a copy of the data chunks that are to be erased afterwards. The copy
+   * will be compared against the recovered chunks for verification.
+   * @param dataChunks the data chunks to copy from
+   * @return copies of the chunks that will be erased
+   */
+  protected ECChunk[] copyDataChunksToErase(ECChunk[] dataChunks) {
+    ECChunk[] copiedChunks = new ECChunk[erasedDataIndexes.length];
+
+    int j = 0;
+    for (int i = 0; i < erasedDataIndexes.length; ++i) {
+      copiedChunks[j++] = cloneChunkWithData(dataChunks[erasedDataIndexes[i]]);
+    }
+
+    return copiedChunks;
+  }
+
+  /**
+   * Erase some data chunks so that recovering them can be tested.
+   * @param dataChunks the data chunks to erase from
+   */
+  protected void eraseSomeDataBlocks(ECChunk[] dataChunks) {
+    for (int i = 0; i < erasedDataIndexes.length; ++i) {
+      eraseDataFromChunk(dataChunks[erasedDataIndexes[i]]);
+    }
+  }
+
+  /**
+   * Erase data from the specified chunks, putting ZERO bytes into the buffers.
+   * @param chunks the chunks to erase
+   */
+  protected void eraseDataFromChunks(ECChunk[] chunks) {
+    for (int i = 0; i < chunks.length; ++i) {
+      eraseDataFromChunk(chunks[i]);
+    }
+  }
+
+  /**
+   * Erase data from the specified chunk, putting ZERO bytes into the buffer.
+   * @param chunk the chunk to erase
+   */
+  protected void eraseDataFromChunk(ECChunk chunk) {
+    ByteBuffer chunkBuffer = chunk.getBuffer();
+    // Erase the data.
+    chunkBuffer.position(0);
+    for (int i = 0; i < chunkSize; ++i) {
+      chunkBuffer.put((byte) 0);
+    }
+    chunkBuffer.flip();
+  }
+
+  /**
+   * Clone chunks along with copying the associated data. It respects how the
+   * chunk buffers are allocated, direct or non-direct. It avoids affecting
+   * the original chunk buffers.
+   * @param chunks the chunks to clone
+   * @return the cloned chunks
+   */
+  protected static ECChunk[] cloneChunksWithData(ECChunk[] chunks) {
+    ECChunk[] results = new ECChunk[chunks.length];
+    for (int i = 0; i < chunks.length; ++i) {
+      results[i] = cloneChunkWithData(chunks[i]);
+    }
+
+    return results;
+  }
+
+  /**
+   * Clone a chunk along with copying the associated data. It respects how the
+   * chunk buffer is allocated, direct or non-direct. It avoids affecting the
+   * original chunk.
+   * @param chunk the chunk to clone
+   * @return a new chunk
+   */
+  protected static ECChunk cloneChunkWithData(ECChunk chunk) {
+    ByteBuffer srcBuffer = chunk.getBuffer();
+    ByteBuffer destBuffer;
+
+    byte[] bytesArr = new byte[srcBuffer.remaining()];
+    srcBuffer.mark();
+    srcBuffer.get(bytesArr);
+    srcBuffer.reset();
+
+    if (srcBuffer.hasArray()) {
+      destBuffer = ByteBuffer.wrap(bytesArr);
+    } else {
+      destBuffer = ByteBuffer.allocateDirect(srcBuffer.remaining());
+      destBuffer.put(bytesArr);
+      destBuffer.flip();
+    }
+
+    return new ECChunk(destBuffer);
+  }
+
+  /**
+   * Allocate a chunk for output or writing.
+   * @return the allocated chunk
+   */
+  protected ECChunk allocateOutputChunk() {
+    ByteBuffer buffer = allocateOutputBuffer();
+
+    return new ECChunk(buffer);
+  }
+
+  /**
+   * Allocate a buffer for output or writing.
+   * @return the allocated buffer
+   */
+  protected ByteBuffer allocateOutputBuffer() {
+    ByteBuffer buffer = usingDirectBuffer ?
+        ByteBuffer.allocateDirect(chunkSize) : ByteBuffer.allocate(chunkSize);
+
+    return buffer;
+  }
+
+  /**
+   * Prepare data chunks for each data unit by generating random data.
+   * @return the generated data chunks
+   */
+  protected ECChunk[] prepareDataChunksForEncoding() {
+    ECChunk[] chunks = new ECChunk[numDataUnits];
+    for (int i = 0; i < chunks.length; i++) {
+      chunks[i] = generateDataChunk();
+    }
+
+    return chunks;
+  }
+
+  /**
+   * Generate a data chunk by making random data.
+   * @return the generated chunk
+   */
+  protected ECChunk generateDataChunk() {
+    ByteBuffer buffer = allocateOutputBuffer();
+    for (int i = 0; i < chunkSize; i++) {
+      buffer.put((byte) RAND.nextInt(256));
+    }
+    buffer.flip();
+
+    return new ECChunk(buffer);
+  }
+
+  /**
+   * Prepare parity chunks for encoding, one chunk per parity unit.
+   * @return the allocated parity chunks
+   */
+  protected ECChunk[] prepareParityChunksForEncoding() {
+    ECChunk[] chunks = new ECChunk[numParityUnits];
+    for (int i = 0; i < chunks.length; i++) {
+      chunks[i] = allocateOutputChunk();
+    }
+
+    return chunks;
+  }
+
+  /**
+   * Prepare output chunks for decoding, one output chunk per erased chunk.
+   * @return the allocated output chunks
+   */
+  protected ECChunk[] prepareOutputChunksForDecoding() {
+    ECChunk[] chunks = new ECChunk[erasedDataIndexes.length];
+    for (int i = 0; i < chunks.length; i++) {
+      chunks[i] = allocateOutputChunk();
+    }
+
+    return chunks;
+  }
+
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java
new file mode 100644
index 0000000000..9119211641
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.erasurecode.rawcoder;
+
+import org.apache.hadoop.io.erasurecode.ECChunk;
+import org.apache.hadoop.io.erasurecode.TestCoderBase;
+
+/**
+ * Raw coder test base with utilities.
+ */
+public abstract class TestRawCoderBase extends TestCoderBase {
+  protected Class<? extends RawErasureEncoder> encoderClass;
+  protected Class<? extends RawErasureDecoder> decoderClass;
+
+  /**
+   * Generate source data, encode, recover and then verify. RawErasureCoder
+   * mainly uses ECChunk to pass input and output data buffers; it supports
+   * two kinds of ByteBuffers, array-backed and direct. The usingDirectBuffer
+   * flag indicates which case to test.
+   * @param usingDirectBuffer whether to test with direct ByteBuffers
+   */
+  protected void testCoding(boolean usingDirectBuffer) {
+    this.usingDirectBuffer = usingDirectBuffer;
+
+    // Generate data and encode.
+    ECChunk[] dataChunks = prepareDataChunksForEncoding();
+    ECChunk[] parityChunks = prepareParityChunksForEncoding();
+    RawErasureEncoder encoder = createEncoder();
+
+    // Back up all the source chunks for later recovery, because some coders
+    // may affect the source data.
+    ECChunk[] clonedDataChunks = cloneChunksWithData(dataChunks);
+    // Make a copy of the stripe for later comparison.
+    ECChunk[] toEraseDataChunks = copyDataChunksToErase(clonedDataChunks);
+
+    encoder.encode(dataChunks, parityChunks);
+    // Erase the copied sources.
+    eraseSomeDataBlocks(clonedDataChunks);
+
+    // Decode.
+    ECChunk[] inputChunks = prepareInputChunksForDecoding(clonedDataChunks,
+        parityChunks);
+    ECChunk[] recoveredChunks = prepareOutputChunksForDecoding();
+    RawErasureDecoder decoder = createDecoder();
+    decoder.decode(inputChunks, getErasedIndexesForDecoding(), recoveredChunks);
+
+    // Compare.
+    compareAndVerify(toEraseDataChunks, recoveredChunks);
+  }
+
+  /**
+   * Create the raw erasure encoder to test.
+   * @return the encoder instance
+   */
+  protected RawErasureEncoder createEncoder() {
+    RawErasureEncoder encoder;
+    try {
+      encoder = encoderClass.newInstance();
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to create encoder", e);
+    }
+
+    encoder.initialize(numDataUnits, numParityUnits, chunkSize);
+    return encoder;
+  }
+
+  /**
+   * Create the raw erasure decoder to test.
+   * @return the decoder instance
+   */
+  protected RawErasureDecoder createDecoder() {
+    RawErasureDecoder decoder;
+    try {
+      decoder = decoderClass.newInstance();
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to create decoder", e);
+    }
+
+    decoder.initialize(numDataUnits, numParityUnits, chunkSize);
+    return decoder;
+  }
+
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestXorRawCoder.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestXorRawCoder.java
new file mode 100644
index 0000000000..8e59b8a2c3
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestXorRawCoder.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.erasurecode.rawcoder;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Random;
+
+/**
+ * Test XOR encoding and decoding.
+ */
+public class TestXorRawCoder extends TestRawCoderBase {
+  private static Random RAND = new Random();
+
+  @Before
+  public void setup() {
+    this.encoderClass = XorRawEncoder.class;
+    this.decoderClass = XorRawDecoder.class;
+
+    this.numDataUnits = 10;
+    this.numParityUnits = 1;
+
+    this.erasedDataIndexes = new int[] {0};
+  }
+
+  @Test
+  public void testCodingNoDirectBuffer() {
+    testCoding(false);
+  }
+
+  @Test
+  public void testCodingDirectBuffer() {
+    testCoding(true);
+  }
+
+}
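Note: the following is a minimal, standalone usage sketch (not part of the patch) of how the new XOR raw coder is driven end to end, mirroring what TestRawCoderBase#testCoding() exercises: encode parity, zero out one data chunk, then decode it back from parity plus the surviving data. It assumes the ECChunk, RawErasureEncoder and RawErasureDecoder APIs from HADOOP-11514 that this patch builds on; the class name XorCoderExample and the sample sizes are hypothetical.

// XorCoderExample.java -- illustrative sketch only, not part of the patch.
import java.nio.ByteBuffer;
import java.util.Arrays;

import org.apache.hadoop.io.erasurecode.ECChunk;
import org.apache.hadoop.io.erasurecode.rawcoder.XorRawDecoder;
import org.apache.hadoop.io.erasurecode.rawcoder.XorRawEncoder;

public class XorCoderExample {
  public static void main(String[] args) {
    final int numDataUnits = 3;
    final int numParityUnits = 1; // XOR tolerates exactly one erasure
    final int chunkSize = 4;

    // Prepare three data chunks with deterministic content.
    ECChunk[] dataChunks = new ECChunk[numDataUnits];
    for (int i = 0; i < numDataUnits; i++) {
      ByteBuffer buffer = ByteBuffer.allocate(chunkSize);
      for (int j = 0; j < chunkSize; j++) {
        buffer.put((byte) (10 * i + j));
      }
      buffer.flip();
      dataChunks[i] = new ECChunk(buffer);
    }

    // Remember what data unit 0 holds before coding, as the test base does
    // with copyDataChunksToErase(), since coders may affect source buffers.
    byte[] expected = new byte[chunkSize];
    ByteBuffer first = dataChunks[0].getBuffer();
    first.mark();
    first.get(expected);
    first.reset();

    // Encode: parity[j] = data0[j] ^ data1[j] ^ data2[j].
    XorRawEncoder encoder = new XorRawEncoder();
    encoder.initialize(numDataUnits, numParityUnits, chunkSize);
    ECChunk[] parityChunks =
        new ECChunk[] { new ECChunk(ByteBuffer.allocate(chunkSize)) };
    encoder.encode(dataChunks, parityChunks);

    // Simulate losing data unit 0 by zero-filling its buffer.
    first.position(0);
    first.put(new byte[chunkSize]);
    first.flip();

    // Decode. Inputs are parity chunks followed by data chunks, so erased
    // data index 0 becomes 0 + numParityUnits, exactly as
    // getErasedIndexesForDecoding() computes in the test base.
    XorRawDecoder decoder = new XorRawDecoder();
    decoder.initialize(numDataUnits, numParityUnits, chunkSize);
    ECChunk[] inputChunks = new ECChunk[numParityUnits + numDataUnits];
    System.arraycopy(parityChunks, 0, inputChunks, 0, numParityUnits);
    System.arraycopy(dataChunks, 0, inputChunks, numParityUnits, numDataUnits);
    ECChunk[] recovered =
        new ECChunk[] { new ECChunk(ByteBuffer.allocate(chunkSize)) };
    decoder.decode(inputChunks, new int[] {numParityUnits}, recovered);

    byte[] actual = new byte[chunkSize];
    recovered[0].getBuffer().get(actual);
    System.out.println(Arrays.equals(expected, actual)); // expect: true
  }
}

The parity-first input ordering and the index offset are the two details most easily gotten wrong when driving the raw coders directly; the decoder skips the erased position entirely, so only the index, not the erased chunk's contents, matters for XOR recovery.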