HDFS-8619. Erasure Coding: revisit replica counting for striped blocks. (Jing Zhao via yliu)

yliu 2015-07-15 22:35:19 +08:00
parent 0a93712f3b
commit f32d9a1758
8 changed files with 127 additions and 28 deletions

View File

@@ -353,3 +353,6 @@
     HDFS-8734. Erasure Coding: fix one cell need two packets. (Walter Su via
     jing9)
 
+    HDFS-8619. Erasure Coding: revisit replica counting for striped blocks.
+    (Jing Zhao via yliu)
+

View File

@@ -178,6 +178,9 @@ public int getCapacity() {
   public abstract boolean isStriped();
 
+  /** @return true if there is no datanode storage associated with the block */
+  abstract boolean hasNoStorage();
+
   /**
    * Find specified DatanodeDescriptor.
    * @return index or -1 if not found.
View File

@@ -150,4 +150,9 @@ public BlockInfoContiguousUnderConstruction convertToBlockUnderConstruction(
   public final boolean isStriped() {
     return false;
   }
+
+  @Override
+  final boolean hasNoStorage() {
+    return getStorageInfo(0) == null;
+  }
 }

View File

@@ -272,4 +272,15 @@ public BlockInfoStripedUnderConstruction convertToBlockUnderConstruction(
     }
     return ucBlock;
   }
+
+  @Override
+  final boolean hasNoStorage() {
+    final int len = getCapacity();
+    for (int idx = 0; idx < len; idx++) {
+      if (getStorageInfo(idx) != null) {
+        return false;
+      }
+    }
+    return true;
+  }
 }
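
Note on the asymmetry between the two overrides: for a contiguous block the storage array is kept packed (removing a storage moves the last entry into the vacated slot), so slot 0 is empty only if every slot is empty. For a striped block each slot is pinned to one internal-block index and is simply nulled on removal, so holes are normal and every slot has to be scanned. A minimal standalone sketch of the two strategies, with a plain Object[] standing in for the real DatanodeStorageInfo bookkeeping:

    // Toy model, not HDFS code: why slot 0 suffices for contiguous blocks
    // while striped blocks must scan the whole array.
    class StorageSlotsSketch {
      private final Object[] slots;
      private final boolean packed; // contiguous layout keeps the array packed

      StorageSlotsSketch(int capacity, boolean packed) {
        this.slots = new Object[capacity];
        this.packed = packed;
      }

      boolean hasNoStorage() {
        if (packed) {
          return slots[0] == null; // packed: empty first slot implies empty array
        }
        for (Object s : slots) {   // striped: slot i belongs to internal block i,
          if (s != null) {         // so earlier slots may be null while later
            return false;          // ones are occupied
          }
        }
        return true;
      }
    }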

View File

@@ -1235,10 +1235,11 @@ public void findAndMarkBlockAsCorrupt(final ExtendedBlock blk,
   }
 
   /**
-   *
-   * @param b
+   * Mark a replica (of a contiguous block) or an internal block (of a striped
+   * block group) as corrupt.
+   * @param b Indicating the reported bad block and the corresponding BlockInfo
+   *          stored in blocksMap.
    * @param storageInfo storage that contains the block, if known. null otherwise.
    * @throws IOException
    */
   private void markBlockAsCorrupt(BlockToMarkCorrupt b,
       DatanodeStorageInfo storageInfo,
@@ -1258,8 +1259,13 @@ private void markBlockAsCorrupt(BlockToMarkCorrupt b,
       storageInfo.addBlock(b.stored, b.corrupted);
     }
 
-    // Add this replica to corruptReplicas Map
-    corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason,
+    // Add this replica to corruptReplicas Map. For striped blocks, we always
+    // use the id of whole striped block group when adding to corruptReplicas
+    Block corrupted = new Block(b.corrupted);
+    if (b.stored.isStriped()) {
+      corrupted.setBlockId(b.stored.getBlockId());
+    }
+    corruptReplicas.addToCorruptReplicasMap(corrupted, node, b.reason,
         b.reasonCode);
 
     NumberReplicas numberOfReplicas = countNodes(b.stored);
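
Worth spelling out why the substitution works: a datanode reports an internal block whose ID encodes its position inside the block group, while corruptReplicas should carry one entry per group, and b.stored (the BlockInfo found in blocksMap) already holds the group's ID. In HDFS's striped layout the low bits of a block ID are the internal-block index, so the group ID is also recoverable by masking; a hedged sketch, where the 4-bit mask (16 blocks per group) is an assumption about that layout:

    // Sketch: relating a reported internal block ID to its block group ID,
    // assuming the low 4 bits of a striped ID hold the index within the group.
    class StripedIdSketch {
      static final long BLOCK_GROUP_INDEX_MASK = 15; // low 4 bits

      static long groupId(long internalBlockId) {
        return internalBlockId & ~BLOCK_GROUP_INDEX_MASK;
      }

      static int indexInGroup(long internalBlockId) {
        return (int) (internalBlockId & BLOCK_GROUP_INDEX_MASK);
      }
    }

With the group ID as the key, corrupt internal blocks reported by different datanodes all fold into a single corruptReplicas entry, which is what the updated test assertions below rely on.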
@@ -1283,7 +1289,7 @@ private void markBlockAsCorrupt(BlockToMarkCorrupt b,
     if (hasEnoughLiveReplicas || hasMoreCorruptReplicas
         || corruptedDuringWrite) {
       // the block is over-replicated so invalidate the replicas immediately
-      invalidateBlock(b, node);
+      invalidateBlock(b, node, numberOfReplicas);
     } else if (namesystem.isPopulatingReplQueues()) {
       // add the block to neededReplication
       updateNeededReplications(b.stored, -1, 0);
@@ -1295,8 +1301,8 @@ private void markBlockAsCorrupt(BlockToMarkCorrupt b,
    * @return true if the block was successfully invalidated and no longer
    *         present in the BlocksMap
    */
-  private boolean invalidateBlock(BlockToMarkCorrupt b, DatanodeInfo dn
-      ) throws IOException {
+  private boolean invalidateBlock(BlockToMarkCorrupt b, DatanodeInfo dn,
+      NumberReplicas nr) throws IOException {
     blockLog.info("BLOCK* invalidateBlock: {} on {}", b, dn);
     DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
     if (node == null) {
@@ -1305,7 +1311,6 @@ private boolean invalidateBlock(BlockToMarkCorrupt b, DatanodeInfo dn
     }
 
     // Check how many copies we have of the block
-    NumberReplicas nr = countNodes(b.stored);
     if (nr.replicasOnStaleNodes() > 0) {
       blockLog.info("BLOCK* invalidateBlocks: postponing " +
           "invalidation of {} on {} because {} replica(s) are located on " +
@@ -1313,17 +1318,14 @@
           nr.replicasOnStaleNodes());
       postponeBlock(b.corrupted);
       return false;
-    } else if (nr.liveReplicas() >= 1) {
-      // If we have at least one copy on a live node, then we can delete it.
+    } else {
+      // we already checked the number of replicas in the caller of this
+      // function and know there are enough live replicas, so we can delete it.
       addToInvalidates(b.corrupted, dn);
       removeStoredBlock(b.stored, node);
       blockLog.debug("BLOCK* invalidateBlocks: {} on {} listed for deletion.",
           b, dn);
       return true;
-    } else {
-      blockLog.info("BLOCK* invalidateBlocks: {} on {} is the only copy and" +
-          " was not deleted", b, dn);
-      return false;
     }
   }
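
The rewritten else branch is safe only because the counting moved up a level: markBlockAsCorrupt computes NumberReplicas once, decides whether invalidation is allowed, and passes the same counts into invalidateBlock, which no longer calls countNodes() itself. A simplified, self-contained sketch of that control flow; every name here is an illustrative stand-in rather than the real BlockManager API:

    // Count replicas once in the caller, then thread the counts through.
    class CountOnceSketch {
      record Counts(int live, int stale) {}

      void markCorrupt(String block, String node) {
        Counts counts = countNodes(block);   // single counting pass
        if (counts.live() >= 2) {            // stand-in liveness policy
          invalidate(block, node, counts);   // counts passed down, no recount
        } else {
          System.out.println("queue " + block + " for re-replication");
        }
      }

      boolean invalidate(String block, String node, Counts counts) {
        if (counts.stale() > 0) {
          return false; // postpone: stale nodes may still hold good replicas
        }
        // the caller already verified there are enough live replicas
        System.out.println("delete " + block + " on " + node);
        return true;
      }

      private Counts countNodes(String block) {
        return new Counts(3, 0); // canned values for the sketch
      }
    }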
@@ -2782,7 +2784,7 @@ private Block addStoredBlock(final BlockInfo block,
         " but corrupt replicas map has " + corruptReplicasCount);
     }
     if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileReplication)) {
-      invalidateCorruptReplicas(storedBlock, reportedBlock);
+      invalidateCorruptReplicas(storedBlock, reportedBlock, num);
     }
     return storedBlock;
   }
@@ -2814,18 +2816,20 @@ private void logAddStoredBlock(BlockInfo storedBlock, DatanodeDescriptor node) {
    *
    * @param blk Block whose corrupt replicas need to be invalidated
    */
-  private void invalidateCorruptReplicas(BlockInfo blk, Block reported) {
+  private void invalidateCorruptReplicas(BlockInfo blk, Block reported,
+      NumberReplicas numberReplicas) {
     Collection<DatanodeDescriptor> nodes = corruptReplicas.getNodes(blk);
     boolean removedFromBlocksMap = true;
     if (nodes == null)
       return;
     // make a copy of the array of nodes in order to avoid
     // ConcurrentModificationException, when the block is removed from the node
-    DatanodeDescriptor[] nodesCopy = nodes.toArray(new DatanodeDescriptor[0]);
+    DatanodeDescriptor[] nodesCopy =
+        nodes.toArray(new DatanodeDescriptor[nodes.size()]);
     for (DatanodeDescriptor node : nodesCopy) {
       try {
         if (!invalidateBlock(new BlockToMarkCorrupt(reported, blk, null,
-            Reason.ANY), node)) {
+            Reason.ANY), node, numberReplicas)) {
           removedFromBlocksMap = false;
         }
       } catch (IOException e) {

View File

@@ -201,8 +201,8 @@ boolean removeNode(Block b, DatanodeDescriptor node) {
     // remove block from the data-node list and the node from the block info
     boolean removed = node.removeBlock(info);
 
-    if (info.getDatanode(0) == null // no datanodes left
-        && info.isDeleted()) {      // does not belong to a file
+    if (info.hasNoStorage()    // no datanodes left
+        && info.isDeleted()) { // does not belong to a file
       blocks.remove(b);  // remove block from the map
     }
     return removed;
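
This one-liner is where the new abstraction pays off. Under the old getDatanode(0) == null test, a striped block whose internal block 0 had been removed looked storage-free even though other internal blocks were still stored, so removeNode could evict a live block group from the map. A small illustration of that failure mode, with made-up slot contents:

    // Old vs. new emptiness check on a striped block with a hole at slot 0.
    public class RemoveNodeBugSketch {
      public static void main(String[] args) {
        // storage per internal block of one group (null = not stored)
        String[] slots = {null, "dn-2", null, "dn-7", null, null, null, null, null};

        boolean oldCheck = slots[0] == null; // true: wrongly reports "no storage"

        boolean newCheck = true;             // hasNoStorage-style scan of all slots
        for (String s : slots) {
          if (s != null) {
            newCheck = false;
            break;
          }
        }
        System.out.println("old=" + oldCheck + " new=" + newCheck); // old=true new=false
      }
    }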

View File

@@ -24,7 +24,11 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.junit.After;
 import org.junit.Assert;
@@ -234,4 +238,64 @@ private void testReadWithDNFailure(String file, int fileSize,
         fileSize, readLen);
     Assert.assertArrayEquals(bytes, result.array());
   }
+
+  /**
+   * After reading a corrupted block, make sure the client can correctly report
+   * the corruption to the NameNode.
+   */
+  @Test
+  public void testReportBadBlock() throws IOException {
+    // create file
+    final Path file = new Path("/corrupted");
+    final int length = 10; // length of "corruption"
+    final byte[] bytes = StripedFileTestUtil.generateBytes(length);
+    DFSTestUtil.writeFile(fs, file, bytes);
+
+    // corrupt the first data block
+    int dnIndex = findFirstDataNode(file, cellSize * dataBlocks);
+    Assert.assertNotEquals(-1, dnIndex);
+    LocatedStripedBlock slb = (LocatedStripedBlock) fs.getClient()
+        .getLocatedBlocks(file.toString(), 0, cellSize * dataBlocks).get(0);
+    final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(slb,
+        cellSize, dataBlocks, parityBlocks);
+    // find the first block file
+    File storageDir = cluster.getInstanceStorageDir(dnIndex, 0);
+    File blkFile = MiniDFSCluster.getBlockFile(storageDir, blks[0].getBlock());
+    Assert.assertTrue("Block file does not exist", blkFile.exists());
+    // corrupt the block file
+    LOG.info("Deliberately corrupting file " + blkFile.getName());
+    try (FileOutputStream out = new FileOutputStream(blkFile)) {
+      out.write("corruption".getBytes());
+    }
+
+    // disable the heartbeat from DN so that the corrupted block record is
+    // kept in NameNode
+    for (DataNode dn : cluster.getDataNodes()) {
+      DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
+    }
+
+    // do stateful read
+    ByteBuffer result = ByteBuffer.allocate(length);
+    ByteBuffer buf = ByteBuffer.allocate(1024);
+    int readLen = 0;
+    int ret;
+    try (FSDataInputStream in = fs.open(file)) {
+      while ((ret = in.read(buf)) >= 0) {
+        readLen += ret;
+        buf.flip();
+        result.put(buf);
+        buf.clear();
+      }
+    }
+    Assert.assertEquals("The length of file should be the same to write size",
+        length, readLen);
+    Assert.assertArrayEquals(bytes, result.array());
+
+    // check whether the corruption has been reported to the NameNode
+    final FSNamesystem ns = cluster.getNamesystem();
+    final BlockManager bm = ns.getBlockManager();
+    BlockInfo blockInfo = (ns.getFSDirectory().getINode4Write(file.toString())
+        .asFile().getBlocks())[0];
+    Assert.assertEquals(1, bm.getCorruptReplicas(blockInfo).size());
+  }
 }

View File

@@ -35,6 +35,7 @@
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
@@ -325,6 +326,7 @@ public void testCheckStripedReplicaCorrupt() throws Exception {
     final int numStripes = 4;
     final Path filePath = new Path("/corrupt");
     final FSNamesystem ns = cluster.getNameNode().getNamesystem();
+    final BlockManager bm = ns.getBlockManager();
     DFSTestUtil.createStripedFile(cluster, filePath, null,
         numBlocks, numStripes, false);
@@ -375,7 +377,10 @@
     ns.processIncrementalBlockReport(
         cluster.getDataNodes().get(3).getDatanodeId(), reports[0]);
     BlockManagerTestUtil.updateState(ns.getBlockManager());
-    Assert.assertEquals(2, ns.getCorruptReplicaBlocks());
+    // the total number of corrupted block info is still 1
+    Assert.assertEquals(1, ns.getCorruptReplicaBlocks());
+    // 2 internal blocks corrupted
+    Assert.assertEquals(2, bm.getCorruptReplicas(stored).size());
 
     // Now change the size of stored block, and test verifying the last
     // block size
@@ -385,9 +390,10 @@
     reports = DFSTestUtil.makeReportForReceivedBlock(reported,
         ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage);
     ns.processIncrementalBlockReport(
-        cluster.getDataNodes().get(3).getDatanodeId(), reports[0]);
+        cluster.getDataNodes().get(4).getDatanodeId(), reports[0]);
     BlockManagerTestUtil.updateState(ns.getBlockManager());
-    Assert.assertEquals(3, ns.getCorruptReplicaBlocks());
+    Assert.assertEquals(1, ns.getCorruptReplicaBlocks());
+    Assert.assertEquals(3, bm.getCorruptReplicas(stored).size());
 
     // Now send a parity block report with correct size based on adjusted
     // size of stored block
@@ -400,16 +406,18 @@
     ns.processIncrementalBlockReport(
         cluster.getDataNodes().get(0).getDatanodeId(), reports[0]);
     BlockManagerTestUtil.updateState(ns.getBlockManager());
-    Assert.assertEquals(3, ns.getCorruptReplicaBlocks());
+    Assert.assertEquals(1, ns.getCorruptReplicaBlocks());
+    Assert.assertEquals(3, bm.getCorruptReplicas(stored).size());
 
     reported.setBlockId(stored.getBlockId() + 1);
     reported.setNumBytes(numStripes * BLOCK_STRIPED_CELL_SIZE + 10);
     reports = DFSTestUtil.makeReportForReceivedBlock(reported,
         ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage);
     ns.processIncrementalBlockReport(
-        cluster.getDataNodes().get(1).getDatanodeId(), reports[0]);
+        cluster.getDataNodes().get(5).getDatanodeId(), reports[0]);
     BlockManagerTestUtil.updateState(ns.getBlockManager());
-    Assert.assertEquals(3, ns.getCorruptReplicaBlocks());
+    Assert.assertEquals(1, ns.getCorruptReplicaBlocks());
+    Assert.assertEquals(3, bm.getCorruptReplicas(stored).size());
 
     reported.setBlockId(stored.getBlockId() + NUM_DATA_BLOCKS);
     reported.setNumBytes((numStripes + 1) * BLOCK_STRIPED_CELL_SIZE);
@@ -418,7 +426,8 @@
     ns.processIncrementalBlockReport(
         cluster.getDataNodes().get(2).getDatanodeId(), reports[0]);
     BlockManagerTestUtil.updateState(ns.getBlockManager());
-    Assert.assertEquals(3, ns.getCorruptReplicaBlocks());
+    Assert.assertEquals(1, ns.getCorruptReplicaBlocks());
+    Assert.assertEquals(3, bm.getCorruptReplicas(stored).size());
   }
 }
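
The reworked assertions separate two counters that used to move together: ns.getCorruptReplicaBlocks() counts entries in the corrupt-replicas map, now one per striped block group since the map is keyed by the group ID, while bm.getCorruptReplicas(stored).size() counts the datanodes recorded under that single entry. A toy model of the relationship, with a plain map standing in for CorruptReplicasMap:

    import java.util.*;

    // One key per block group, one set member per reporting datanode.
    public class CorruptCountersSketch {
      public static void main(String[] args) {
        Map<Long, Set<String>> corrupt = new HashMap<>();
        long groupId = 0xB00L; // illustrative striped block group ID

        // three corrupt internal blocks reported by three datanodes, all
        // folded under the single group-ID key
        for (String dn : List.of("dn3", "dn4", "dn0")) {
          corrupt.computeIfAbsent(groupId, k -> new HashSet<>()).add(dn);
        }

        System.out.println(corrupt.size());              // 1, like getCorruptReplicaBlocks()
        System.out.println(corrupt.get(groupId).size()); // 3, like getCorruptReplicas(blk).size()
      }
    }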