HDFS-15759. EC: Verify EC reconstruction correctness on DataNode (#2585)
(cherry picked from commit 95e6892675
)
This commit is contained in:
parent
7f7535534d
commit
dca2bf9dd5
@ -0,0 +1,187 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.io.erasurecode.rawcoder;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.io.erasurecode.ECChunk;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
/**
|
||||
* A utility class to validate decoding.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class DecodingValidator {
|
||||
|
||||
private final RawErasureDecoder decoder;
|
||||
private ByteBuffer buffer;
|
||||
private int[] newValidIndexes;
|
||||
private int newErasedIndex;
|
||||
|
||||
public DecodingValidator(RawErasureDecoder decoder) {
|
||||
this.decoder = decoder;
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate outputs decoded from inputs, by decoding an input back from
|
||||
* the outputs and comparing it with the original one.
|
||||
*
|
||||
* For instance, in RS (6, 3), let (d0, d1, d2, d3, d4, d5) be sources
|
||||
* and (p0, p1, p2) be parities, and assume
|
||||
* inputs = [d0, null (d1), d2, d3, d4, d5, null (p0), p1, null (p2)];
|
||||
* erasedIndexes = [1, 6];
|
||||
* outputs = [d1, p0].
|
||||
* Then
|
||||
* 1. Create new inputs, erasedIndexes and outputs for validation so that
|
||||
* the inputs could contain the decoded outputs, and decode them:
|
||||
* newInputs = [d1, d2, d3, d4, d5, p0]
|
||||
* newErasedIndexes = [0]
|
||||
* newOutputs = [d0']
|
||||
* 2. Compare d0 and d0'. The comparison will fail with high probability
|
||||
* when the initial outputs are wrong.
|
||||
*
|
||||
* Note that the input buffers' positions must be the ones where data are
|
||||
* read: If the input buffers have been processed by a decoder, the buffers'
|
||||
* positions must be reset before being passed into this method.
|
||||
*
|
||||
* This method does not change outputs and erasedIndexes.
|
||||
*
|
||||
* @param inputs input buffers used for decoding. The buffers' position
|
||||
* are moved to the end after this method.
|
||||
* @param erasedIndexes indexes of erased units used for decoding
|
||||
* @param outputs decoded output buffers, which are ready to be read after
|
||||
* the call
|
||||
* @throws IOException
|
||||
*/
|
||||
public void validate(ByteBuffer[] inputs, int[] erasedIndexes,
|
||||
ByteBuffer[] outputs) throws IOException {
|
||||
markBuffers(outputs);
|
||||
|
||||
try {
|
||||
ByteBuffer validInput = CoderUtil.findFirstValidInput(inputs);
|
||||
boolean isDirect = validInput.isDirect();
|
||||
int capacity = validInput.capacity();
|
||||
int remaining = validInput.remaining();
|
||||
|
||||
// Init buffer
|
||||
if (buffer == null || buffer.isDirect() != isDirect
|
||||
|| buffer.capacity() < remaining) {
|
||||
buffer = allocateBuffer(isDirect, capacity);
|
||||
}
|
||||
buffer.clear().limit(remaining);
|
||||
|
||||
// Create newInputs and newErasedIndex for validation
|
||||
ByteBuffer[] newInputs = new ByteBuffer[inputs.length];
|
||||
int count = 0;
|
||||
for (int i = 0; i < erasedIndexes.length; i++) {
|
||||
newInputs[erasedIndexes[i]] = outputs[i];
|
||||
count++;
|
||||
}
|
||||
newErasedIndex = -1;
|
||||
boolean selected = false;
|
||||
int numValidIndexes = CoderUtil.getValidIndexes(inputs).length;
|
||||
for (int i = 0; i < newInputs.length; i++) {
|
||||
if (count == numValidIndexes) {
|
||||
break;
|
||||
} else if (!selected && inputs[i] != null) {
|
||||
newErasedIndex = i;
|
||||
newInputs[i] = null;
|
||||
selected = true;
|
||||
} else if (newInputs[i] == null) {
|
||||
newInputs[i] = inputs[i];
|
||||
if (inputs[i] != null) {
|
||||
count++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Keep it for testing
|
||||
newValidIndexes = CoderUtil.getValidIndexes(newInputs);
|
||||
|
||||
decoder.decode(newInputs, new int[]{newErasedIndex},
|
||||
new ByteBuffer[]{buffer});
|
||||
|
||||
if (!buffer.equals(inputs[newErasedIndex])) {
|
||||
throw new InvalidDecodingException("Failed to validate decoding");
|
||||
}
|
||||
} finally {
|
||||
toLimits(inputs);
|
||||
resetBuffers(outputs);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate outputs decoded from inputs, by decoding an input back from
|
||||
* those outputs and comparing it with the original one.
|
||||
* @param inputs input buffers used for decoding
|
||||
* @param erasedIndexes indexes of erased units used for decoding
|
||||
* @param outputs decoded output buffers
|
||||
* @throws IOException
|
||||
*/
|
||||
public void validate(ECChunk[] inputs, int[] erasedIndexes, ECChunk[] outputs)
|
||||
throws IOException {
|
||||
ByteBuffer[] newInputs = CoderUtil.toBuffers(inputs);
|
||||
ByteBuffer[] newOutputs = CoderUtil.toBuffers(outputs);
|
||||
validate(newInputs, erasedIndexes, newOutputs);
|
||||
}
|
||||
|
||||
private ByteBuffer allocateBuffer(boolean direct, int capacity) {
|
||||
if (direct) {
|
||||
buffer = ByteBuffer.allocateDirect(capacity);
|
||||
} else {
|
||||
buffer = ByteBuffer.allocate(capacity);
|
||||
}
|
||||
return buffer;
|
||||
}
|
||||
|
||||
private static void markBuffers(ByteBuffer[] buffers) {
|
||||
for (ByteBuffer buffer: buffers) {
|
||||
if (buffer != null) {
|
||||
buffer.mark();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void resetBuffers(ByteBuffer[] buffers) {
|
||||
for (ByteBuffer buffer: buffers) {
|
||||
if (buffer != null) {
|
||||
buffer.reset();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void toLimits(ByteBuffer[] buffers) {
|
||||
for (ByteBuffer buffer: buffers) {
|
||||
if (buffer != null) {
|
||||
buffer.position(buffer.limit());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected int[] getNewValidIndexes() {
|
||||
return newValidIndexes;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected int getNewErasedIndex() {
|
||||
return newErasedIndex;
|
||||
}
|
||||
}
|
@ -0,0 +1,35 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.io.erasurecode.rawcoder;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Thrown for invalid decoding.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class InvalidDecodingException
|
||||
extends IOException {
|
||||
private static final long serialVersionUID = 0L;
|
||||
|
||||
public InvalidDecodingException(String description) {
|
||||
super(description);
|
||||
}
|
||||
}
|
@ -450,6 +450,7 @@ Each metrics record contains tags such as SessionId and Hostname as additional i
|
||||
| `BlocksDeletedInPendingIBR` | Number of blocks at deleted status in pending incremental block report (IBR) |
|
||||
| `EcReconstructionTasks` | Total number of erasure coding reconstruction tasks |
|
||||
| `EcFailedReconstructionTasks` | Total number of erasure coding failed reconstruction tasks |
|
||||
| `EcInvalidReconstructionTasks` | Total number of erasure coding invalidated reconstruction tasks |
|
||||
| `EcDecodingTimeNanos` | Total number of nanoseconds spent by decoding tasks |
|
||||
| `EcReconstructionBytesRead` | Total number of bytes read by erasure coding worker |
|
||||
| `EcReconstructionBytesWritten` | Total number of bytes written by erasure coding worker |
|
||||
|
@ -527,4 +527,16 @@ protected void corruptSomeChunk(ECChunk[] chunks) {
|
||||
buffer.position(buffer.position() + 1);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Pollute some chunk.
|
||||
* @param chunks
|
||||
*/
|
||||
protected void polluteSomeChunk(ECChunk[] chunks) {
|
||||
int idx = new Random().nextInt(chunks.length);
|
||||
ByteBuffer buffer = chunks[idx].getBuffer();
|
||||
buffer.mark();
|
||||
buffer.put((byte) ((buffer.get(buffer.position()) + 1)));
|
||||
buffer.reset();
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,237 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.io.erasurecode.rawcoder;
|
||||
|
||||
import org.apache.hadoop.io.erasurecode.ECChunk;
|
||||
import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Assume;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
/**
|
||||
* Test {@link DecodingValidator} under various decoders.
|
||||
*/
|
||||
@RunWith(Parameterized.class)
|
||||
public class TestDecodingValidator extends TestRawCoderBase {
|
||||
|
||||
private DecodingValidator validator;
|
||||
|
||||
@Parameterized.Parameters
|
||||
public static Collection<Object[]> data() {
|
||||
return Arrays.asList(new Object[][] {
|
||||
{RSRawErasureCoderFactory.class, 6, 3, new int[]{1}, new int[]{}},
|
||||
{RSRawErasureCoderFactory.class, 6, 3, new int[]{3}, new int[]{0}},
|
||||
{RSRawErasureCoderFactory.class, 6, 3, new int[]{2, 4}, new int[]{1}},
|
||||
{NativeRSRawErasureCoderFactory.class, 6, 3, new int[]{0}, new int[]{}},
|
||||
{XORRawErasureCoderFactory.class, 10, 1, new int[]{0}, new int[]{}},
|
||||
{NativeXORRawErasureCoderFactory.class, 10, 1, new int[]{0},
|
||||
new int[]{}}
|
||||
});
|
||||
}
|
||||
|
||||
public TestDecodingValidator(
|
||||
Class<? extends RawErasureCoderFactory> factoryClass, int numDataUnits,
|
||||
int numParityUnits, int[] erasedDataIndexes, int[] erasedParityIndexes) {
|
||||
this.encoderFactoryClass = factoryClass;
|
||||
this.decoderFactoryClass = factoryClass;
|
||||
this.numDataUnits = numDataUnits;
|
||||
this.numParityUnits = numParityUnits;
|
||||
this.erasedDataIndexes = erasedDataIndexes;
|
||||
this.erasedParityIndexes = erasedParityIndexes;
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
if (encoderFactoryClass == NativeRSRawErasureCoderFactory.class
|
||||
|| encoderFactoryClass == NativeXORRawErasureCoderFactory.class) {
|
||||
Assume.assumeTrue(ErasureCodeNative.isNativeCodeLoaded());
|
||||
}
|
||||
setAllowDump(false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test if the same validator can process direct and non-direct buffers.
|
||||
*/
|
||||
@Test
|
||||
public void testValidate() {
|
||||
prepare(null, numDataUnits, numParityUnits, erasedDataIndexes,
|
||||
erasedParityIndexes);
|
||||
testValidate(true);
|
||||
testValidate(false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test if the same validator can process variable width of data for
|
||||
* inputs and outputs.
|
||||
*/
|
||||
protected void testValidate(boolean usingDirectBuffer) {
|
||||
this.usingDirectBuffer = usingDirectBuffer;
|
||||
prepareCoders(false);
|
||||
prepareValidator(false);
|
||||
|
||||
performTestValidate(baseChunkSize);
|
||||
performTestValidate(baseChunkSize - 17);
|
||||
performTestValidate(baseChunkSize + 18);
|
||||
}
|
||||
|
||||
protected void prepareValidator(boolean recreate) {
|
||||
if (validator == null || recreate) {
|
||||
validator = new DecodingValidator(decoder);
|
||||
}
|
||||
}
|
||||
|
||||
protected void performTestValidate(int chunkSize) {
|
||||
setChunkSize(chunkSize);
|
||||
prepareBufferAllocator(false);
|
||||
|
||||
// encode
|
||||
ECChunk[] dataChunks = prepareDataChunksForEncoding();
|
||||
ECChunk[] parityChunks = prepareParityChunksForEncoding();
|
||||
ECChunk[] clonedDataChunks = cloneChunksWithData(dataChunks);
|
||||
try {
|
||||
encoder.encode(dataChunks, parityChunks);
|
||||
} catch (Exception e) {
|
||||
Assert.fail("Should not get Exception: " + e.getMessage());
|
||||
}
|
||||
|
||||
// decode
|
||||
backupAndEraseChunks(clonedDataChunks, parityChunks);
|
||||
ECChunk[] inputChunks =
|
||||
prepareInputChunksForDecoding(clonedDataChunks, parityChunks);
|
||||
markChunks(inputChunks);
|
||||
ensureOnlyLeastRequiredChunks(inputChunks);
|
||||
ECChunk[] recoveredChunks = prepareOutputChunksForDecoding();
|
||||
int[] erasedIndexes = getErasedIndexesForDecoding();
|
||||
try {
|
||||
decoder.decode(inputChunks, erasedIndexes, recoveredChunks);
|
||||
} catch (Exception e) {
|
||||
Assert.fail("Should not get Exception: " + e.getMessage());
|
||||
}
|
||||
|
||||
// validate
|
||||
restoreChunksFromMark(inputChunks);
|
||||
ECChunk[] clonedInputChunks = cloneChunksWithData(inputChunks);
|
||||
ECChunk[] clonedRecoveredChunks = cloneChunksWithData(recoveredChunks);
|
||||
int[] clonedErasedIndexes = erasedIndexes.clone();
|
||||
|
||||
try {
|
||||
validator.validate(clonedInputChunks, clonedErasedIndexes,
|
||||
clonedRecoveredChunks);
|
||||
} catch (Exception e) {
|
||||
Assert.fail("Should not get Exception: " + e.getMessage());
|
||||
}
|
||||
|
||||
// Check if input buffers' positions are moved to the end
|
||||
verifyBufferPositionAtEnd(clonedInputChunks);
|
||||
|
||||
// Check if validator does not change recovered chunks and erased indexes
|
||||
verifyChunksEqual(recoveredChunks, clonedRecoveredChunks);
|
||||
Assert.assertArrayEquals("Erased indexes should not be changed",
|
||||
erasedIndexes, clonedErasedIndexes);
|
||||
|
||||
// Check if validator uses correct indexes for validation
|
||||
List<Integer> validIndexesList =
|
||||
IntStream.of(CoderUtil.getValidIndexes(inputChunks)).boxed()
|
||||
.collect(Collectors.toList());
|
||||
List<Integer> newValidIndexesList =
|
||||
IntStream.of(validator.getNewValidIndexes()).boxed()
|
||||
.collect(Collectors.toList());
|
||||
List<Integer> erasedIndexesList =
|
||||
IntStream.of(erasedIndexes).boxed().collect(Collectors.toList());
|
||||
int newErasedIndex = validator.getNewErasedIndex();
|
||||
Assert.assertTrue(
|
||||
"Valid indexes for validation should contain"
|
||||
+ " erased indexes for decoding",
|
||||
newValidIndexesList.containsAll(erasedIndexesList));
|
||||
Assert.assertTrue(
|
||||
"An erased index for validation should be contained"
|
||||
+ " in valid indexes for decoding",
|
||||
validIndexesList.contains(newErasedIndex));
|
||||
Assert.assertFalse(
|
||||
"An erased index for validation should not be contained"
|
||||
+ " in valid indexes for validation",
|
||||
newValidIndexesList.contains(newErasedIndex));
|
||||
}
|
||||
|
||||
private void verifyChunksEqual(ECChunk[] chunks1, ECChunk[] chunks2) {
|
||||
boolean result = Arrays.deepEquals(toArrays(chunks1), toArrays(chunks2));
|
||||
assertTrue("Recovered chunks should not be changed", result);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test if validator throws {@link InvalidDecodingException} when
|
||||
* a decoded output buffer is polluted.
|
||||
*/
|
||||
@Test
|
||||
public void testValidateWithBadDecoding() throws IOException {
|
||||
prepare(null, numDataUnits, numParityUnits, erasedDataIndexes,
|
||||
erasedParityIndexes);
|
||||
this.usingDirectBuffer = true;
|
||||
prepareCoders(true);
|
||||
prepareValidator(true);
|
||||
prepareBufferAllocator(false);
|
||||
|
||||
// encode
|
||||
ECChunk[] dataChunks = prepareDataChunksForEncoding();
|
||||
ECChunk[] parityChunks = prepareParityChunksForEncoding();
|
||||
ECChunk[] clonedDataChunks = cloneChunksWithData(dataChunks);
|
||||
try {
|
||||
encoder.encode(dataChunks, parityChunks);
|
||||
} catch (Exception e) {
|
||||
Assert.fail("Should not get Exception: " + e.getMessage());
|
||||
}
|
||||
|
||||
// decode
|
||||
backupAndEraseChunks(clonedDataChunks, parityChunks);
|
||||
ECChunk[] inputChunks =
|
||||
prepareInputChunksForDecoding(clonedDataChunks, parityChunks);
|
||||
markChunks(inputChunks);
|
||||
ensureOnlyLeastRequiredChunks(inputChunks);
|
||||
ECChunk[] recoveredChunks = prepareOutputChunksForDecoding();
|
||||
int[] erasedIndexes = getErasedIndexesForDecoding();
|
||||
try {
|
||||
decoder.decode(inputChunks, erasedIndexes, recoveredChunks);
|
||||
} catch (Exception e) {
|
||||
Assert.fail("Should not get Exception: " + e.getMessage());
|
||||
}
|
||||
|
||||
// validate
|
||||
restoreChunksFromMark(inputChunks);
|
||||
polluteSomeChunk(recoveredChunks);
|
||||
try {
|
||||
validator.validate(inputChunks, erasedIndexes, recoveredChunks);
|
||||
Assert.fail("Validation should fail due to bad decoding");
|
||||
} catch (InvalidDecodingException e) {
|
||||
String expected = "Failed to validate decoding";
|
||||
GenericTestUtils.assertExceptionContains(expected, e);
|
||||
}
|
||||
}
|
||||
}
|
@ -334,7 +334,7 @@ protected void testInputPosition(boolean usingDirectBuffer) {
|
||||
verifyBufferPositionAtEnd(inputChunks);
|
||||
}
|
||||
|
||||
private void verifyBufferPositionAtEnd(ECChunk[] inputChunks) {
|
||||
void verifyBufferPositionAtEnd(ECChunk[] inputChunks) {
|
||||
for (ECChunk chunk : inputChunks) {
|
||||
if (chunk != null) {
|
||||
Assert.assertEquals(0, chunk.getBuffer().remaining());
|
||||
|
@ -819,6 +819,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||
"dfs.datanode.ec.reconstruction.xmits.weight";
|
||||
public static final float DFS_DN_EC_RECONSTRUCTION_XMITS_WEIGHT_DEFAULT =
|
||||
0.5f;
|
||||
public static final String DFS_DN_EC_RECONSTRUCTION_VALIDATION_KEY =
|
||||
"dfs.datanode.ec.reconstruction.validation";
|
||||
public static final boolean DFS_DN_EC_RECONSTRUCTION_VALIDATION_VALUE = false;
|
||||
|
||||
public static final String
|
||||
DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY =
|
||||
|
@ -23,6 +23,7 @@
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
/**
|
||||
* Used for injecting faults in DFSClient and DFSOutputStream tests.
|
||||
@ -132,4 +133,10 @@ public void blockUtilSendFullBlockReport() {}
|
||||
* Just delay a while.
|
||||
*/
|
||||
public void delay() {}
|
||||
|
||||
/**
|
||||
* Used as a hook to inject data pollution
|
||||
* into an erasure coding reconstruction.
|
||||
*/
|
||||
public void badDecoding(ByteBuffer[] outputs) {}
|
||||
}
|
||||
|
@ -57,6 +57,7 @@ protected StripedBlockChecksumReconstructor(ErasureCodingWorker worker,
|
||||
|
||||
private void init() throws IOException {
|
||||
initDecoderIfNecessary();
|
||||
initDecodingValidatorIfNecessary();
|
||||
getStripedReader().init();
|
||||
// allocate buffer to keep the reconstructed block data
|
||||
targetBuffer = allocateBuffer(getBufferSize());
|
||||
@ -192,7 +193,16 @@ private void reconstructTargets(int toReconstructLen) throws IOException {
|
||||
for (int i = 0; i < targetIndices.length; i++) {
|
||||
tarIndices[i] = targetIndices[i];
|
||||
}
|
||||
getDecoder().decode(inputs, tarIndices, outputs);
|
||||
|
||||
if (isValidationEnabled()) {
|
||||
markBuffers(inputs);
|
||||
getDecoder().decode(inputs, tarIndices, outputs);
|
||||
resetBuffers(inputs);
|
||||
|
||||
getValidator().validate(inputs, tarIndices, outputs);
|
||||
} else {
|
||||
getDecoder().decode(inputs, tarIndices, outputs);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -23,6 +23,7 @@
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
|
||||
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
|
||||
import org.apache.hadoop.io.erasurecode.rawcoder.InvalidDecodingException;
|
||||
import org.apache.hadoop.util.Time;
|
||||
|
||||
/**
|
||||
@ -53,6 +54,8 @@ public void run() {
|
||||
try {
|
||||
initDecoderIfNecessary();
|
||||
|
||||
initDecodingValidatorIfNecessary();
|
||||
|
||||
getStripedReader().init();
|
||||
|
||||
stripedWriter.init();
|
||||
@ -126,12 +129,31 @@ private void reconstructTargets(int toReconstructLen) throws IOException {
|
||||
int[] erasedIndices = stripedWriter.getRealTargetIndices();
|
||||
ByteBuffer[] outputs = stripedWriter.getRealTargetBuffers(toReconstructLen);
|
||||
|
||||
if (isValidationEnabled()) {
|
||||
markBuffers(inputs);
|
||||
decode(inputs, erasedIndices, outputs);
|
||||
resetBuffers(inputs);
|
||||
|
||||
DataNodeFaultInjector.get().badDecoding(outputs);
|
||||
try {
|
||||
getValidator().validate(inputs, erasedIndices, outputs);
|
||||
} catch (InvalidDecodingException e) {
|
||||
getDatanode().getMetrics().incrECInvalidReconstructionTasks();
|
||||
throw e;
|
||||
}
|
||||
} else {
|
||||
decode(inputs, erasedIndices, outputs);
|
||||
}
|
||||
|
||||
stripedWriter.updateRealTargetBuffers(toReconstructLen);
|
||||
}
|
||||
|
||||
private void decode(ByteBuffer[] inputs, int[] erasedIndices,
|
||||
ByteBuffer[] outputs) throws IOException {
|
||||
long start = System.nanoTime();
|
||||
getDecoder().decode(inputs, erasedIndices, outputs);
|
||||
long end = System.nanoTime();
|
||||
this.getDatanode().getMetrics().incrECDecodingTime(end - start);
|
||||
|
||||
stripedWriter.updateRealTargetBuffers(toReconstructLen);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -17,6 +17,8 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.datanode.erasurecode;
|
||||
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.io.erasurecode.rawcoder.DecodingValidator;
|
||||
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
@ -103,10 +105,14 @@ abstract class StripedReconstructor {
|
||||
private final Configuration conf;
|
||||
private final DataNode datanode;
|
||||
private final ErasureCodingPolicy ecPolicy;
|
||||
private final ErasureCoderOptions coderOptions;
|
||||
private RawErasureDecoder decoder;
|
||||
private final ExtendedBlock blockGroup;
|
||||
private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool();
|
||||
|
||||
private final boolean isValidationEnabled;
|
||||
private DecodingValidator validator;
|
||||
|
||||
// position in striped internal block
|
||||
private long positionInBlock;
|
||||
private StripedReader stripedReader;
|
||||
@ -136,6 +142,13 @@ abstract class StripedReconstructor {
|
||||
cachingStrategy = CachingStrategy.newDefaultStrategy();
|
||||
|
||||
positionInBlock = 0L;
|
||||
|
||||
coderOptions = new ErasureCoderOptions(
|
||||
ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits());
|
||||
isValidationEnabled = conf.getBoolean(
|
||||
DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_VALIDATION_KEY,
|
||||
DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_VALIDATION_VALUE)
|
||||
&& !coderOptions.allowChangeInputs();
|
||||
}
|
||||
|
||||
public void incrBytesRead(boolean local, long delta) {
|
||||
@ -196,13 +209,18 @@ long getBlockLen(int i) {
|
||||
// Initialize decoder
|
||||
protected void initDecoderIfNecessary() {
|
||||
if (decoder == null) {
|
||||
ErasureCoderOptions coderOptions = new ErasureCoderOptions(
|
||||
ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits());
|
||||
decoder = CodecUtil.createRawDecoder(conf, ecPolicy.getCodecName(),
|
||||
coderOptions);
|
||||
}
|
||||
}
|
||||
|
||||
// Initialize decoding validator
|
||||
protected void initDecodingValidatorIfNecessary() {
|
||||
if (isValidationEnabled && validator == null) {
|
||||
validator = new DecodingValidator(decoder);
|
||||
}
|
||||
}
|
||||
|
||||
long getPositionInBlock() {
|
||||
return positionInBlock;
|
||||
}
|
||||
@ -285,4 +303,28 @@ public ErasureCodingWorker getErasureCodingWorker() {
|
||||
static ByteBufferPool getBufferPool() {
|
||||
return BUFFER_POOL;
|
||||
}
|
||||
|
||||
boolean isValidationEnabled() {
|
||||
return isValidationEnabled;
|
||||
}
|
||||
|
||||
DecodingValidator getValidator() {
|
||||
return validator;
|
||||
}
|
||||
|
||||
protected static void markBuffers(ByteBuffer[] buffers) {
|
||||
for (ByteBuffer buffer: buffers) {
|
||||
if (buffer != null) {
|
||||
buffer.mark();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected static void resetBuffers(ByteBuffer[] buffers) {
|
||||
for (ByteBuffer buffer: buffers) {
|
||||
if (buffer != null) {
|
||||
buffer.reset();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -152,6 +152,8 @@ public class DataNodeMetrics {
|
||||
MutableCounterLong ecReconstructionTasks;
|
||||
@Metric("Count of erasure coding failed reconstruction tasks")
|
||||
MutableCounterLong ecFailedReconstructionTasks;
|
||||
@Metric("Count of erasure coding invalidated reconstruction tasks")
|
||||
private MutableCounterLong ecInvalidReconstructionTasks;
|
||||
@Metric("Nanoseconds spent by decoding tasks")
|
||||
MutableCounterLong ecDecodingTimeNanos;
|
||||
@Metric("Bytes read by erasure coding worker")
|
||||
@ -519,6 +521,14 @@ public void incrECFailedReconstructionTasks() {
|
||||
ecFailedReconstructionTasks.incr();
|
||||
}
|
||||
|
||||
public void incrECInvalidReconstructionTasks() {
|
||||
ecInvalidReconstructionTasks.incr();
|
||||
}
|
||||
|
||||
public long getECInvalidReconstructionTasks() {
|
||||
return ecInvalidReconstructionTasks.value();
|
||||
}
|
||||
|
||||
public void incrDataNodeActiveXceiversCount() {
|
||||
dataNodeActiveXceiversCount.incr();
|
||||
}
|
||||
|
@ -3620,6 +3620,16 @@
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.datanode.ec.reconstruction.validation</name>
|
||||
<value>false</value>
|
||||
<description>
|
||||
Decide if datanode validates that EC reconstruction tasks reconstruct
|
||||
target blocks correctly. When validation fails, reconstruction tasks
|
||||
will fail and be retried by namenode.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.namenode.quota.init-threads</name>
|
||||
<value>4</value>
|
||||
|
@ -107,6 +107,23 @@ public ErasureCodingPolicy getEcPolicy() {
|
||||
return StripedFileTestUtil.getDefaultECPolicy();
|
||||
}
|
||||
|
||||
public boolean isValidationEnabled() {
|
||||
return false;
|
||||
}
|
||||
|
||||
public int getPendingTimeout() {
|
||||
return DFSConfigKeys
|
||||
.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT;
|
||||
}
|
||||
|
||||
public int getBlockSize() {
|
||||
return blockSize;
|
||||
}
|
||||
|
||||
public MiniDFSCluster getCluster() {
|
||||
return cluster;
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setup() throws IOException {
|
||||
ecPolicy = getEcPolicy();
|
||||
@ -130,6 +147,11 @@ public void setup() throws IOException {
|
||||
CodecUtil.IO_ERASURECODE_CODEC_RS_RAWCODERS_KEY,
|
||||
NativeRSRawErasureCoderFactory.CODER_NAME);
|
||||
}
|
||||
conf.setInt(
|
||||
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY,
|
||||
getPendingTimeout());
|
||||
conf.setBoolean(DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_VALIDATION_KEY,
|
||||
isValidationEnabled());
|
||||
File basedir = new File(GenericTestUtils.getRandomizedTempPath());
|
||||
cluster = new MiniDFSCluster.Builder(conf, basedir).numDataNodes(dnNum)
|
||||
.build();
|
||||
@ -305,7 +327,7 @@ private static void writeFile(DistributedFileSystem fs, String fileName,
|
||||
* and verify the block replica length, generationStamp and content.
|
||||
* 2. Read the file and verify content.
|
||||
*/
|
||||
private void assertFileBlocksReconstruction(String fileName, int fileLen,
|
||||
void assertFileBlocksReconstruction(String fileName, int fileLen,
|
||||
ReconstructionType type, int toRecoverBlockNum) throws Exception {
|
||||
if (toRecoverBlockNum < 1 || toRecoverBlockNum > parityBlkNum) {
|
||||
Assert.fail("toRecoverBlockNum should be between 1 ~ " + parityBlkNum);
|
||||
|
@ -0,0 +1,115 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
|
||||
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
/**
|
||||
* This test extends {@link TestReconstructStripedFile} to test
|
||||
* ec reconstruction validation.
|
||||
*/
|
||||
public class TestReconstructStripedFileWithValidator
|
||||
extends TestReconstructStripedFile {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(TestReconstructStripedFileWithValidator.class);
|
||||
|
||||
public TestReconstructStripedFileWithValidator() {
|
||||
LOG.info("run {} with validator.",
|
||||
TestReconstructStripedFileWithValidator.class.getSuperclass()
|
||||
.getSimpleName());
|
||||
}
|
||||
|
||||
/**
|
||||
* This test injects data pollution into decoded outputs once.
|
||||
* When validation enabled, the first reconstruction task should fail
|
||||
* in the validation, but the data will be recovered correctly
|
||||
* by the next task.
|
||||
* On the other hand, when validation disabled, the first reconstruction task
|
||||
* will succeed and then lead to data corruption.
|
||||
*/
|
||||
@Test(timeout = 120000)
|
||||
public void testValidatorWithBadDecoding()
|
||||
throws Exception {
|
||||
MiniDFSCluster cluster = getCluster();
|
||||
|
||||
cluster.getDataNodes().stream()
|
||||
.map(DataNode::getMetrics)
|
||||
.map(DataNodeMetrics::getECInvalidReconstructionTasks)
|
||||
.forEach(n -> Assert.assertEquals(0, (long) n));
|
||||
|
||||
DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
|
||||
DataNodeFaultInjector badDecodingInjector = new DataNodeFaultInjector() {
|
||||
private final AtomicBoolean flag = new AtomicBoolean(false);
|
||||
|
||||
@Override
|
||||
public void badDecoding(ByteBuffer[] outputs) {
|
||||
if (!flag.get()) {
|
||||
for (ByteBuffer output : outputs) {
|
||||
output.mark();
|
||||
output.put((byte) (output.get(output.position()) + 1));
|
||||
output.reset();
|
||||
}
|
||||
}
|
||||
flag.set(true);
|
||||
}
|
||||
};
|
||||
DataNodeFaultInjector.set(badDecodingInjector);
|
||||
|
||||
int fileLen =
|
||||
(getEcPolicy().getNumDataUnits() + getEcPolicy().getNumParityUnits())
|
||||
* getBlockSize() + getBlockSize() / 10;
|
||||
try {
|
||||
assertFileBlocksReconstruction(
|
||||
"/testValidatorWithBadDecoding",
|
||||
fileLen,
|
||||
ReconstructionType.DataOnly,
|
||||
getEcPolicy().getNumParityUnits());
|
||||
|
||||
long sum = cluster.getDataNodes().stream()
|
||||
.map(DataNode::getMetrics)
|
||||
.mapToLong(DataNodeMetrics::getECInvalidReconstructionTasks)
|
||||
.sum();
|
||||
Assert.assertEquals(1, sum);
|
||||
} finally {
|
||||
DataNodeFaultInjector.set(oldInjector);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isValidationEnabled() {
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set a small value for the failed reconstruction task to be
|
||||
* rescheduled in a short period of time.
|
||||
*/
|
||||
@Override
|
||||
public int getPendingTimeout() {
|
||||
return 10;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user