diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 29339141b8..9133515afb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -3751,9 +3751,24 @@ private void invalidateCorruptReplicas(BlockInfo blk, Block reported, // ConcurrentModificationException, when the block is removed from the node DatanodeDescriptor[] nodesCopy = nodes.toArray(new DatanodeDescriptor[nodes.size()]); + + DatanodeStorageInfo[] storages = null; + if (blk.isStriped()) { + storages = getStorages(blk); + } + for (DatanodeDescriptor node : nodesCopy) { + Block blockToInvalidate = reported; + if (storages != null && blk.isStriped()) { + for (DatanodeStorageInfo s : storages) { + if (s.getDatanodeDescriptor().equals(node)) { + blockToInvalidate = getBlockOnStorage(blk, s); + break; + } + } + } try { - if (!invalidateBlock(new BlockToMarkCorrupt(reported, blk, null, + if (!invalidateBlock(new BlockToMarkCorrupt(blockToInvalidate, blk, null, Reason.ANY), node, numberReplicas)) { removedFromBlocksMap = false; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/ReadStripedFileWithDecodingHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/ReadStripedFileWithDecodingHelper.java index e1497445b9..4051dbd562 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/ReadStripedFileWithDecodingHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/ReadStripedFileWithDecodingHelper.java @@ -73,11 +73,11 @@ abstract public class ReadStripedFileWithDecodingHelper { public static MiniDFSCluster initializeCluster() throws IOException { Configuration conf = new HdfsConfiguration(); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); - conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 2); conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY, - 0); + 2); MiniDFSCluster myCluster = new MiniDFSCluster.Builder(conf) - .numDataNodes(NUM_DATANODES) + .numDataNodes(NUM_DATANODES + 3) .build(); myCluster.getFileSystem().enableErasureCodingPolicy( StripedFileTestUtil.getDefaultECPolicy().getName()); @@ -108,6 +108,22 @@ public static int findFirstDataNode(MiniDFSCluster cluster, return -1; } + // The index begins from 1. + public static int findDataNodeAtIndex(MiniDFSCluster cluster, + DistributedFileSystem dfs, Path file, long length, int index) throws IOException { + BlockLocation[] locs = dfs.getFileBlockLocations(file, 0, length); + String name = (locs[0].getNames())[index - 1]; + int dnIndex = 0; + for (DataNode dn : cluster.getDataNodes()) { + int port = dn.getXferPort(); + if (name.contains(Integer.toString(port))) { + return dnIndex; + } + dnIndex++; + } + return -1; + } + /** * Cross product of FILE_LENGTHS, NUM_PARITY_UNITS+1, NUM_PARITY_UNITS. * Input for parameterized tests classes. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java index f80cb01bab..62e732cbbf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.util.StripedBlockUtil; +import org.apache.hadoop.test.GenericTestUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -50,6 +51,7 @@ import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.CELL_SIZE; import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.NUM_DATA_UNITS; import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.NUM_PARITY_UNITS; +import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.findDataNodeAtIndex; import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.findFirstDataNode; import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.initializeCluster; import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.tearDownCluster; @@ -96,7 +98,7 @@ public void testReportBadBlock() throws IOException { .get(0); final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(slb, CELL_SIZE, NUM_DATA_UNITS, NUM_PARITY_UNITS); - // find the first block file + // Find the first block file. File storageDir = cluster.getInstanceStorageDir(dnIndex, 0); File blkFile = MiniDFSCluster.getBlockFile(storageDir, blks[0].getBlock()); Assert.assertTrue("Block file does not exist", blkFile.exists()); @@ -169,6 +171,129 @@ public void testInvalidateBlock() throws IOException, InterruptedException { } } + /** + * This unit test try to cover the below situation: + * Suppose we have an EC file with RS(d,p) policy and block group id + * is blk_-9223372036845119810_1920002. + * If the first and second data block in this ec block group are corrupted, + * meanwhile we read this EC file. + * It will trigger reportBadBlock RPC and + * add the blk_-9223372036845119810_1920002 + * and blk_-9223372036845119809_1920002 blocks to corruptReplicas. + * It will also reconstruct the two blocks and send IBR to namenode, + * then execute BlockManager#addStoredBlock and + * invalidateCorruptReplicas method. Suppose we first receive the IBR of + * blk_-9223372036845119810_1920002, then in invalidateCorruptReplicas method, + * it will only invalidate 9223372036845119809_1920002 on the two datanodes contains + * the two corrupt blocks. + * + * @throws Exception + */ + @Test + public void testCorruptionECBlockInvalidate() throws Exception { + + final Path file = new Path("/invalidate_corrupted"); + final int length = BLOCK_SIZE * NUM_DATA_UNITS; + final byte[] bytes = StripedFileTestUtil.generateBytes(length); + DFSTestUtil.writeFile(dfs, file, bytes); + + int dnIndex = findFirstDataNode(cluster, dfs, file, + CELL_SIZE * NUM_DATA_UNITS); + int dnIndex2 = findDataNodeAtIndex(cluster, dfs, file, + CELL_SIZE * NUM_DATA_UNITS, 2); + Assert.assertNotEquals(-1, dnIndex); + Assert.assertNotEquals(-1, dnIndex2); + + LocatedStripedBlock slb = (LocatedStripedBlock) dfs.getClient() + .getLocatedBlocks(file.toString(), 0, CELL_SIZE * NUM_DATA_UNITS) + .get(0); + final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(slb, + CELL_SIZE, NUM_DATA_UNITS, NUM_PARITY_UNITS); + + final Block b = blks[0].getBlock().getLocalBlock(); + final Block b2 = blks[1].getBlock().getLocalBlock(); + + // Find the first block file. + File storageDir = cluster.getInstanceStorageDir(dnIndex, 0); + File blkFile = MiniDFSCluster.getBlockFile(storageDir, blks[0].getBlock()); + Assert.assertTrue("Block file does not exist", blkFile.exists()); + // Corrupt the block file. + LOG.info("Deliberately corrupting file " + blkFile.getName()); + try (FileOutputStream out = new FileOutputStream(blkFile)) { + out.write("corruption".getBytes()); + out.flush(); + } + + // Find the second block file. + File storageDir2 = cluster.getInstanceStorageDir(dnIndex2, 0); + File blkFile2 = MiniDFSCluster.getBlockFile(storageDir2, blks[1].getBlock()); + Assert.assertTrue("Block file does not exist", blkFile2.exists()); + // Corrupt the second block file. + LOG.info("Deliberately corrupting file " + blkFile2.getName()); + try (FileOutputStream out = new FileOutputStream(blkFile2)) { + out.write("corruption".getBytes()); + out.flush(); + } + + // Disable the heartbeat from DN so that the corrupted block record is kept + // in NameNode. + for (DataNode dataNode : cluster.getDataNodes()) { + DataNodeTestUtils.setHeartbeatsDisabledForTests(dataNode, true); + } + try { + // Do stateful read. + StripedFileTestUtil.verifyStatefulRead(dfs, file, length, bytes, + ByteBuffer.allocate(1024)); + + // Check whether the corruption has been reported to the NameNode. + final FSNamesystem ns = cluster.getNamesystem(); + final BlockManager bm = ns.getBlockManager(); + BlockInfo blockInfo = (ns.getFSDirectory().getINode4Write(file.toString()) + .asFile().getBlocks())[0]; + GenericTestUtils.waitFor(() -> { + if (bm.getCorruptReplicas(blockInfo) == null) { + return false; + } + return bm.getCorruptReplicas(blockInfo).size() == 2; + }, 250, 60000); + // Double check. + Assert.assertEquals(2, bm.getCorruptReplicas(blockInfo).size()); + + DatanodeDescriptor dnd = + NameNodeAdapter.getDatanode(ns, cluster.getDataNodes().get(dnIndex).getDatanodeId()); + + DatanodeDescriptor dnd2 = + NameNodeAdapter.getDatanode(ns, cluster.getDataNodes().get(dnIndex2).getDatanodeId()); + + for (DataNode datanode : cluster.getDataNodes()) { + if (!datanode.getDatanodeUuid().equals(dnd.getDatanodeUuid()) && + !datanode.getDatanodeUuid().equals(dnd2.getDatanodeUuid())) { + DataNodeTestUtils.setHeartbeatsDisabledForTests(datanode, false); + } + } + + GenericTestUtils.waitFor(() -> { + return bm.containsInvalidateBlock( + blks[0].getLocations()[0], b) || dnd.containsInvalidateBlock(b); + }, 250, 60000); + Assert.assertTrue(bm.containsInvalidateBlock( + blks[0].getLocations()[0], b) || dnd.containsInvalidateBlock(b)); + + GenericTestUtils.waitFor(() -> { + return bm.containsInvalidateBlock( + blks[1].getLocations()[0], b2) || dnd2.containsInvalidateBlock(b2); + }, 250, 60000); + + Assert.assertTrue(bm.containsInvalidateBlock( + blks[1].getLocations()[0], b2) || dnd2.containsInvalidateBlock(b2)); + + } finally { + for (DataNode datanode : cluster.getDataNodes()) { + DataNodeTestUtils.setHeartbeatsDisabledForTests(datanode, false); + } + } + } + @Test public void testMoreThanOneCorruptedBlock() throws IOException { final Path file = new Path("/corrupted");