HADOOP-11541. Raw XOR coder

This commit is contained in:
Kai Zheng 2015-02-08 01:40:27 +08:00 committed by Zhe Zhang
parent 9f19eb9fcf
commit b29f3bde4d
6 changed files with 556 additions and 1 deletions

View File

@ -5,3 +5,6 @@
HADOOP-11534. Minor improvements for raw erasure coders
( Kai Zheng via vinayakumarb )
HADOOP-11541. Raw XOR coder
( Kai Zheng )

View File

@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.erasurecode.rawcoder;
import java.nio.ByteBuffer;
/**
* A raw decoder in XOR code scheme in pure Java, adapted from HDFS-RAID.
*/
public class XorRawDecoder extends AbstractRawErasureDecoder {
@Override
protected void doDecode(ByteBuffer[] inputs, int[] erasedIndexes,
ByteBuffer[] outputs) {
assert(erasedIndexes.length == outputs.length);
assert(erasedIndexes.length <= 1);
int bufSize = inputs[0].remaining();
int erasedIdx = erasedIndexes[0];
// Set the output to zeros.
for (int j = 0; j < bufSize; j++) {
outputs[0].put(j, (byte) 0);
}
// Process the inputs.
for (int i = 0; i < inputs.length; i++) {
// Skip the erased location.
if (i == erasedIdx) {
continue;
}
for (int j = 0; j < bufSize; j++) {
outputs[0].put(j, (byte) (outputs[0].get(j) ^ inputs[i].get(j)));
}
}
}
@Override
protected void doDecode(byte[][] inputs, int[] erasedIndexes,
byte[][] outputs) {
assert(erasedIndexes.length == outputs.length);
assert(erasedIndexes.length <= 1);
int bufSize = inputs[0].length;
int erasedIdx = erasedIndexes[0];
// Set the output to zeros.
for (int j = 0; j < bufSize; j++) {
outputs[0][j] = 0;
}
// Process the inputs.
for (int i = 0; i < inputs.length; i++) {
// Skip the erased location.
if (i == erasedIdx) {
continue;
}
for (int j = 0; j < bufSize; j++) {
outputs[0][j] ^= inputs[i][j];
}
}
}
}

View File

@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.erasurecode.rawcoder;
import java.nio.ByteBuffer;
/**
* A raw encoder in XOR code scheme in pure Java, adapted from HDFS-RAID.
*/
public class XorRawEncoder extends AbstractRawErasureEncoder {
@Override
protected void doEncode(ByteBuffer[] inputs, ByteBuffer[] outputs) {
int bufSize = inputs[0].remaining();
// Get the first buffer's data.
for (int j = 0; j < bufSize; j++) {
outputs[0].put(j, inputs[0].get(j));
}
// XOR with everything else.
for (int i = 1; i < inputs.length; i++) {
for (int j = 0; j < bufSize; j++) {
outputs[0].put(j, (byte) (outputs[0].get(j) ^ inputs[i].get(j)));
}
}
}
@Override
protected void doEncode(byte[][] inputs, byte[][] outputs) {
int bufSize = inputs[0].length;
// Get the first buffer's data.
for (int j = 0; j < bufSize; j++) {
outputs[0][j] = inputs[0][j];
}
// XOR with everything else.
for (int i = 1; i < inputs.length; i++) {
for (int j = 0; j < bufSize; j++) {
outputs[0][j] ^= inputs[i][j];
}
}
}
}

View File

