HDFS-8453. Erasure coding: properly handle start offset for internal blocks in a block group. Contributed by Zhe Zhang.

Jing Zhao 2015-06-02 16:14:08 -07:00
parent 71329e817b
commit 5f15084bd5
7 changed files with 72 additions and 59 deletions
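In short: the positional-read path used to identify a block solely by its start offset in the file and re-resolve it with getBlockAt(blockStartOffset). After this patch, every internal block of a striped block group reports the group's start offset (see the StripedBlockUtil hunk below), so an offset alone no longer names a unique internal block. The read path therefore passes the LocatedBlock itself and refreshes it through a new overridable hook, refreshLocatedBlock(block), which DFSStripedInputStream overrides to re-resolve the same internal block by its index within the refreshed group. The recurring signature change:

    // before: block named by file offset, re-resolved by offset
    protected void fetchBlockByteRange(long blockStartOffset, long start, long end, ...)
    // after: the LocatedBlock travels with the call, refreshed via a hook
    protected void fetchBlockByteRange(LocatedBlock block, long start, long end, ...)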

View File

@@ -84,7 +84,7 @@ public final class HdfsConstants {
   public static final byte NUM_DATA_BLOCKS = 6;
   public static final byte NUM_PARITY_BLOCKS = 3;
   // The chunk size for striped block which is used by erasure coding
-  public static final int BLOCK_STRIPED_CELL_SIZE = 256 * 1024;
+  public static final int BLOCK_STRIPED_CELL_SIZE = 64 * 1024;
   // SafeMode actions
   public enum SafeModeAction {
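For context on the constants above: under the RS-6-3 schema a file is cut into 64 KB cells that round-robin across the six data blocks, so one full stripe spans 6 * 64 KB = 384 KB and the byte at a given offset within a block group lives in data block (offset / cellSize) % 6. A self-contained sketch of that arithmetic (the class and method names are ours, for illustration; only the constants mirror HdfsConstants):

    // Maps a logical offset inside one block group to (internal data block
    // index, offset within that internal block) under the striped layout.
    public class StripeMath {
      static final int CELL_SIZE = 64 * 1024; // BLOCK_STRIPED_CELL_SIZE
      static final int DATA_BLOCKS = 6;       // NUM_DATA_BLOCKS

      /** Index of the internal data block holding the byte at offsetInGroup. */
      static int blockIndex(long offsetInGroup) {
        long cell = offsetInGroup / CELL_SIZE; // which cell of the group
        return (int) (cell % DATA_BLOCKS);     // cells round-robin over data blocks
      }

      /** Offset of that byte inside its internal block. */
      static long offsetInBlock(long offsetInGroup) {
        long stripe = (offsetInGroup / CELL_SIZE) / DATA_BLOCKS; // full stripes above
        return stripe * CELL_SIZE + offsetInGroup % CELL_SIZE;
      }

      public static void main(String[] args) {
        long off = 5L * CELL_SIZE + 100;        // 100 bytes into the 6th cell
        System.out.println(blockIndex(off));    // 5: the last data block
        System.out.println(offsetInBlock(off)); // 100: first stripe of block 5
      }
    }

Note that after this patch an internal block's LocatedBlock no longer encodes this index in its start offset; the refresh path below recovers it from the block ID instead.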

View File

@@ -274,3 +274,6 @@
     HDFS-8517. Fix a decoding issue in stripped block recovering in client side.
     (Kai Zheng via jing9)
+
+    HDFS-8453. Erasure coding: properly handle start offset for internal blocks
+    in a block group. (Zhe Zhang via jing9)

View File

@@ -1016,7 +1016,7 @@ private DNAddrPair chooseDataNode(LocatedBlock block,
         }
         deadNodes.clear(); //2nd option is to remove only nodes[blockId]
         openInfo();
-        block = getBlockAt(block.getStartOffset());
+        block = refreshLocatedBlock(block);
         failures++;
       }
     }
@@ -1088,15 +1088,15 @@ private static String getBestNodeDNAddrPairErrorString(
     return errMsgr.toString();
   }
 
-  protected void fetchBlockByteRange(long blockStartOffset, long start, long end,
+  protected void fetchBlockByteRange(LocatedBlock block, long start, long end,
       byte[] buf, int offset,
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {
-    LocatedBlock block = getBlockAt(blockStartOffset);
+    block = refreshLocatedBlock(block);
     while (true) {
       DNAddrPair addressPair = chooseDataNode(block, null);
       try {
-        actualGetFromOneDataNode(addressPair, blockStartOffset, start, end,
+        actualGetFromOneDataNode(addressPair, block, start, end,
             buf, offset, corruptedBlockMap);
         return;
       } catch (IOException e) {
@@ -1107,7 +1107,7 @@ protected void fetchBlockByteRange(long blockStartOffset, long start, long end,
   }
 
   private Callable<ByteBuffer> getFromOneDataNode(final DNAddrPair datanode,
-      final long blockStartOffset, final long start, final long end,
+      final LocatedBlock block, final long start, final long end,
       final ByteBuffer bb,
       final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
       final int hedgedReadId) {
@@ -1120,7 +1120,7 @@ public ByteBuffer call() throws Exception {
         TraceScope scope =
             Trace.startSpan("hedgedRead" + hedgedReadId, parentSpan);
         try {
-          actualGetFromOneDataNode(datanode, blockStartOffset, start, end, buf,
+          actualGetFromOneDataNode(datanode, block, start, end, buf,
               offset, corruptedBlockMap);
           return bb;
         } finally {
@@ -1134,18 +1134,18 @@ public ByteBuffer call() throws Exception {
    * Used when reading contiguous blocks
    */
   private void actualGetFromOneDataNode(final DNAddrPair datanode,
-      long blockStartOffset, final long start, final long end, byte[] buf,
+      LocatedBlock block, final long start, final long end, byte[] buf,
       int offset, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {
     final int length = (int) (end - start + 1);
-    actualGetFromOneDataNode(datanode, blockStartOffset, start, end, buf,
+    actualGetFromOneDataNode(datanode, block, start, end, buf,
         new int[]{offset}, new int[]{length}, corruptedBlockMap);
   }
 
   /**
    * Read data from one DataNode.
    * @param datanode the datanode from which to read data
-   * @param blockStartOffset starting offset in the file
+   * @param block the located block containing the requested data
    * @param startInBlk the startInBlk offset of the block
    * @param endInBlk the endInBlk offset of the block
    * @param buf the given byte array into which the data is read
@@ -1157,7 +1157,7 @@ private void actualGetFromOneDataNode(final DNAddrPair datanode,
    * block replica
    */
   void actualGetFromOneDataNode(final DNAddrPair datanode,
-      long blockStartOffset, final long startInBlk, final long endInBlk,
+      LocatedBlock block, final long startInBlk, final long endInBlk,
       byte[] buf, int[] offsets, int[] lengths,
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {
@@ -1171,7 +1171,7 @@ void actualGetFromOneDataNode(final DNAddrPair datanode,
       // cached block locations may have been updated by chooseDataNode()
       // or fetchBlockAt(). Always get the latest list of locations at the
       // start of the loop.
-      LocatedBlock block = getBlockAt(blockStartOffset);
+      block = refreshLocatedBlock(block);
       BlockReader reader = null;
       try {
         DFSClientFaultInjector.get().fetchFromDatanodeException();
@@ -1227,6 +1227,17 @@ void actualGetFromOneDataNode(final DNAddrPair datanode,
     }
   }
 
+  /**
+   * Refresh cached block locations.
+   * @param block The currently cached block locations
+   * @return Refreshed block locations
+   * @throws IOException
+   */
+  protected LocatedBlock refreshLocatedBlock(LocatedBlock block)
+      throws IOException {
+    return getBlockAt(block.getStartOffset());
+  }
+
   /**
    * This method verifies that the read portions are valid and do not overlap
    * with each other.
@@ -1250,7 +1261,7 @@ private void checkReadPortions(int[] offsets, int[] lengths, int totalLen) {
    * 'hedged' read if the first read is taking longer than configured amount of
    * time. We then wait on which ever read returns first.
    */
-  private void hedgedFetchBlockByteRange(long blockStartOffset, long start,
+  private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
       long end, byte[] buf, int offset,
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {
@@ -1263,7 +1274,7 @@ private void hedgedFetchBlockByteRange(long blockStartOffset, long start,
     ByteBuffer bb = null;
     int len = (int) (end - start + 1);
     int hedgedReadId = 0;
-    LocatedBlock block = getBlockAt(blockStartOffset);
+    block = refreshLocatedBlock(block);
     while (true) {
       // see HDFS-6591, this metric is used to verify/catch unnecessary loops
       hedgedReadOpsLoopNumForTesting++;
@@ -1275,7 +1286,7 @@ private void hedgedFetchBlockByteRange(long blockStartOffset, long start,
         chosenNode = chooseDataNode(block, ignored);
         bb = ByteBuffer.wrap(buf, offset, len);
         Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
-            chosenNode, block.getStartOffset(), start, end, bb,
+            chosenNode, block, start, end, bb,
            corruptedBlockMap, hedgedReadId++);
        Future<ByteBuffer> firstRequest = hedgedService
            .submit(getFromDataNodeCallable);
@@ -1312,7 +1323,7 @@ private void hedgedFetchBlockByteRange(long blockStartOffset, long start,
        }
        bb = ByteBuffer.allocate(len);
        Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
-            chosenNode, block.getStartOffset(), start, end, bb,
+            chosenNode, block, start, end, bb,
            corruptedBlockMap, hedgedReadId++);
        Future<ByteBuffer> oneMoreRequest = hedgedService
            .submit(getFromDataNodeCallable);
@@ -1466,12 +1477,11 @@ private int pread(long position, byte[] buffer, int offset, int length)
      long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
      try {
        if (dfsClient.isHedgedReadsEnabled() && !blk.isStriped()) {
-          hedgedFetchBlockByteRange(blk.getStartOffset(), targetStart,
+          hedgedFetchBlockByteRange(blk, targetStart,
              targetStart + bytesToRead - 1, buffer, offset, corruptedBlockMap);
        } else {
-          fetchBlockByteRange(blk.getStartOffset(), targetStart,
-              targetStart + bytesToRead - 1, buffer, offset,
-              corruptedBlockMap);
+          fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1,
+              buffer, offset, corruptedBlockMap);
        }
      } finally {
        // Check and report if any block replicas are corrupted.
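The new refreshLocatedBlock above is deliberately a thin template-method hook: the default (contiguous) implementation just re-resolves by offset, and every retry site — the chooseDataNode failure path, fetchBlockByteRange, and both hedged-read submissions — now funnels through it. A minimal sketch of the pattern, using simplified stand-in types (BaseReader, StripedReader, and LocatedBlockStub are ours, not Hadoop's):

    import java.io.IOException;

    /** Stand-in for LocatedBlock: a start offset plus, for groups, the members. */
    class LocatedBlockStub {
      final long startOffset;
      final int indexInGroup;
      final LocatedBlockStub[] members; // non-null only for a block group

      LocatedBlockStub(long startOffset, int indexInGroup, LocatedBlockStub[] members) {
        this.startOffset = startOffset;
        this.indexInGroup = indexInGroup;
        this.members = members;
      }
    }

    abstract class BaseReader {
      /** Contiguous layout: a file offset identifies exactly one block. */
      protected LocatedBlockStub refreshLocatedBlock(LocatedBlockStub block)
          throws IOException {
        return getBlockAt(block.startOffset);
      }

      /** Stand-in for the namenode lookup by file offset. */
      abstract LocatedBlockStub getBlockAt(long offset) throws IOException;
    }

    abstract class StripedReader extends BaseReader {
      /**
       * Striped layout: all internal blocks of a group report the group's
       * start offset, so remember the internal index first and re-parse the
       * refreshed group at that index (mirroring the DFSStripedInputStream
       * override in the next file).
       */
      @Override
      protected LocatedBlockStub refreshLocatedBlock(LocatedBlockStub block)
          throws IOException {
        int idx = block.indexInGroup; // the real code derives this from the block ID
        LocatedBlockStub group = getBlockAt(block.startOffset);
        return group.members[idx];
      }
    }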

View File

@@ -26,6 +26,7 @@
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.io.ByteBufferPool;
@@ -470,22 +471,17 @@ private int copy(ReaderStrategy strategy, int offset, int length) {
   }
 
   /**
-   * | <--------- LocatedStripedBlock (ID = 0) ---------> |
-   * LocatedBlock (0) | LocatedBlock (1) | LocatedBlock (2)
-   *                        ^
-   *                      offset
-   * On a striped file, the super method {@link DFSInputStream#getBlockAt}
-   * treats a striped block group as a single {@link LocatedBlock} object,
-   * which includes target in its range. This method adds the logic of:
-   *   1. Analyzing the index of required block based on offset
-   *   2. Parsing the block group to obtain the block location on that index
+   * The super method {@link DFSInputStream#refreshLocatedBlock} refreshes
+   * cached LocatedBlock by executing {@link DFSInputStream#getBlockAt} again.
+   * This method extends the logic by first remembering the index of the
+   * internal block, and re-parsing the refreshed block group with the same
+   * index.
    */
   @Override
-  protected LocatedBlock getBlockAt(long blkStartOffset) throws IOException {
-    LocatedBlock lb = getBlockGroupAt(blkStartOffset);
-
-    int idx = (int) ((blkStartOffset - lb.getStartOffset())
-        % (dataBlkNum + parityBlkNum));
+  protected LocatedBlock refreshLocatedBlock(LocatedBlock block)
+      throws IOException {
+    int idx = BlockIdManager.getBlockIndex(block.getBlock().getLocalBlock());
+    LocatedBlock lb = getBlockGroupAt(block.getStartOffset());
     // If indexing information is returned, iterate through the index array
     // to find the entry for position idx in the group
     LocatedStripedBlock lsb = (LocatedStripedBlock) lb;
@@ -496,10 +492,11 @@ protected LocatedBlock getBlockAt(long blkStartOffset) throws IOException {
       }
     }
     if (DFSClient.LOG.isDebugEnabled()) {
-      DFSClient.LOG.debug("getBlockAt for striped blocks, offset="
-          + blkStartOffset + ". Obtained block " + lb + ", idx=" + idx);
+      DFSClient.LOG.debug("refreshLocatedBlock for striped blocks, offset="
+          + block.getStartOffset() + ". Obtained block " + lb + ", idx=" + idx);
     }
-    return StripedBlockUtil.constructInternalBlock(lsb, i, cellSize, dataBlkNum, idx);
+    return StripedBlockUtil.constructInternalBlock(
+        lsb, i, cellSize, dataBlkNum, idx);
   }
 
   private LocatedStripedBlock getBlockGroupAt(long offset) throws IOException {
@@ -513,12 +510,12 @@ private LocatedStripedBlock getBlockGroupAt(long offset) throws IOException {
    * Real implementation of pread.
    */
   @Override
-  protected void fetchBlockByteRange(long blockStartOffset, long start,
+  protected void fetchBlockByteRange(LocatedBlock block, long start,
       long end, byte[] buf, int offset,
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {
     // Refresh the striped block group
-    LocatedStripedBlock blockGroup = getBlockGroupAt(blockStartOffset);
+    LocatedStripedBlock blockGroup = getBlockGroupAt(block.getStartOffset());
 
     AlignedStripe[] stripes = divideByteRangeIntoStripes(schema, cellSize,
         blockGroup, start, end, buf, offset);
@@ -622,9 +619,9 @@ private void fetchOneStripingChunk(Map<Future<Void>, Integer> futures,
     StripingChunk chunk = alignedStripe.chunks[index];
     chunk.state = StripingChunk.PENDING;
     Callable<Void> readCallable = getFromOneDataNode(dnAddr,
-        block.getStartOffset(), alignedStripe.getOffsetInBlock(),
-        alignedStripe.getOffsetInBlock() + alignedStripe.getSpanInBlock() - 1, chunk.buf,
-        chunk.getOffsets(), chunk.getLengths(),
+        block, alignedStripe.getOffsetInBlock(),
+        alignedStripe.getOffsetInBlock() + alignedStripe.getSpanInBlock() - 1,
+        chunk.buf, chunk.getOffsets(), chunk.getLengths(),
         corruptedBlockMap, index);
     Future<Void> getFromDNRequest = service.submit(readCallable);
     if (DFSClient.LOG.isDebugEnabled()) {
@@ -637,7 +634,7 @@ private void fetchOneStripingChunk(Map<Future<Void>, Integer> futures,
   }
 
   private Callable<Void> getFromOneDataNode(final DNAddrPair datanode,
-      final long blockStartOffset, final long start, final long end,
+      final LocatedBlock block, final long start, final long end,
       final byte[] buf, final int[] offsets, final int[] lengths,
       final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
       final int hedgedReadId) {
@@ -648,7 +645,7 @@ public Void call() throws Exception {
       TraceScope scope =
           Trace.startSpan("Parallel reading " + hedgedReadId, parentSpan);
       try {
-        actualGetFromOneDataNode(datanode, blockStartOffset, start,
+        actualGetFromOneDataNode(datanode, block, start,
            end, buf, offsets, lengths, corruptedBlockMap);
      } finally {
        scope.close();
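The index survives a refresh because it is encoded in the block ID itself: a striped block group is allocated one base ID, the i-th internal block gets ID baseId + i, and BlockIdManager.getBlockIndex reads the index back out of the low bits. A sketch of that encoding (the 4-bit field width is our assumption about the striped block-ID layout, not something this diff states):

    // Illustrative: recovering an internal block's index from its ID.
    final class BlockIdSketch {
      static final long INDEX_MASK = 0xF; // low 4 bits, assumed width

      static long internalBlockId(long groupId, int index) {
        return groupId + index;           // groupId has zeroed low bits
      }

      static int blockIndex(long blockId) {
        return (int) (blockId & INDEX_MASK); // what getBlockIndex extracts
      }

      public static void main(String[] args) {
        long groupId = -1L << 4;          // any ID with the low bits clear
        System.out.println(blockIndex(internalBlockId(groupId, 5))); // 5
      }
    }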

View File

@@ -105,16 +105,15 @@ public static LocatedBlock constructInternalBlock(LocatedStripedBlock bg,
     final ExtendedBlock blk = constructInternalBlock(
         bg.getBlock(), cellSize, dataBlkNum, idxInBlockGroup);
-    final long offset = bg.getStartOffset() + idxInBlockGroup * (long) cellSize;
     if (idxInReturnedLocs < bg.getLocations().length) {
       return new LocatedBlock(blk,
           new DatanodeInfo[]{bg.getLocations()[idxInReturnedLocs]},
           new String[]{bg.getStorageIDs()[idxInReturnedLocs]},
           new StorageType[]{bg.getStorageTypes()[idxInReturnedLocs]},
-          offset, bg.isCorrupt(), null);
+          bg.getStartOffset(), bg.isCorrupt(), null);
     } else {
       return new LocatedBlock(blk, null, null, null,
-          offset, bg.isCorrupt(), null);
+          bg.getStartOffset(), bg.isCorrupt(), null);
     }
   }
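This hunk is the heart of the fix. Previously constructInternalBlock gave internal block i a synthetic file offset of bg.getStartOffset() + i * cellSize, while the old DFSStripedInputStream#getBlockAt (removed above) tried to recover the index from that offset as (blkStartOffset - lb.getStartOffset()) % (dataBlkNum + parityBlkNum). Those two expressions only agree at i = 0, since one is a byte offset and the other takes it modulo 9. A self-contained demonstration using just the expressions visible in this diff:

    // Round-trips the old synthetic offset through the old index recovery.
    public class OffsetBugDemo {
      public static void main(String[] args) {
        final int cellSize = 64 * 1024;  // BLOCK_STRIPED_CELL_SIZE
        final int dataBlkNum = 6, parityBlkNum = 3;
        for (int i = 0; i < dataBlkNum; i++) {
          long oldOffset = 0 /* groupStart */ + i * (long) cellSize;
          long recovered = oldOffset % (dataBlkNum + parityBlkNum);
          System.out.println("block " + i + " -> recovered index " + recovered);
          // prints 0, 7, 5, 3, 1, 8 -- only i == 0 round-trips correctly
        }
      }
    }

After the patch the synthetic offset is gone: every internal block carries the group's start offset, and the index travels in the block ID instead.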

View File

@@ -87,7 +87,7 @@ public void tearDown() {
    * Test {@link DFSStripedInputStream#getBlockAt(long)}
    */
   @Test
-  public void testGetBlock() throws Exception {
+  public void testRefreshBlock() throws Exception {
     final int numBlocks = 4;
     DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
         NUM_STRIPE_PER_BLOCK, false);
@@ -102,7 +102,7 @@ public void testGetBlock() throws Exception {
     LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(lsb,
         CELLSIZE, DATA_BLK_NUM, PARITY_BLK_NUM);
     for (int j = 0; j < DATA_BLK_NUM; j++) {
-      LocatedBlock refreshed = in.getBlockAt(blks[j].getStartOffset());
+      LocatedBlock refreshed = in.refreshLocatedBlock(blks[j]);
       assertEquals(blks[j].getBlock(), refreshed.getBlock());
       assertEquals(blks[j].getStartOffset(), refreshed.getStartOffset());
       assertArrayEquals(blks[j].getLocations(), refreshed.getLocations());

View File

@@ -59,20 +59,24 @@ public void tearDown() throws IOException {
 
   @Test
   public void testWritePreadWithDNFailure1() throws IOException {
-    testWritePreadWithDNFailure("/foo", 0);
+    testWritePreadWithDNFailure("/foo", cellSize * (dataBlocks + 2), 0);
   }
 
   @Test
   public void testWritePreadWithDNFailure2() throws IOException {
-    testWritePreadWithDNFailure("/foo", cellSize * 5);
+    testWritePreadWithDNFailure("/foo", cellSize * (dataBlocks + 2), cellSize * 5);
   }
 
-  private void testWritePreadWithDNFailure(String file, int startOffsetInFile)
+  @Test
+  public void testWritePreadWithDNFailure3() throws IOException {
+    testWritePreadWithDNFailure("/foo", cellSize * dataBlocks, 0);
+  }
+
+  private void testWritePreadWithDNFailure(String file, int fileSize, int startOffsetInFile)
       throws IOException {
     final int failedDNIdx = 2;
-    final int length = cellSize * (dataBlocks + 2);
     Path testPath = new Path(file);
-    final byte[] bytes = StripedFileTestUtil.generateBytes(length);
+    final byte[] bytes = StripedFileTestUtil.generateBytes(fileSize);
     DFSTestUtil.writeFile(fs, testPath, bytes);
 
     // shut down the DN that holds the last internal data block
@@ -89,17 +93,17 @@ private void testWritePreadWithDNFailure(String file, int startOffsetInFile)
 
     // pread
     try (FSDataInputStream fsdis = fs.open(testPath)) {
-      byte[] buf = new byte[length];
+      byte[] buf = new byte[fileSize];
       int readLen = fsdis.read(startOffsetInFile, buf, 0, buf.length);
-      Assert.assertEquals("The length of file should be the same to write size",
-          length - startOffsetInFile, readLen);
+      Assert.assertEquals("The fileSize of file should be the same to write size",
+          fileSize - startOffsetInFile, readLen);
 
       byte[] expected = new byte[readLen];
-      for (int i = startOffsetInFile; i < length; i++) {
+      for (int i = startOffsetInFile; i < fileSize; i++) {
         expected[i - startOffsetInFile] = StripedFileTestUtil.getByte(i);
       }
 
-      for (int i = startOffsetInFile; i < length; i++) {
+      for (int i = startOffsetInFile; i < fileSize; i++) {
         Assert.assertEquals("Byte at " + i + " should be the same",
             expected[i - startOffsetInFile], buf[i - startOffsetInFile]);
       }
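A note on the new test sizes: cellSize * (dataBlocks + 2) is one full stripe plus two extra cells, while the new testWritePreadWithDNFailure3 writes exactly one full stripe (cellSize * dataBlocks, i.e. 6 * 64 KB = 384 KB with the constants above), so each internal data block holds exactly one cell and the file ends precisely on a stripe boundary — presumably the boundary case the old per-block start offsets mishandled.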