HDFS-8781. Erasure Coding: Correctly handle BlockManager#InvalidateBlocks for striped block. Contributed by Yi Liu.
This commit is contained in:
parent
f8f7a923b7
commit
5956d23b64
@ -367,3 +367,6 @@
|
||||
|
||||
HDFS-8760. Erasure Coding: reuse BlockReader when reading the same block in pread.
|
||||
(jing9)
|
||||
|
||||
HDFS-8781. Erasure Coding: Correctly handle BlockManager#InvalidateBlocks for
|
||||
striped block. (Yi Liu via jing9)
|
||||
|
@ -783,7 +783,10 @@ public LocatedBlock convertLastBlockToUnderConstruction(
|
||||
|
||||
// remove this block from the list of pending blocks to be deleted.
|
||||
for (DatanodeStorageInfo storage : targets) {
|
||||
invalidateBlocks.remove(storage.getDatanodeDescriptor(), oldBlock);
|
||||
final Block b = getBlockOnStorage(oldBlock, storage);
|
||||
if (b != null) {
|
||||
invalidateBlocks.remove(storage.getDatanodeDescriptor(), b);
|
||||
}
|
||||
}
|
||||
|
||||
// Adjust safe-mode totals, since under-construction blocks don't
|
||||
@ -802,12 +805,14 @@ public LocatedBlock convertLastBlockToUnderConstruction(
|
||||
/**
|
||||
* Get all valid locations of the block
|
||||
*/
|
||||
private List<DatanodeStorageInfo> getValidLocations(Block block) {
|
||||
private List<DatanodeStorageInfo> getValidLocations(BlockInfo block) {
|
||||
final List<DatanodeStorageInfo> locations
|
||||
= new ArrayList<DatanodeStorageInfo>(blocksMap.numNodes(block));
|
||||
for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
|
||||
// filter invalidate replicas
|
||||
if(!invalidateBlocks.contains(storage.getDatanodeDescriptor(), block)) {
|
||||
Block b = getBlockOnStorage(block, storage);
|
||||
if(b != null &&
|
||||
!invalidateBlocks.contains(storage.getDatanodeDescriptor(), b)) {
|
||||
locations.add(storage);
|
||||
}
|
||||
}
|
||||
@ -1156,7 +1161,10 @@ void removeBlocksAssociatedTo(final DatanodeStorageInfo storageInfo) {
|
||||
while(it.hasNext()) {
|
||||
BlockInfo block = it.next();
|
||||
removeStoredBlock(block, node);
|
||||
invalidateBlocks.remove(node, block);
|
||||
final Block b = getBlockOnStorage(block, storageInfo);
|
||||
if (b != null) {
|
||||
invalidateBlocks.remove(node, b);
|
||||
}
|
||||
}
|
||||
namesystem.checkSafeMode();
|
||||
}
|
||||
@ -1184,7 +1192,7 @@ private void addToInvalidates(BlockInfo storedBlock) {
|
||||
for(DatanodeStorageInfo storage : blocksMap.getStorages(storedBlock,
|
||||
State.NORMAL)) {
|
||||
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
|
||||
final Block b = getBlockToInvalidate(storedBlock, storage);
|
||||
final Block b = getBlockOnStorage(storedBlock, storage);
|
||||
if (b != null) {
|
||||
invalidateBlocks.add(b, node, false);
|
||||
datanodes.append(node).append(" ");
|
||||
@ -1196,7 +1204,7 @@ private void addToInvalidates(BlockInfo storedBlock) {
|
||||
}
|
||||
}
|
||||
|
||||
private Block getBlockToInvalidate(BlockInfo storedBlock,
|
||||
private Block getBlockOnStorage(BlockInfo storedBlock,
|
||||
DatanodeStorageInfo storage) {
|
||||
return storedBlock.isStriped() ?
|
||||
((BlockInfoStriped) storedBlock).getBlockOnStorage(storage) : storedBlock;
|
||||
@ -2054,7 +2062,10 @@ private void removeZombieReplicas(BlockReportContext context,
|
||||
// more than one storage on a datanode (and because it's a difficult
|
||||
// assumption to really enforce)
|
||||
removeStoredBlock(block, zombie.getDatanodeDescriptor());
|
||||
invalidateBlocks.remove(zombie.getDatanodeDescriptor(), block);
|
||||
Block b = getBlockOnStorage(block, zombie);
|
||||
if (b != null) {
|
||||
invalidateBlocks.remove(zombie.getDatanodeDescriptor(), b);
|
||||
}
|
||||
}
|
||||
assert(zombie.numBlocks() == 0);
|
||||
LOG.warn("processReport 0x{}: removed {} replicas from storage {}, " +
|
||||
@ -3273,7 +3284,7 @@ private void processChosenExcessReplica(
|
||||
// should be deleted. Items are removed from the invalidate list
|
||||
// upon giving instructions to the datanodes.
|
||||
//
|
||||
final Block blockToInvalidate = getBlockToInvalidate(storedBlock, chosen);
|
||||
final Block blockToInvalidate = getBlockOnStorage(storedBlock, chosen);
|
||||
addToInvalidates(blockToInvalidate, chosen.getDatanodeDescriptor());
|
||||
blockLog.info("BLOCK* chooseExcessReplicates: "
|
||||
+"({}, {}) is added to invalidated blocks set", chosen, storedBlock);
|
||||
@ -3838,6 +3849,12 @@ private int invalidateWorkForOneNode(DatanodeInfo dn) {
|
||||
return toInvalidate.size();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public boolean containsInvalidateBlock(final DatanodeInfo dn,
|
||||
final Block block) {
|
||||
return invalidateBlocks.contains(dn, block);
|
||||
}
|
||||
|
||||
boolean blockHasEnoughRacks(BlockInfo storedBlock, int expectedStorageNum) {
|
||||
if (!this.shouldCheckForEnoughRacks) {
|
||||
return true;
|
||||
|
@ -694,6 +694,13 @@ public Block[] getInvalidateBlocks(int maxblocks) {
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public boolean containsInvalidateBlock(Block block) {
|
||||
synchronized (invalidateBlocks) {
|
||||
return invalidateBlocks.contains(block);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Approximate number of blocks currently scheduled to be written
|
||||
*/
|
||||
|
@ -22,13 +22,16 @@
|
||||
import org.apache.hadoop.fs.BlockLocation;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
||||
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
@ -274,28 +277,68 @@ public void testReportBadBlock() throws IOException {
|
||||
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
|
||||
}
|
||||
|
||||
// do stateful read
|
||||
ByteBuffer result = ByteBuffer.allocate(length);
|
||||
ByteBuffer buf = ByteBuffer.allocate(1024);
|
||||
int readLen = 0;
|
||||
int ret;
|
||||
try (FSDataInputStream in = fs.open(file)) {
|
||||
while ((ret = in.read(buf)) >= 0) {
|
||||
readLen += ret;
|
||||
buf.flip();
|
||||
result.put(buf);
|
||||
buf.clear();
|
||||
try {
|
||||
// do stateful read
|
||||
ByteBuffer result = ByteBuffer.allocate(length);
|
||||
ByteBuffer buf = ByteBuffer.allocate(1024);
|
||||
int readLen = 0;
|
||||
int ret;
|
||||
try (FSDataInputStream in = fs.open(file)) {
|
||||
while ((ret = in.read(buf)) >= 0) {
|
||||
readLen += ret;
|
||||
buf.flip();
|
||||
result.put(buf);
|
||||
buf.clear();
|
||||
}
|
||||
}
|
||||
Assert.assertEquals("The length of file should be the same to write size",
|
||||
length, readLen);
|
||||
Assert.assertArrayEquals(bytes, result.array());
|
||||
|
||||
// check whether the corruption has been reported to the NameNode
|
||||
final FSNamesystem ns = cluster.getNamesystem();
|
||||
final BlockManager bm = ns.getBlockManager();
|
||||
BlockInfo blockInfo = (ns.getFSDirectory().getINode4Write(file.toString())
|
||||
.asFile().getBlocks())[0];
|
||||
Assert.assertEquals(1, bm.getCorruptReplicas(blockInfo).size());
|
||||
} finally {
|
||||
for (DataNode dn : cluster.getDataNodes()) {
|
||||
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
|
||||
}
|
||||
}
|
||||
Assert.assertEquals("The length of file should be the same to write size",
|
||||
length, readLen);
|
||||
Assert.assertArrayEquals(bytes, result.array());
|
||||
}
|
||||
|
||||
// check whether the corruption has been reported to the NameNode
|
||||
final FSNamesystem ns = cluster.getNamesystem();
|
||||
final BlockManager bm = ns.getBlockManager();
|
||||
BlockInfo blockInfo = (ns.getFSDirectory().getINode4Write(file.toString())
|
||||
.asFile().getBlocks())[0];
|
||||
Assert.assertEquals(1, bm.getCorruptReplicas(blockInfo).size());
|
||||
@Test
|
||||
public void testInvalidateBlock() throws IOException {
|
||||
final Path file = new Path("/invalidate");
|
||||
final int length = 10;
|
||||
final byte[] bytes = StripedFileTestUtil.generateBytes(length);
|
||||
DFSTestUtil.writeFile(fs, file, bytes);
|
||||
|
||||
int dnIndex = findFirstDataNode(file, cellSize * dataBlocks);
|
||||
Assert.assertNotEquals(-1, dnIndex);
|
||||
LocatedStripedBlock slb = (LocatedStripedBlock)fs.getClient()
|
||||
.getLocatedBlocks(file.toString(), 0, cellSize * dataBlocks).get(0);
|
||||
final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(slb,
|
||||
cellSize, dataBlocks, parityBlocks);
|
||||
final Block b = blks[0].getBlock().getLocalBlock();
|
||||
|
||||
DataNode dn = cluster.getDataNodes().get(dnIndex);
|
||||
// disable the heartbeat from DN so that the invalidated block record is kept
|
||||
// in NameNode until heartbeat expires and NN mark the dn as dead
|
||||
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
|
||||
|
||||
try {
|
||||
// delete the file
|
||||
fs.delete(file, true);
|
||||
// check the block is added to invalidateBlocks
|
||||
final FSNamesystem fsn = cluster.getNamesystem();
|
||||
final BlockManager bm = fsn.getBlockManager();
|
||||
DatanodeDescriptor dnd = NameNodeAdapter.getDatanode(fsn, dn.getDatanodeId());
|
||||
Assert.assertTrue(bm.containsInvalidateBlock(
|
||||
blks[0].getLocations()[0], b) || dnd.containsInvalidateBlock(b));
|
||||
} finally {
|
||||
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user