@ -0,0 +1,262 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.erasurecode;
import java.nio.ByteBuffer;
import java.util.Random;
import static org.junit.Assert.assertArrayEquals;
/**
* Test base of common utilities for tests not only raw coders but also block
* coders.
*/
public abstract class TestCoderBase {
protected static Random RAND = new Random();
protected int numDataUnits;
protected int numParityUnits;
protected int chunkSize = 16 * 1024;
// Indexes of erased data units. Will also support test of erasing
// parity units
protected int[] erasedDataIndexes = new int[] {0};
// Data buffers are either direct or on-heap, for performance the two cases
// may go to different coding implementations.
protected boolean usingDirectBuffer = true;
/**
* Compare and verify if erased chunks are equal to recovered chunks
* @param erasedChunks
* @param recoveredChunks
*/
protected void compareAndVerify(ECChunk[] erasedChunks,
ECChunk[] recoveredChunks) {
byte[][] erased = ECChunk.toArray(erasedChunks);
byte[][] recovered = ECChunk.toArray(recoveredChunks);
for (int i = 0; i < erasedChunks.length; ++i) {
assertArrayEquals("Decoding and comparing failed.", erased[i],
recovered[i]);
}
}
/**
* Adjust and return erased indexes based on the array of the input chunks (
* parity chunks + data chunks).
* @return
*/
protected int[] getErasedIndexesForDecoding() {
int[] erasedIndexesForDecoding = new int[erasedDataIndexes.length];
for (int i = 0; i < erasedDataIndexes.length; ++i) {
erasedIndexesForDecoding[i] = erasedDataIndexes[i] + numParityUnits;
}
return erasedIndexesForDecoding;
}
/**
* Return input chunks for decoding, which is parityChunks + dataChunks.
* @param dataChunks
* @param parityChunks
* @return
*/
protected ECChunk[] prepareInputChunksForDecoding(ECChunk[] dataChunks,
ECChunk[] parityChunks) {
ECChunk[] inputChunks = new ECChunk[numParityUnits + numDataUnits];
int idx = 0;
for (int i = 0; i < numParityUnits; i++) {
inputChunks[idx ++] = parityChunks[i];
}
for (int i = 0; i < numDataUnits; i++) {
inputChunks[idx ++] = dataChunks[i];
}
return inputChunks;
}
/**
* Have a copy of the data chunks that's to be erased thereafter. The copy
* will be used to compare and verify with the to be recovered chunks.
* @param dataChunks
* @return
*/
protected ECChunk[] copyDataChunksToErase(ECChunk[] dataChunks) {
ECChunk[] copiedChunks = new ECChunk[erasedDataIndexes.length];
int j = 0;
for (int i = 0; i < erasedDataIndexes.length; ++i) {
copiedChunks[j ++] = cloneChunkWithData(dataChunks[erasedDataIndexes[i]]);
}
return copiedChunks;
}
/**
* Erase some data chunks to test the recovering of them
* @param dataChunks
*/
protected void eraseSomeDataBlocks(ECChunk[] dataChunks) {
for (int i = 0; i < erasedDataIndexes.length; ++i) {
eraseDataFromChunk(dataChunks[erasedDataIndexes[i]]);
}
}
/**
* Erase data from the specified chunks, putting ZERO bytes to the buffers.
* @param chunks
*/
protected void eraseDataFromChunks(ECChunk[] chunks) {
for (int i = 0; i < chunks.length; ++i) {
eraseDataFromChunk(chunks[i]);
}
}
/**
* Erase data from the specified chunk, putting ZERO bytes to the buffer.
* @param chunk
*/
protected void eraseDataFromChunk(ECChunk chunk) {
ByteBuffer chunkBuffer = chunk.getBuffer();
// erase the data
chunkBuffer.position(0);
for (int i = 0; i < chunkSize; ++i) {
chunkBuffer.put((byte) 0);
}
chunkBuffer.flip();
}
/**
* Clone chunks along with copying the associated data. It respects how the
* chunk buffer is allocated, direct or non-direct. It avoids affecting the
* original chunk buffers.
* @param chunks
* @return
*/
protected static ECChunk[] cloneChunksWithData(ECChunk[] chunks) {
ECChunk[] results = new ECChunk[chunks.length];
for (int i = 0; i < chunks.length; ++i) {
results[i] = cloneChunkWithData(chunks[i]);
}
return results;
}
/**
* Clone chunk along with copying the associated data. It respects how the
* chunk buffer is allocated, direct or non-direct. It avoids affecting the
* original chunk.
* @param chunk
* @return a new chunk
*/
protected static ECChunk cloneChunkWithData(ECChunk chunk) {
ByteBuffer srcBuffer = chunk.getBuffer();
ByteBuffer destBuffer;
byte[] bytesArr = new byte[srcBuffer.remaining()];
srcBuffer.mark();
srcBuffer.get(bytesArr);
srcBuffer.reset();
if (srcBuffer.hasArray()) {
destBuffer = ByteBuffer.wrap(bytesArr);
} else {
destBuffer = ByteBuffer.allocateDirect(srcBuffer.remaining());
destBuffer.put(bytesArr);
destBuffer.flip();
}
return new ECChunk(destBuffer);
}
/**
* Allocate a chunk for output or writing.
* @return
*/
protected ECChunk allocateOutputChunk() {
ByteBuffer buffer = allocateOutputBuffer();
return new ECChunk(buffer);
}
/**
* Allocate a buffer for output or writing.
* @return
*/
protected ByteBuffer allocateOutputBuffer() {
ByteBuffer buffer = usingDirectBuffer ?
ByteBuffer.allocateDirect(chunkSize) : ByteBuffer.allocate(chunkSize);
return buffer;
}
/**
* Prepare data chunks for each data unit, by generating random data.
* @return
*/
protected ECChunk[] prepareDataChunksForEncoding() {
ECChunk[] chunks = new ECChunk[numDataUnits];
for (int i = 0; i < chunks.length; i++) {
chunks[i] = generateDataChunk();
}
return chunks;
}
/**
* Generate data chunk by making random data.
* @return
*/
protected ECChunk generateDataChunk() {
ByteBuffer buffer = allocateOutputBuffer();
for (int i = 0; i < chunkSize; i++) {
buffer.put((byte) RAND.nextInt(256));
}
buffer.flip();
return new ECChunk(buffer);
}
/**
* Prepare parity chunks for encoding, each chunk for each parity unit.
* @return
*/
protected ECChunk[] prepareParityChunksForEncoding() {
ECChunk[] chunks = new ECChunk[numParityUnits];
for (int i = 0; i < chunks.length; i++) {
chunks[i] = allocateOutputChunk();
}
return chunks;
}
/**
* Prepare output chunks for decoding, each output chunk for each erased
* chunk.
* @return
*/
protected ECChunk[] prepareOutputChunksForDecoding() {
ECChunk[] chunks = new ECChunk[erasedDataIndexes.length];
for (int i = 0; i < chunks.length; i++) {
chunks[i] = allocateOutputChunk();
}
return chunks;
}
}

