HDFS-10460. Recompute block checksum for a particular range less than file size on the fly by reconstructing missed block. Contributed by Rakesh R
This commit is contained in:
parent
ff07b10803
commit
e6cb07520f
@ -454,10 +454,11 @@ void checksumBlocks() throws IOException {
|
||||
private boolean checksumBlockGroup(
|
||||
LocatedStripedBlock blockGroup) throws IOException {
|
||||
ExtendedBlock block = blockGroup.getBlock();
|
||||
long requestedNumBytes = block.getNumBytes();
|
||||
if (getRemaining() < block.getNumBytes()) {
|
||||
block.setNumBytes(getRemaining());
|
||||
requestedNumBytes = getRemaining();
|
||||
}
|
||||
setRemaining(getRemaining() - block.getNumBytes());
|
||||
setRemaining(getRemaining() - requestedNumBytes);
|
||||
|
||||
StripedBlockInfo stripedBlockInfo = new StripedBlockInfo(block,
|
||||
blockGroup.getLocations(), blockGroup.getBlockTokens(),
|
||||
@ -468,7 +469,8 @@ private boolean checksumBlockGroup(
|
||||
boolean done = false;
|
||||
for (int j = 0; !done && j < datanodes.length; j++) {
|
||||
try {
|
||||
tryDatanode(blockGroup, stripedBlockInfo, datanodes[j]);
|
||||
tryDatanode(blockGroup, stripedBlockInfo, datanodes[j],
|
||||
requestedNumBytes);
|
||||
done = true;
|
||||
} catch (InvalidBlockTokenException ibte) {
|
||||
if (bgIdx > getLastRetriedIndex()) {
|
||||
@ -496,7 +498,8 @@ private boolean checksumBlockGroup(
|
||||
*/
|
||||
private void tryDatanode(LocatedStripedBlock blockGroup,
|
||||
StripedBlockInfo stripedBlockInfo,
|
||||
DatanodeInfo datanode) throws IOException {
|
||||
DatanodeInfo datanode,
|
||||
long requestedNumBytes) throws IOException {
|
||||
|
||||
try (IOStreamPair pair = getClient().connectToDN(datanode,
|
||||
getTimeout(), blockGroup.getBlockToken())) {
|
||||
@ -506,7 +509,7 @@ private void tryDatanode(LocatedStripedBlock blockGroup,
|
||||
|
||||
// get block MD5
|
||||
createSender(pair).blockGroupChecksum(stripedBlockInfo,
|
||||
blockGroup.getBlockToken());
|
||||
blockGroup.getBlockToken(), requestedNumBytes);
|
||||
|
||||
BlockOpResponseProto reply = BlockOpResponseProto.parseFrom(
|
||||
PBHelperClient.vintPrefixed(pair.in));
|
||||
|
@ -207,8 +207,11 @@ void blockChecksum(ExtendedBlock blk,
|
||||
*
|
||||
* @param stripedBlockInfo a striped block info.
|
||||
* @param blockToken security token for accessing the block.
|
||||
* @param requestedNumBytes requested number of bytes in the block group
|
||||
* to compute the checksum.
|
||||
* @throws IOException
|
||||
*/
|
||||
void blockGroupChecksum(StripedBlockInfo stripedBlockInfo,
|
||||
Token<BlockTokenIdentifier> blockToken) throws IOException;
|
||||
Token<BlockTokenIdentifier> blockToken,
|
||||
long requestedNumBytes) throws IOException;
|
||||
}
|
||||
|
@ -266,7 +266,8 @@ public void blockChecksum(final ExtendedBlock blk,
|
||||
|
||||
@Override
|
||||
public void blockGroupChecksum(StripedBlockInfo stripedBlockInfo,
|
||||
Token<BlockTokenIdentifier> blockToken) throws IOException {
|
||||
Token<BlockTokenIdentifier> blockToken, long requestedNumBytes)
|
||||
throws IOException {
|
||||
OpBlockGroupChecksumProto proto = OpBlockGroupChecksumProto.newBuilder()
|
||||
.setHeader(DataTransferProtoUtil.buildBaseHeader(
|
||||
stripedBlockInfo.getBlock(), blockToken))
|
||||
@ -278,6 +279,7 @@ public void blockGroupChecksum(StripedBlockInfo stripedBlockInfo,
|
||||
.convertBlockIndices(stripedBlockInfo.getBlockIndices()))
|
||||
.setEcPolicy(PBHelperClient.convertErasureCodingPolicy(
|
||||
stripedBlockInfo.getErasureCodingPolicy()))
|
||||
.setRequestedNumBytes(requestedNumBytes)
|
||||
.build();
|
||||
|
||||
send(out, Op.BLOCK_GROUP_CHECKSUM, proto);
|
||||
|
@ -155,6 +155,7 @@ message OpBlockGroupChecksumProto {
|
||||
repeated hadoop.common.TokenProto blockTokens = 3;
|
||||
required ErasureCodingPolicyProto ecPolicy = 4;
|
||||
repeated uint32 blockIndices = 5;
|
||||
required uint64 requestedNumBytes = 6;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -312,7 +312,8 @@ private void opStripedBlockChecksum(DataInputStream dis) throws IOException {
|
||||
|
||||
try {
|
||||
blockGroupChecksum(stripedBlockInfo,
|
||||
PBHelperClient.convert(proto.getHeader().getToken()));
|
||||
PBHelperClient.convert(proto.getHeader().getToken()),
|
||||
proto.getRequestedNumBytes());
|
||||
} finally {
|
||||
if (traceScope != null) {
|
||||
traceScope.close();
|
||||
|
@ -285,10 +285,8 @@ void compute() throws IOException {
|
||||
}
|
||||
setOutBytes(md5out.getDigest());
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("block=" + getBlock() + ", bytesPerCRC=" + getBytesPerCRC()
|
||||
+ ", crcPerBlock=" + getCrcPerBlock() + ", md5out=" + md5out);
|
||||
}
|
||||
LOG.debug("block={}, bytesPerCRC={}, crcPerBlock={}, md5out={}",
|
||||
getBlock(), getBytesPerCRC(), getCrcPerBlock(), md5out);
|
||||
} finally {
|
||||
IOUtils.closeStream(getChecksumIn());
|
||||
IOUtils.closeStream(getMetadataIn());
|
||||
@ -335,11 +333,13 @@ static class BlockGroupNonStripedChecksumComputer
|
||||
private final DatanodeInfo[] datanodes;
|
||||
private final Token<BlockTokenIdentifier>[] blockTokens;
|
||||
private final byte[] blockIndices;
|
||||
private final long requestedNumBytes;
|
||||
|
||||
private final DataOutputBuffer md5writer = new DataOutputBuffer();
|
||||
|
||||
BlockGroupNonStripedChecksumComputer(DataNode datanode,
|
||||
StripedBlockInfo stripedBlockInfo)
|
||||
StripedBlockInfo stripedBlockInfo,
|
||||
long requestedNumBytes)
|
||||
throws IOException {
|
||||
super(datanode);
|
||||
this.blockGroup = stripedBlockInfo.getBlock();
|
||||
@ -347,6 +347,7 @@ static class BlockGroupNonStripedChecksumComputer
|
||||
this.datanodes = stripedBlockInfo.getDatanodes();
|
||||
this.blockTokens = stripedBlockInfo.getBlockTokens();
|
||||
this.blockIndices = stripedBlockInfo.getBlockIndices();
|
||||
this.requestedNumBytes = requestedNumBytes;
|
||||
}
|
||||
|
||||
private static class LiveBlockInfo {
|
||||
@ -380,24 +381,29 @@ void compute() throws IOException {
|
||||
liveDns.put(blockIndices[idx],
|
||||
new LiveBlockInfo(datanodes[idx], blockTokens[idx]));
|
||||
}
|
||||
long checksumLen = 0;
|
||||
for (int idx = 0; idx < numDataUnits && idx < blkIndxLen; idx++) {
|
||||
try {
|
||||
ExtendedBlock block = getInternalBlock(numDataUnits, idx);
|
||||
|
||||
LiveBlockInfo liveBlkInfo = liveDns.get((byte) idx);
|
||||
if (liveBlkInfo == null) {
|
||||
// reconstruct block and calculate checksum for missing node
|
||||
recalculateChecksum(idx);
|
||||
recalculateChecksum(idx, block.getNumBytes());
|
||||
} else {
|
||||
try {
|
||||
ExtendedBlock block = StripedBlockUtil.constructInternalBlock(
|
||||
blockGroup, ecPolicy.getCellSize(), numDataUnits, idx);
|
||||
checksumBlock(block, idx, liveBlkInfo.getToken(),
|
||||
liveBlkInfo.getDn());
|
||||
} catch (IOException ioe) {
|
||||
LOG.warn("Exception while reading checksum", ioe);
|
||||
// reconstruct block and calculate checksum for the failed node
|
||||
recalculateChecksum(idx);
|
||||
recalculateChecksum(idx, block.getNumBytes());
|
||||
}
|
||||
}
|
||||
checksumLen += block.getNumBytes();
|
||||
if (checksumLen >= requestedNumBytes) {
|
||||
break; // done with the computation, simply return.
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed to get the checksum", e);
|
||||
}
|
||||
@ -407,6 +413,20 @@ void compute() throws IOException {
|
||||
setOutBytes(md5out.getDigest());
|
||||
}
|
||||
|
||||
private ExtendedBlock getInternalBlock(int numDataUnits, int idx) {
|
||||
// Sets requested number of bytes in blockGroup which is required to
|
||||
// construct the internal block for computing checksum.
|
||||
long actualNumBytes = blockGroup.getNumBytes();
|
||||
blockGroup.setNumBytes(requestedNumBytes);
|
||||
|
||||
ExtendedBlock block = StripedBlockUtil.constructInternalBlock(blockGroup,
|
||||
ecPolicy.getCellSize(), numDataUnits, idx);
|
||||
|
||||
// Set back actualNumBytes value in blockGroup.
|
||||
blockGroup.setNumBytes(actualNumBytes);
|
||||
return block;
|
||||
}
|
||||
|
||||
private void checksumBlock(ExtendedBlock block, int blockIdx,
|
||||
Token<BlockTokenIdentifier> blockToken,
|
||||
DatanodeInfo targetDatanode) throws IOException {
|
||||
@ -446,9 +466,7 @@ private void checksumBlock(ExtendedBlock block, int blockIdx,
|
||||
//read md5
|
||||
final MD5Hash md5 = new MD5Hash(checksumData.getMd5().toByteArray());
|
||||
md5.write(md5writer);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("got reply from " + targetDatanode + ": md5=" + md5);
|
||||
}
|
||||
LOG.debug("got reply from datanode:{}, md5={}", targetDatanode, md5);
|
||||
}
|
||||
}
|
||||
|
||||
@ -456,34 +474,35 @@ private void checksumBlock(ExtendedBlock block, int blockIdx,
|
||||
* Reconstruct this data block and recalculate checksum.
|
||||
*
|
||||
* @param errBlkIndex
|
||||
* error index to be reconstrcuted and recalculate checksum.
|
||||
* error index to be reconstructed and recalculate checksum.
|
||||
* @param blockLength
|
||||
* number of bytes in the block to compute checksum.
|
||||
* @throws IOException
|
||||
*/
|
||||
private void recalculateChecksum(int errBlkIndex) throws IOException {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Recalculate checksum for the missing/failed block index "
|
||||
+ errBlkIndex);
|
||||
}
|
||||
private void recalculateChecksum(int errBlkIndex, long blockLength)
|
||||
throws IOException {
|
||||
LOG.debug("Recalculate checksum for the missing/failed block index {}",
|
||||
errBlkIndex);
|
||||
byte[] errIndices = new byte[1];
|
||||
errIndices[0] = (byte) errBlkIndex;
|
||||
|
||||
StripedReconstructionInfo stripedReconInfo =
|
||||
new StripedReconstructionInfo(
|
||||
blockGroup, ecPolicy, blockIndices, datanodes, errIndices);
|
||||
blockGroup, ecPolicy, blockIndices, datanodes, errIndices);
|
||||
final StripedBlockChecksumReconstructor checksumRecon =
|
||||
new StripedBlockChecksumReconstructor(
|
||||
getDatanode().getErasureCodingWorker(), stripedReconInfo,
|
||||
md5writer);
|
||||
getDatanode().getErasureCodingWorker(), stripedReconInfo,
|
||||
md5writer, blockLength);
|
||||
checksumRecon.reconstruct();
|
||||
|
||||
DataChecksum checksum = checksumRecon.getChecksum();
|
||||
long crcPerBlock = checksum.getChecksumSize() <= 0 ? 0
|
||||
: checksumRecon.getChecksumDataLen() / checksum.getChecksumSize();
|
||||
setOrVerifyChecksumProperties(errBlkIndex, checksum.getBytesPerChecksum(),
|
||||
crcPerBlock, checksum.getChecksumType());
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Recalculated checksum for the block index " + errBlkIndex
|
||||
+ ": md5=" + checksumRecon.getMD5());
|
||||
}
|
||||
setOrVerifyChecksumProperties(errBlkIndex,
|
||||
checksum.getBytesPerChecksum(), crcPerBlock,
|
||||
checksum.getChecksumType());
|
||||
LOG.debug("Recalculated checksum for the block index:{}, md5={}",
|
||||
errBlkIndex, checksumRecon.getMD5());
|
||||
}
|
||||
|
||||
private void setOrVerifyChecksumProperties(int blockIdx, int bpc,
|
||||
@ -509,11 +528,9 @@ private void setOrVerifyChecksumProperties(int blockIdx, int bpc,
|
||||
setCrcType(DataChecksum.Type.MIXED);
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
if (blockIdx == 0) {
|
||||
LOG.debug("set bytesPerCRC=" + getBytesPerCRC()
|
||||
+ ", crcPerBlock=" + getCrcPerBlock());
|
||||
}
|
||||
if (blockIdx == 0) {
|
||||
LOG.debug("set bytesPerCRC={}, crcPerBlock={}", getBytesPerCRC(),
|
||||
getCrcPerBlock());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -964,7 +964,7 @@ public void blockChecksum(ExtendedBlock block,
|
||||
|
||||
@Override
|
||||
public void blockGroupChecksum(final StripedBlockInfo stripedBlockInfo,
|
||||
final Token<BlockTokenIdentifier> blockToken)
|
||||
final Token<BlockTokenIdentifier> blockToken, long requestedNumBytes)
|
||||
throws IOException {
|
||||
updateCurrentThreadName("Getting checksum for block group" +
|
||||
stripedBlockInfo.getBlock());
|
||||
@ -973,7 +973,8 @@ public void blockGroupChecksum(final StripedBlockInfo stripedBlockInfo,
|
||||
Op.BLOCK_GROUP_CHECKSUM, BlockTokenIdentifier.AccessMode.READ);
|
||||
|
||||
AbstractBlockChecksumComputer maker =
|
||||
new BlockGroupNonStripedChecksumComputer(datanode, stripedBlockInfo);
|
||||
new BlockGroupNonStripedChecksumComputer(datanode, stripedBlockInfo,
|
||||
requestedNumBytes);
|
||||
|
||||
try {
|
||||
maker.compute();
|
||||
|
@ -20,6 +20,7 @@
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.security.MessageDigest;
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.io.DataOutputBuffer;
|
||||
@ -41,14 +42,17 @@ public class StripedBlockChecksumReconstructor extends StripedReconstructor {
|
||||
private DataOutputBuffer checksumWriter;
|
||||
private MD5Hash md5;
|
||||
private long checksumDataLen;
|
||||
private long requestedLen;
|
||||
|
||||
public StripedBlockChecksumReconstructor(ErasureCodingWorker worker,
|
||||
StripedReconstructionInfo stripedReconInfo,
|
||||
DataOutputBuffer checksumWriter) throws IOException {
|
||||
DataOutputBuffer checksumWriter,
|
||||
long requestedBlockLength) throws IOException {
|
||||
super(worker, stripedReconInfo);
|
||||
this.targetIndices = stripedReconInfo.getTargetIndices();
|
||||
assert targetIndices != null;
|
||||
this.checksumWriter = checksumWriter;
|
||||
this.requestedLen = requestedBlockLength;
|
||||
init();
|
||||
}
|
||||
|
||||
@ -69,8 +73,9 @@ private void init() throws IOException {
|
||||
|
||||
public void reconstruct() throws IOException {
|
||||
MessageDigest digester = MD5Hash.getDigester();
|
||||
while (getPositionInBlock() < getMaxTargetLength()) {
|
||||
long remaining = getMaxTargetLength() - getPositionInBlock();
|
||||
long maxTargetLength = getMaxTargetLength();
|
||||
while (requestedLen > 0 && getPositionInBlock() < maxTargetLength) {
|
||||
long remaining = maxTargetLength - getPositionInBlock();
|
||||
final int toReconstructLen = (int) Math
|
||||
.min(getStripedReader().getBufferSize(), remaining);
|
||||
// step1: read from minimum source DNs required for reconstruction.
|
||||
@ -81,13 +86,11 @@ public void reconstruct() throws IOException {
|
||||
reconstructTargets(toReconstructLen);
|
||||
|
||||
// step3: calculate checksum
|
||||
getChecksum().calculateChunkedSums(targetBuffer.array(), 0,
|
||||
targetBuffer.remaining(), checksumBuf, 0);
|
||||
checksumDataLen += checksumWithTargetOutput(targetBuffer.array(),
|
||||
toReconstructLen, digester);
|
||||
|
||||
// step4: updates the digest using the checksum array of bytes
|
||||
digester.update(checksumBuf, 0, checksumBuf.length);
|
||||
checksumDataLen += checksumBuf.length;
|
||||
updatePositionInBlock(toReconstructLen);
|
||||
requestedLen -= toReconstructLen;
|
||||
clearBuffers();
|
||||
}
|
||||
|
||||
@ -96,6 +99,56 @@ public void reconstruct() throws IOException {
|
||||
md5.write(checksumWriter);
|
||||
}
|
||||
|
||||
private long checksumWithTargetOutput(byte[] outputData, int toReconstructLen,
|
||||
MessageDigest digester) throws IOException {
|
||||
long checksumDataLength = 0;
|
||||
// Calculate partial block checksum. There are two cases.
|
||||
// case-1) length of data bytes which is fraction of bytesPerCRC
|
||||
// case-2) length of data bytes which is less than bytesPerCRC
|
||||
if (requestedLen <= toReconstructLen) {
|
||||
int remainingLen = (int) requestedLen;
|
||||
outputData = Arrays.copyOf(targetBuffer.array(), remainingLen);
|
||||
|
||||
int partialLength = remainingLen % getChecksum().getBytesPerChecksum();
|
||||
|
||||
int checksumRemaining = (remainingLen
|
||||
/ getChecksum().getBytesPerChecksum())
|
||||
* getChecksum().getChecksumSize();
|
||||
|
||||
int dataOffset = 0;
|
||||
|
||||
// case-1) length of data bytes which is fraction of bytesPerCRC
|
||||
if (checksumRemaining > 0) {
|
||||
remainingLen = remainingLen - partialLength;
|
||||
checksumBuf = new byte[checksumRemaining];
|
||||
getChecksum().calculateChunkedSums(outputData, dataOffset,
|
||||
remainingLen, checksumBuf, 0);
|
||||
digester.update(checksumBuf, 0, checksumBuf.length);
|
||||
checksumDataLength = checksumBuf.length;
|
||||
dataOffset = remainingLen;
|
||||
}
|
||||
|
||||
// case-2) length of data bytes which is less than bytesPerCRC
|
||||
if (partialLength > 0) {
|
||||
byte[] partialCrc = new byte[getChecksum().getChecksumSize()];
|
||||
getChecksum().update(outputData, dataOffset, partialLength);
|
||||
getChecksum().writeValue(partialCrc, 0, true);
|
||||
digester.update(partialCrc);
|
||||
checksumDataLength += partialCrc.length;
|
||||
}
|
||||
|
||||
clearBuffers();
|
||||
// calculated checksum for the requested length, return checksum length.
|
||||
return checksumDataLength;
|
||||
}
|
||||
getChecksum().calculateChunkedSums(outputData, 0,
|
||||
outputData.length, checksumBuf, 0);
|
||||
|
||||
// updates digest using the checksum array of bytes
|
||||
digester.update(checksumBuf, 0, checksumBuf.length);
|
||||
return checksumBuf.length;
|
||||
}
|
||||
|
||||
private void reconstructTargets(int toReconstructLen) {
|
||||
initDecoderIfNecessary();
|
||||
|
||||
|
@ -17,8 +17,6 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileChecksum;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
@ -31,6 +29,9 @@
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
@ -41,7 +42,8 @@
|
||||
* are the same.
|
||||
*/
|
||||
public class TestFileChecksum {
|
||||
public static final Log LOG = LogFactory.getLog(TestFileChecksum.class);
|
||||
private static final Logger LOG = LoggerFactory
|
||||
.getLogger(TestFileChecksum.class);
|
||||
|
||||
private int dataBlocks = StripedFileTestUtil.NUM_DATA_BLOCKS;
|
||||
private int parityBlocks = StripedFileTestUtil.NUM_PARITY_BLOCKS;
|
||||
@ -58,6 +60,7 @@ public class TestFileChecksum {
|
||||
private int stripSize = cellSize * dataBlocks;
|
||||
private int blockGroupSize = stripesPerBlock * stripSize;
|
||||
private int fileSize = numBlockGroups * blockGroupSize;
|
||||
private int bytesPerCRC;
|
||||
|
||||
private String ecDir = "/striped";
|
||||
private String stripedFile1 = ecDir + "/stripedFileChecksum1";
|
||||
@ -79,10 +82,9 @@ public void setup() throws IOException {
|
||||
fs = cluster.getFileSystem();
|
||||
client = fs.getClient();
|
||||
|
||||
prepareTestFiles();
|
||||
|
||||
getDataNodeToKill(stripedFile1);
|
||||
getDataNodeToKill(replicatedFile);
|
||||
bytesPerCRC = conf.getInt(
|
||||
HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
|
||||
HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT);
|
||||
}
|
||||
|
||||
@After
|
||||
@ -93,49 +95,57 @@ public void tearDown() {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test(timeout = 90000)
|
||||
public void testStripedFileChecksum1() throws Exception {
|
||||
int length = 0;
|
||||
prepareTestFiles(fileSize, new String[] {stripedFile1, stripedFile2});
|
||||
testStripedFileChecksum(length, length + 10);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test(timeout = 90000)
|
||||
public void testStripedFileChecksum2() throws Exception {
|
||||
int length = stripSize - 1;
|
||||
prepareTestFiles(fileSize, new String[] {stripedFile1, stripedFile2});
|
||||
testStripedFileChecksum(length, length - 10);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test(timeout = 90000)
|
||||
public void testStripedFileChecksum3() throws Exception {
|
||||
int length = stripSize;
|
||||
prepareTestFiles(fileSize, new String[] {stripedFile1, stripedFile2});
|
||||
testStripedFileChecksum(length, length - 10);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test(timeout = 90000)
|
||||
public void testStripedFileChecksum4() throws Exception {
|
||||
int length = stripSize + cellSize * 2;
|
||||
prepareTestFiles(fileSize, new String[] {stripedFile1, stripedFile2});
|
||||
testStripedFileChecksum(length, length - 10);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test(timeout = 90000)
|
||||
public void testStripedFileChecksum5() throws Exception {
|
||||
int length = blockGroupSize;
|
||||
prepareTestFiles(fileSize, new String[] {stripedFile1, stripedFile2});
|
||||
testStripedFileChecksum(length, length - 10);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test(timeout = 90000)
|
||||
public void testStripedFileChecksum6() throws Exception {
|
||||
int length = blockGroupSize + blockSize;
|
||||
prepareTestFiles(fileSize, new String[] {stripedFile1, stripedFile2});
|
||||
testStripedFileChecksum(length, length - 10);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test(timeout = 90000)
|
||||
public void testStripedFileChecksum7() throws Exception {
|
||||
int length = -1; // whole file
|
||||
prepareTestFiles(fileSize, new String[] {stripedFile1, stripedFile2});
|
||||
testStripedFileChecksum(length, fileSize);
|
||||
}
|
||||
|
||||
void testStripedFileChecksum(int range1, int range2) throws Exception {
|
||||
private void testStripedFileChecksum(int range1, int range2)
|
||||
throws Exception {
|
||||
FileChecksum stripedFileChecksum1 = getFileChecksum(stripedFile1,
|
||||
range1, false);
|
||||
FileChecksum stripedFileChecksum2 = getFileChecksum(stripedFile2,
|
||||
@ -153,8 +163,9 @@ void testStripedFileChecksum(int range1, int range2) throws Exception {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test(timeout = 90000)
|
||||
public void testStripedAndReplicatedFileChecksum() throws Exception {
|
||||
prepareTestFiles(fileSize, new String[] {stripedFile1, replicatedFile});
|
||||
FileChecksum stripedFileChecksum1 = getFileChecksum(stripedFile1,
|
||||
10, false);
|
||||
FileChecksum replicatedFileChecksum = getFileChecksum(replicatedFile,
|
||||
@ -163,8 +174,9 @@ public void testStripedAndReplicatedFileChecksum() throws Exception {
|
||||
Assert.assertFalse(stripedFileChecksum1.equals(replicatedFileChecksum));
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test(timeout = 90000)
|
||||
public void testStripedFileChecksumWithMissedDataBlocks1() throws Exception {
|
||||
prepareTestFiles(fileSize, new String[] {stripedFile1});
|
||||
FileChecksum stripedFileChecksum1 = getFileChecksum(stripedFile1, fileSize,
|
||||
false);
|
||||
FileChecksum stripedFileChecksumRecon = getFileChecksum(stripedFile1,
|
||||
@ -177,8 +189,9 @@ public void testStripedFileChecksumWithMissedDataBlocks1() throws Exception {
|
||||
stripedFileChecksum1.equals(stripedFileChecksumRecon));
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test(timeout = 90000)
|
||||
public void testStripedFileChecksumWithMissedDataBlocks2() throws Exception {
|
||||
prepareTestFiles(fileSize, new String[] {stripedFile1, stripedFile2});
|
||||
FileChecksum stripedFileChecksum1 = getFileChecksum(stripedFile1, -1,
|
||||
false);
|
||||
FileChecksum stripedFileChecksum2 = getFileChecksum(stripedFile2, -1,
|
||||
@ -198,6 +211,255 @@ public void testStripedFileChecksumWithMissedDataBlocks2() throws Exception {
|
||||
stripedFileChecksum2.equals(stripedFileChecksum2Recon));
|
||||
}
|
||||
|
||||
private void testStripedFileChecksumWithMissedDataBlocksRangeQuery(
|
||||
String stripedFile, int requestedLen) throws Exception {
|
||||
LOG.info("Checksum file:{}, requested length:{}", stripedFile,
|
||||
requestedLen);
|
||||
prepareTestFiles(fileSize, new String[] {stripedFile});
|
||||
FileChecksum stripedFileChecksum1 = getFileChecksum(stripedFile,
|
||||
requestedLen, false);
|
||||
FileChecksum stripedFileChecksumRecon = getFileChecksum(stripedFile,
|
||||
requestedLen, true);
|
||||
|
||||
LOG.info("stripedFileChecksum1:" + stripedFileChecksum1);
|
||||
LOG.info("stripedFileChecksumRecon:" + stripedFileChecksumRecon);
|
||||
|
||||
Assert.assertTrue("Checksum mismatches!",
|
||||
stripedFileChecksum1.equals(stripedFileChecksumRecon));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test to verify that the checksum can be computed for a small file less than
|
||||
* bytesPerCRC size.
|
||||
*/
|
||||
@Test(timeout = 90000)
|
||||
public void testStripedFileChecksumWithMissedDataBlocksRangeQuery1()
|
||||
throws Exception {
|
||||
testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1, 1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test to verify that the checksum can be computed for a small file less than
|
||||
* bytesPerCRC size.
|
||||
*/
|
||||
@Test(timeout = 90000)
|
||||
public void testStripedFileChecksumWithMissedDataBlocksRangeQuery2()
|
||||
throws Exception {
|
||||
testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1, 10);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test to verify that the checksum can be computed by giving bytesPerCRC
|
||||
* length of file range for checksum calculation. 512 is the value of
|
||||
* bytesPerCRC.
|
||||
*/
|
||||
@Test(timeout = 90000)
|
||||
public void testStripedFileChecksumWithMissedDataBlocksRangeQuery3()
|
||||
throws Exception {
|
||||
testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1,
|
||||
bytesPerCRC);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test to verify that the checksum can be computed by giving 'cellsize'
|
||||
* length of file range for checksum calculation.
|
||||
*/
|
||||
@Test(timeout = 90000)
|
||||
public void testStripedFileChecksumWithMissedDataBlocksRangeQuery4()
|
||||
throws Exception {
|
||||
testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1,
|
||||
cellSize);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test to verify that the checksum can be computed by giving less than
|
||||
* cellsize length of file range for checksum calculation.
|
||||
*/
|
||||
@Test(timeout = 90000)
|
||||
public void testStripedFileChecksumWithMissedDataBlocksRangeQuery5()
|
||||
throws Exception {
|
||||
testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1,
|
||||
cellSize - 1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test to verify that the checksum can be computed by giving greater than
|
||||
* cellsize length of file range for checksum calculation.
|
||||
*/
|
||||
@Test(timeout = 90000)
|
||||
public void testStripedFileChecksumWithMissedDataBlocksRangeQuery6()
|
||||
throws Exception {
|
||||
testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1,
|
||||
cellSize + 1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test to verify that the checksum can be computed by giving two times
|
||||
* cellsize length of file range for checksum calculation.
|
||||
*/
|
||||
@Test(timeout = 90000)
|
||||
public void testStripedFileChecksumWithMissedDataBlocksRangeQuery7()
|
||||
throws Exception {
|
||||
testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1,
|
||||
cellSize * 2);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test to verify that the checksum can be computed by giving stripSize
|
||||
* length of file range for checksum calculation.
|
||||
*/
|
||||
@Test(timeout = 90000)
|
||||
public void testStripedFileChecksumWithMissedDataBlocksRangeQuery8()
|
||||
throws Exception {
|
||||
testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1,
|
||||
stripSize);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test to verify that the checksum can be computed by giving less than
|
||||
* stripSize length of file range for checksum calculation.
|
||||
*/
|
||||
@Test(timeout = 90000)
|
||||
public void testStripedFileChecksumWithMissedDataBlocksRangeQuery9()
|
||||
throws Exception {
|
||||
testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1,
|
||||
stripSize - 1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test to verify that the checksum can be computed by giving greater than
|
||||
* stripSize length of file range for checksum calculation.
|
||||
*/
|
||||
@Test(timeout = 90000)
|
||||
public void testStripedFileChecksumWithMissedDataBlocksRangeQuery10()
|
||||
throws Exception {
|
||||
testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1,
|
||||
stripSize + 1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test to verify that the checksum can be computed by giving less than
|
||||
* blockGroupSize length of file range for checksum calculation.
|
||||
*/
|
||||
@Test(timeout = 90000)
|
||||
public void testStripedFileChecksumWithMissedDataBlocksRangeQuery11()
|
||||
throws Exception {
|
||||
testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1,
|
||||
blockGroupSize - 1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test to verify that the checksum can be computed by giving greaterthan
|
||||
* blockGroupSize length of file range for checksum calculation.
|
||||
*/
|
||||
@Test(timeout = 90000)
|
||||
public void testStripedFileChecksumWithMissedDataBlocksRangeQuery12()
|
||||
throws Exception {
|
||||
testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1,
|
||||
blockGroupSize + 1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test to verify that the checksum can be computed by giving greater than
|
||||
* blockGroupSize length of file range for checksum calculation.
|
||||
*/
|
||||
@Test(timeout = 90000)
|
||||
public void testStripedFileChecksumWithMissedDataBlocksRangeQuery13()
|
||||
throws Exception {
|
||||
testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1,
|
||||
blockGroupSize * numBlockGroups / 2);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test to verify that the checksum can be computed by giving lessthan
|
||||
* fileSize length of file range for checksum calculation.
|
||||
*/
|
||||
@Test(timeout = 90000)
|
||||
public void testStripedFileChecksumWithMissedDataBlocksRangeQuery14()
|
||||
throws Exception {
|
||||
testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1,
|
||||
fileSize - 1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test to verify that the checksum can be computed for a length greater than
|
||||
* file size.
|
||||
*/
|
||||
@Test(timeout = 90000)
|
||||
public void testStripedFileChecksumWithMissedDataBlocksRangeQuery15()
|
||||
throws Exception {
|
||||
testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1,
|
||||
fileSize * 2);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test to verify that the checksum can be computed for a small file less than
|
||||
* bytesPerCRC size.
|
||||
*/
|
||||
@Test(timeout = 90000)
|
||||
public void testStripedFileChecksumWithMissedDataBlocksRangeQuery16()
|
||||
throws Exception {
|
||||
int fileLength = 100;
|
||||
String stripedFile3 = ecDir + "/stripedFileChecksum3";
|
||||
prepareTestFiles(fileLength, new String[] {stripedFile3});
|
||||
testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile3,
|
||||
fileLength - 1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test to verify that the checksum can be computed for a small file less than
|
||||
* bytesPerCRC size.
|
||||
*/
|
||||
@Test(timeout = 90000)
|
||||
public void testStripedFileChecksumWithMissedDataBlocksRangeQuery17()
|
||||
throws Exception {
|
||||
int fileLength = 100;
|
||||
String stripedFile3 = ecDir + "/stripedFileChecksum3";
|
||||
prepareTestFiles(fileLength, new String[] {stripedFile3});
|
||||
testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile3, 1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test to verify that the checksum can be computed for a small file less than
|
||||
* bytesPerCRC size.
|
||||
*/
|
||||
@Test(timeout = 90000)
|
||||
public void testStripedFileChecksumWithMissedDataBlocksRangeQuery18()
|
||||
throws Exception {
|
||||
int fileLength = 100;
|
||||
String stripedFile3 = ecDir + "/stripedFileChecksum3";
|
||||
prepareTestFiles(fileLength, new String[] {stripedFile3});
|
||||
testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile3, 10);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test to verify that the checksum can be computed with greater than file
|
||||
* length.
|
||||
*/
|
||||
@Test(timeout = 90000)
|
||||
public void testStripedFileChecksumWithMissedDataBlocksRangeQuery19()
|
||||
throws Exception {
|
||||
int fileLength = 100;
|
||||
String stripedFile3 = ecDir + "/stripedFileChecksum3";
|
||||
prepareTestFiles(fileLength, new String[] {stripedFile3});
|
||||
testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile3,
|
||||
fileLength * 2);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test to verify that the checksum can be computed for small file with less
|
||||
* than file length.
|
||||
*/
|
||||
@Test(timeout = 90000)
|
||||
public void testStripedFileChecksumWithMissedDataBlocksRangeQuery20()
|
||||
throws Exception {
|
||||
int fileLength = bytesPerCRC;
|
||||
String stripedFile3 = ecDir + "/stripedFileChecksum3";
|
||||
prepareTestFiles(fileLength, new String[] {stripedFile3});
|
||||
testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile3,
|
||||
bytesPerCRC - 1);
|
||||
}
|
||||
|
||||
private FileChecksum getFileChecksum(String filePath, int range,
|
||||
boolean killDn) throws Exception {
|
||||
int dnIdxToDie = -1;
|
||||
@ -223,12 +485,9 @@ private FileChecksum getFileChecksum(String filePath, int range,
|
||||
return fc;
|
||||
}
|
||||
|
||||
void prepareTestFiles() throws IOException {
|
||||
byte[] fileData = StripedFileTestUtil.generateBytes(fileSize);
|
||||
|
||||
String[] filePaths = new String[] {
|
||||
stripedFile1, stripedFile2, replicatedFile
|
||||
};
|
||||
private void prepareTestFiles(int fileLength, String[] filePaths)
|
||||
throws IOException {
|
||||
byte[] fileData = StripedFileTestUtil.generateBytes(fileLength);
|
||||
|
||||
for (String filePath : filePaths) {
|
||||
Path testPath = new Path(filePath);
|
||||
@ -267,4 +526,4 @@ int getDataNodeToKill(String filePath) throws IOException {
|
||||
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user