HADOOP-19180. EC: Fix calculation errors caused by special index order (#6813). Contributed by zhengchenyu.

Reviewed-by: He Xiaoqiao <hexiaoqiao@apache.org>
Signed-off-by: Shuyan Zhang <zhangshuyan@apache.org>
This commit is contained in:
zhengchenyu 2024-08-19 12:40:45 +08:00 committed by GitHub
parent 59dba6e1bd
commit e5b76dc99f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 195 additions and 62 deletions

View File

@ -51,7 +51,6 @@ public class RSRawDecoder extends RawErasureDecoder {
private byte[] gfTables;
private int[] cachedErasedIndexes;
private int[] validIndexes;
private int numErasedDataUnits;
private boolean[] erasureFlags;
public RSRawDecoder(ErasureCoderOptions coderOptions) {
@ -120,14 +119,10 @@ private void processErasures(int[] erasedIndexes) {
this.gfTables = new byte[getNumAllUnits() * getNumDataUnits() * 32];
this.erasureFlags = new boolean[getNumAllUnits()];
this.numErasedDataUnits = 0;
for (int i = 0; i < erasedIndexes.length; i++) {
int index = erasedIndexes[i];
erasureFlags[index] = true;
if (index < getNumDataUnits()) {
numErasedDataUnits++;
}
}
generateDecodeMatrix(erasedIndexes);
@ -156,14 +151,14 @@ private void generateDecodeMatrix(int[] erasedIndexes) {
GF256.gfInvertMatrix(tmpMatrix, invertMatrix, getNumDataUnits());
for (i = 0; i < numErasedDataUnits; i++) {
for (p = 0; p < erasedIndexes.length; p++) {
int erasedIndex = erasedIndexes[p];
if (erasedIndex < getNumDataUnits()) {
for (j = 0; j < getNumDataUnits(); j++) {
decodeMatrix[getNumDataUnits() * i + j] =
invertMatrix[getNumDataUnits() * erasedIndexes[i] + j];
decodeMatrix[getNumDataUnits() * p + j] =
invertMatrix[getNumDataUnits() * erasedIndexes[p] + j];
}
}
for (p = numErasedDataUnits; p < erasedIndexes.length; p++) {
} else {
for (i = 0; i < getNumDataUnits(); i++) {
s = 0;
for (j = 0; j < getNumDataUnits(); j++) {
@ -174,4 +169,5 @@ private void generateDecodeMatrix(int[] erasedIndexes) {
}
}
}
}
}

View File

@ -132,9 +132,6 @@ static int processErasures(IsalDecoder* pCoder, unsigned char** inputs,
index = erasedIndexes[i];
pCoder->erasedIndexes[i] = index;
pCoder->erasureFlags[index] = 1;
if (index < numDataUnits) {
pCoder->numErasedDataUnits++;
}
}
pCoder->numErased = numErased;
@ -175,7 +172,6 @@ int decode(IsalDecoder* pCoder, unsigned char** inputs,
// Clear variables used per decode call
void clearDecoder(IsalDecoder* decoder) {
decoder->numErasedDataUnits = 0;
decoder->numErased = 0;
memset(decoder->gftbls, 0, sizeof(decoder->gftbls));
memset(decoder->decodeMatrix, 0, sizeof(decoder->decodeMatrix));
@ -205,15 +201,14 @@ int generateDecodeMatrix(IsalDecoder* pCoder) {
h_gf_invert_matrix(pCoder->tmpMatrix,
pCoder->invertMatrix, numDataUnits);
for (i = 0; i < pCoder->numErasedDataUnits; i++) {
for (p = 0; p < pCoder->numErased; p++) {
for (j = 0; j < numDataUnits; j++) {
pCoder->decodeMatrix[numDataUnits * i + j] =
int erasedIndex = pCoder->erasedIndexes[p];
if (erasedIndex < numDataUnits) {
pCoder->decodeMatrix[numDataUnits * p + j] =
pCoder->invertMatrix[numDataUnits *
pCoder->erasedIndexes[i] + j];
}
}
for (p = pCoder->numErasedDataUnits; p < pCoder->numErased; p++) {
pCoder->erasedIndexes[p] + j];
} else {
for (i = 0; i < numDataUnits; i++) {
s = 0;
for (j = 0; j < numDataUnits; j++) {
@ -221,10 +216,11 @@ int generateDecodeMatrix(IsalDecoder* pCoder) {
pCoder->encodeMatrix[numDataUnits *
pCoder->erasedIndexes[p] + j]);
}
pCoder->decodeMatrix[numDataUnits * p + i] = s;
}
}
}
}
return 0;
}

View File

@ -62,7 +62,6 @@ typedef struct _IsalDecoder {
unsigned char erasureFlags[MMAX];
int erasedIndexes[MMAX];
int numErased;
int numErasedDataUnits;
unsigned char* realInputs[MMAX];
} IsalDecoder;

View File

@ -27,25 +27,27 @@
#include "erasure_code.h"
#include "gf_util.h"
#include "erasure_coder.h"
#include "dump.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
int main(int argc, char *argv[]) {
int i, j;
int i, j, k, l;
char err[256];
size_t err_len = sizeof(err);
int chunkSize = 1024;
int numDataUnits = 6;
int numParityUnits = 3;
int numTotalUnits = numDataUnits + numParityUnits;
unsigned char** dataUnits;
unsigned char** parityUnits;
IsalEncoder* pEncoder;
int erasedIndexes[2];
int erasedIndexes[3];
unsigned char* allUnits[MMAX];
IsalDecoder* pDecoder;
unsigned char* decodingOutput[2];
unsigned char* decodingOutput[3];
unsigned char** backupUnits;
if (0 == build_support_erasurecode()) {
@ -82,6 +84,11 @@ int main(int argc, char *argv[]) {
}
}
// Allocate decode output
for (i = 0; i < 3; i++) {
decodingOutput[i] = malloc(chunkSize);
}
pEncoder = (IsalEncoder*)malloc(sizeof(IsalEncoder));
memset(pEncoder, 0, sizeof(*pEncoder));
initEncoder(pEncoder, numDataUnits, numParityUnits);
@ -95,27 +102,54 @@ int main(int argc, char *argv[]) {
memcpy(allUnits + numDataUnits, parityUnits,
numParityUnits * (sizeof (unsigned char*)));
erasedIndexes[0] = 1;
erasedIndexes[1] = 7;
backupUnits[0] = allUnits[1];
backupUnits[1] = allUnits[7];
allUnits[0] = NULL; // Not to read
allUnits[1] = NULL;
allUnits[7] = NULL;
decodingOutput[0] = malloc(chunkSize);
decodingOutput[1] = malloc(chunkSize);
decode(pDecoder, allUnits, erasedIndexes, 2, decodingOutput, chunkSize);
for (i = 0; i < pDecoder->numErased; i++) {
if (0 != memcmp(decodingOutput[i], backupUnits[i], chunkSize)) {
fprintf(stderr, "Decoding failed\n\n");
for (i = 0; i < numTotalUnits; i++) {
for (j = 0; j < numTotalUnits; j++) {
for (k = 0; k < numTotalUnits; k++) {
int numErased;
if (i == j && j == k) {
erasedIndexes[0] = i;
numErased = 1;
backupUnits[0] = allUnits[i];
allUnits[i] = NULL;
} else if (i == j) {
erasedIndexes[0] = i;
erasedIndexes[1] = k;
numErased = 2;
backupUnits[0] = allUnits[i];
backupUnits[1] = allUnits[k];
allUnits[i] = NULL;
allUnits[k] = NULL;
} else if (i == k || j == k) {
erasedIndexes[0] = i;
erasedIndexes[1] = j;
numErased = 2;
backupUnits[0] = allUnits[i];
backupUnits[1] = allUnits[j];
allUnits[i] = NULL;
allUnits[j] = NULL;
} else {
erasedIndexes[0] = i;
erasedIndexes[1] = j;
erasedIndexes[2] = k;
numErased = 3;
backupUnits[0] = allUnits[i];
backupUnits[1] = allUnits[j];
backupUnits[2] = allUnits[k];
allUnits[i] = NULL;
allUnits[j] = NULL;
allUnits[k] = NULL;
}
decode(pDecoder, allUnits, erasedIndexes, numErased, decodingOutput, chunkSize);
for (l = 0; l < pDecoder->numErased; l++) {
if (0 != memcmp(decodingOutput[l], backupUnits[l], chunkSize)) {
printf("Decoding failed\n");
dumpDecoder(pDecoder);
return -1;
}
allUnits[erasedIndexes[l]] = backupUnits[l];
}
}
}
}
dumpDecoder(pDecoder);

View File

@ -0,0 +1,108 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.erasurecode;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
import org.junit.Test;
import java.util.Random;
import static org.junit.Assert.assertArrayEquals;
public class TestErasureCodingEncodeAndDecode {
private final static int CHUNCK = 1024;
private final static int DATAB_LOCKS = 6;
private final static int PARITY_BLOCKS = 3;
private final static int TOTAL_BLOCKS = DATAB_LOCKS + PARITY_BLOCKS;
@Test
public void testEncodeAndDecode() throws Exception {
Configuration conf = new Configuration();
int totalBytes = CHUNCK * DATAB_LOCKS;
Random random = new Random();
byte[] tmpBytes = new byte[totalBytes];
random.nextBytes(tmpBytes);
byte[][] data = new byte[DATAB_LOCKS][CHUNCK];
for (int i = 0; i < DATAB_LOCKS; i++) {
System.arraycopy(tmpBytes, i * CHUNCK, data[i], 0, CHUNCK);
}
ErasureCoderOptions coderOptions = new ErasureCoderOptions(DATAB_LOCKS, PARITY_BLOCKS);
// 1 Encode
RawErasureEncoder encoder =
CodecUtil.createRawEncoder(conf, ErasureCodeConstants.RS_CODEC_NAME, coderOptions);
byte[][] parity = new byte[PARITY_BLOCKS][CHUNCK];
encoder.encode(data, parity);
// 2 Compose the complete data
byte[][] all = new byte[DATAB_LOCKS + PARITY_BLOCKS][CHUNCK];
for (int i = 0; i < DATAB_LOCKS; i++) {
System.arraycopy(data[i], 0, all[i], 0, CHUNCK);
}
for (int i = 0; i < PARITY_BLOCKS; i++) {
System.arraycopy(parity[i], 0, all[i + DATAB_LOCKS], 0, CHUNCK);
}
// 3 Decode
RawErasureDecoder rawDecoder =
CodecUtil.createRawDecoder(conf, ErasureCodeConstants.RS_CODEC_NAME, coderOptions);
byte[][] backup = new byte[PARITY_BLOCKS][CHUNCK];
for (int i = 0; i < TOTAL_BLOCKS; i++) {
for (int j = 0; j < TOTAL_BLOCKS; j++) {
for (int k = 0; k < TOTAL_BLOCKS; k++) {
int[] erasedIndexes;
if (i == j && j == k) {
erasedIndexes = new int[]{i};
backup[0] = all[i];
all[i] = null;
} else if (i == j) {
erasedIndexes = new int[]{i, k};
backup[0] = all[i];
backup[1] = all[k];
all[i] = null;
all[k] = null;
} else if ((i == k) || ((j == k))) {
erasedIndexes = new int[]{i, j};
backup[0] = all[i];
backup[1] = all[j];
all[i] = null;
all[j] = null;
} else {
erasedIndexes = new int[]{i, j, k};
backup[0] = all[i];
backup[1] = all[j];
backup[2] = all[k];
all[i] = null;
all[j] = null;
all[k] = null;
}
byte[][] decoded = new byte[erasedIndexes.length][CHUNCK];
rawDecoder.decode(all, erasedIndexes, decoded);
for (int l = 0; l < erasedIndexes.length; l++) {
assertArrayEquals(backup[l], decoded[l]);
all[erasedIndexes[l]] = backup[l];
}
}
}
}
}
}