HDFS-14373. EC : Decoding is failing when block group last incomplete cell fall in to AlignedStripe. Contributed by Surendra Singh Lilhore.

This commit is contained in:
Surendra Singh Lilhore 2019-10-08 00:14:30 +05:30
parent 9685a6c0e5
commit 382967be51
3 changed files with 68 additions and 3 deletions

View File

@ -248,6 +248,8 @@ private int readToBuffer(BlockReader blockReader,
DFSClient.LOG.warn("Found Checksum error for " DFSClient.LOG.warn("Found Checksum error for "
+ currentBlock + " from " + currentNode + currentBlock + " from " + currentNode
+ " at " + ce.getPos()); + " at " + ce.getPos());
//Clear buffer to make next decode success
strategy.getReadBuffer().clear();
// we want to remember which block replicas we have tried // we want to remember which block replicas we have tried
corruptedBlocks.addCorruptedBlock(currentBlock, currentNode); corruptedBlocks.addCorruptedBlock(currentBlock, currentNode);
throw ce; throw ce;
@ -255,6 +257,8 @@ private int readToBuffer(BlockReader blockReader,
DFSClient.LOG.warn("Exception while reading from " DFSClient.LOG.warn("Exception while reading from "
+ currentBlock + " of " + dfsStripedInputStream.getSrc() + " from " + currentBlock + " of " + dfsStripedInputStream.getSrc() + " from "
+ currentNode, e); + currentNode, e);
//Clear buffer to make next decode success
strategy.getReadBuffer().clear();
throw e; throw e;
} }
} }

View File

@ -356,7 +356,8 @@ public static AlignedStripe[] divideOneStripe(ErasureCodingPolicy ecPolicy,
cells); cells);
// Step 3: merge into stripes // Step 3: merge into stripes
AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecPolicy, ranges); AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecPolicy, ranges,
blockGroup, cellSize);
// Step 4: calculate each chunk's position in destination buffer. Since the // Step 4: calculate each chunk's position in destination buffer. Since the
// whole read range is within a single stripe, the logic is simpler here. // whole read range is within a single stripe, the logic is simpler here.
@ -417,7 +418,8 @@ public static AlignedStripe[] divideByteRangeIntoStripes(
cells); cells);
// Step 3: merge into at most 5 stripes // Step 3: merge into at most 5 stripes
AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecPolicy, ranges); AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecPolicy, ranges,
blockGroup, cellSize);
// Step 4: calculate each chunk's position in destination buffer // Step 4: calculate each chunk's position in destination buffer
calcualteChunkPositionsInBuf(cellSize, stripes, cells, buf); calcualteChunkPositionsInBuf(cellSize, stripes, cells, buf);
@ -513,7 +515,8 @@ private static VerticalRange[] getRangesForInternalBlocks(
* {@link AlignedStripe} instances. * {@link AlignedStripe} instances.
*/ */
private static AlignedStripe[] mergeRangesForInternalBlocks( private static AlignedStripe[] mergeRangesForInternalBlocks(
ErasureCodingPolicy ecPolicy, VerticalRange[] ranges) { ErasureCodingPolicy ecPolicy, VerticalRange[] ranges,
LocatedStripedBlock blockGroup, int cellSize) {
int dataBlkNum = ecPolicy.getNumDataUnits(); int dataBlkNum = ecPolicy.getNumDataUnits();
int parityBlkNum = ecPolicy.getNumParityUnits(); int parityBlkNum = ecPolicy.getNumParityUnits();
List<AlignedStripe> stripes = new ArrayList<>(); List<AlignedStripe> stripes = new ArrayList<>();
@ -525,6 +528,17 @@ private static AlignedStripe[] mergeRangesForInternalBlocks(
} }
} }
// Add block group last cell offset in stripePoints if it is fall in to read
// offset range.
int lastCellIdxInBG = (int) (blockGroup.getBlockSize() / cellSize);
int idxInInternalBlk = lastCellIdxInBG / ecPolicy.getNumDataUnits();
long lastCellEndOffset = (idxInInternalBlk * (long)cellSize)
+ (blockGroup.getBlockSize() % cellSize);
if (stripePoints.first() < lastCellEndOffset
&& stripePoints.last() > lastCellEndOffset) {
stripePoints.add(lastCellEndOffset);
}
long prev = -1; long prev = -1;
for (long point : stripePoints) { for (long point : stripePoints) {
if (prev >= 0) { if (prev >= 0) {

View File

@ -19,8 +19,11 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@ -561,6 +564,50 @@ public void testCloseDoesNotAllocateNewBuffer() throws Exception {
} }
} }
@Test
public void testReadWhenLastIncompleteCellComeInToDecodeAlignedStripe()
throws IOException {
DataNodeProperties stopDataNode = null;
try {
cluster.waitActive();
ErasureCodingPolicy policy = getEcPolicy();
DistributedFileSystem filesystem = cluster.getFileSystem();
filesystem.enableErasureCodingPolicy(policy.getName());
Path dir = new Path("/tmp");
filesystem.mkdirs(dir);
filesystem.getClient().setErasureCodingPolicy(dir.toString(),
policy.getName());
Path f = new Path(dir, "file");
//1. File with one stripe, last data cell should be half filed.
long fileLength = (policy.getCellSize() * policy.getNumDataUnits())
- (policy.getCellSize() / 2);
DFSTestUtil.createFile(filesystem, f, fileLength, (short) 1, 0);
//2. Stop first DN from stripe.
LocatedBlocks lbs = cluster.getNameNodeRpc().getBlockLocations(
f.toString(), 0, fileLength);
LocatedStripedBlock bg = (LocatedStripedBlock) (lbs.get(0));
final LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(bg,
cellSize, dataBlocks, parityBlocks);
cluster.stopDataNode(blocks[0].getLocations()[0].getName());
//3. Do pread for fist cell, reconstruction should happen
try (FSDataInputStream in = filesystem.open(f)) {
DFSStripedInputStream stripedIn = (DFSStripedInputStream) in
.getWrappedStream();
byte[] b = new byte[policy.getCellSize()];
stripedIn.read(0, b, 0, policy.getCellSize());
}
} catch (HadoopIllegalArgumentException e) {
fail(e.getMessage());
} finally {
if (stopDataNode != null) {
cluster.restartDataNode(stopDataNode, true);
}
}
}
/** /**
* Empties the pool for the specified buffer type, for the current ecPolicy. * Empties the pool for the specified buffer type, for the current ecPolicy.
* <p> * <p>