View File

@ -0,0 +1,96 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.erasurecode.rawcoder;
import org.apache.hadoop.io.erasurecode.ECChunk;
import org.apache.hadoop.io.erasurecode.TestCoderBase;
/**
* Raw coder test base with utilities.
*/
public abstract class TestRawCoderBase extends TestCoderBase {
protected Class<? extends RawErasureEncoder> encoderClass;
protected Class<? extends RawErasureDecoder> decoderClass;
/**
* Generating source data, encoding, recovering and then verifying.
* RawErasureCoder mainly uses ECChunk to pass input and output data buffers,
* it supports two kinds of ByteBuffers, one is array backed, the other is
* direct ByteBuffer. Have usingDirectBuffer to indicate which case to test.
* @param usingDirectBuffer
*/
protected void testCoding(boolean usingDirectBuffer) {
// Generate data and encode
ECChunk[] dataChunks = prepareDataChunksForEncoding();
ECChunk[] parityChunks = prepareParityChunksForEncoding();
RawErasureEncoder encoder = createEncoder();
// Backup all the source chunks for later recovering because some coders
// may affect the source data.
ECChunk[] clonedDataChunks = cloneChunksWithData(dataChunks);
// Make a copy of a strip for later comparing
ECChunk[] toEraseDataChunks = copyDataChunksToErase(clonedDataChunks);
encoder.encode(dataChunks, parityChunks);
// Erase the copied sources
eraseSomeDataBlocks(clonedDataChunks);
//Decode
ECChunk[] inputChunks = prepareInputChunksForDecoding(clonedDataChunks,
parityChunks);
ECChunk[] recoveredChunks = prepareOutputChunksForDecoding();
RawErasureDecoder decoder = createDecoder();
decoder.decode(inputChunks, getErasedIndexesForDecoding(), recoveredChunks);
//Compare
compareAndVerify(toEraseDataChunks, recoveredChunks);
}
/**
* Create the raw erasure encoder to test
* @return
*/
protected RawErasureEncoder createEncoder() {
RawErasureEncoder encoder;
try {
encoder = encoderClass.newInstance();
} catch (Exception e) {
throw new RuntimeException("Failed to create encoder", e);
}
encoder.initialize(numDataUnits, numParityUnits, chunkSize);
return encoder;
}
/**
* create the raw erasure decoder to test
* @return
*/
protected RawErasureDecoder createDecoder() {
RawErasureDecoder decoder;
try {
decoder = decoderClass.newInstance();
} catch (Exception e) {
throw new RuntimeException("Failed to create decoder", e);
}
decoder.initialize(numDataUnits, numParityUnits, chunkSize);
return decoder;
}
}

View File

@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.erasurecode.rawcoder;
import org.junit.Before;
import org.junit.Test;
import java.util.Random;
/**
* Test XOR encoding and decoding.
*/
public class TestXorRawCoder extends TestRawCoderBase {
private static Random RAND = new Random();
@Before
public void setup() {
this.encoderClass = XorRawEncoder.class;
this.decoderClass = XorRawDecoder.class;
this.numDataUnits = 10;
this.numParityUnits = 1;
this.erasedDataIndexes = new int[] {0};
}
@Test
public void testCodingNoDirectBuffer() {
testCoding(false);
}
@Test
public void testCodingDirectBuffer() {
testCoding(true);
}
}