From 7f8685f4760f1358bb30927a7da9a5041e8c39e1 Mon Sep 17 00:00:00 2001
From: Ayush Saxena
Date: Mon, 24 Feb 2020 20:38:04 +0530
Subject: [PATCH] HDFS-15187. CORRUPT replica mismatch between namenodes after
 failover. Contributed by Ayush Saxena.

---
 .../server/blockmanagement/BlockManager.java  | 32 +++++---
 .../TestCorruptionWithFailover.java           | 81 +++++++++++++++++++
 2 files changed, 104 insertions(+), 9 deletions(-)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptionWithFailover.java

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index aaa4aeb538..626048f1cf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -3142,6 +3142,7 @@ public void processQueuedMessagesForBlock(Block b) throws IOException {
 
   private void processQueuedMessages(Iterable<ReportedBlockInfo> rbis)
       throws IOException {
+    boolean isPreviousMessageProcessed = true;
     for (ReportedBlockInfo rbi : rbis) {
       LOG.debug("Processing previouly queued message {}", rbi);
       if (rbi.getReportedState() == null) {
@@ -3149,9 +3150,15 @@ private void processQueuedMessages(Iterable<ReportedBlockInfo> rbis)
         DatanodeStorageInfo storageInfo = rbi.getStorageInfo();
         removeStoredBlock(getStoredBlock(rbi.getBlock()),
             storageInfo.getDatanodeDescriptor());
+      } else if (!isPreviousMessageProcessed) {
+        // If the previous IBR processing was skipped, skip processing all
+        // further IBRs as well, to preserve the original processing order.
+        queueReportedBlock(rbi.getStorageInfo(), rbi.getBlock(),
+            rbi.getReportedState(), QUEUE_REASON_FUTURE_GENSTAMP);
       } else {
-        processAndHandleReportedBlock(rbi.getStorageInfo(),
-            rbi.getBlock(), rbi.getReportedState(), null);
+        isPreviousMessageProcessed =
+            processAndHandleReportedBlock(rbi.getStorageInfo(), rbi.getBlock(),
+                rbi.getReportedState(), null);
       }
     }
   }
@@ -4098,8 +4105,14 @@ public void addBlock(DatanodeStorageInfo storageInfo, Block block,
     processAndHandleReportedBlock(storageInfo, block, ReplicaState.FINALIZED,
         delHintNode);
   }
-
-  private void processAndHandleReportedBlock(
+
+  /**
+   * Process a reported block.
+   * @return true if the block is processed, or false if the block is queued
+   *         to be processed later.
+   * @throws IOException
+   */
+  private boolean processAndHandleReportedBlock(
       DatanodeStorageInfo storageInfo, Block block,
       ReplicaState reportedState, DatanodeDescriptor delHintNode)
       throws IOException {
@@ -4113,7 +4126,7 @@ private void processAndHandleReportedBlock(
         isGenStampInFuture(block)) {
       queueReportedBlock(storageInfo, block, reportedState,
           QUEUE_REASON_FUTURE_GENSTAMP);
-      return;
+      return false;
     }
 
     // find block by blockId
@@ -4124,7 +4137,7 @@ private void processAndHandleReportedBlock(
       blockLog.debug("BLOCK* addBlock: block {} on node {} size {} does not "
           + "belong to any file", block, node, block.getNumBytes());
       addToInvalidates(new Block(block), node);
-      return;
+      return true;
     }
 
     BlockUCState ucState = storedBlock.getBlockUCState();
@@ -4133,7 +4146,7 @@ private void processAndHandleReportedBlock(
 
     // Ignore replicas already scheduled to be removed from the DN
     if(invalidateBlocks.contains(node, block)) {
-      return;
+      return true;
     }
 
     BlockToMarkCorrupt c = checkReplicaCorrupt(
@@ -4151,14 +4164,14 @@ private void processAndHandleReportedBlock(
       } else {
         markBlockAsCorrupt(c, storageInfo, node);
       }
-      return;
+      return true;
     }
 
     if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
       addStoredBlockUnderConstruction(
           new StatefulBlockInfo(storedBlock, new Block(block), reportedState),
           storageInfo);
-      return;
+      return true;
     }
 
     // Add replica if appropriate. If the replica was previously corrupt
@@ -4168,6 +4181,7 @@ private void processAndHandleReportedBlock(
         corruptReplicas.isReplicaCorrupt(storedBlock, node))) {
       addStoredBlock(storedBlock, block, storageInfo, delHintNode, true);
     }
+    return true;
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptionWithFailover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptionWithFailover.java
new file mode 100644
index 0000000000..f5899c0f65
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptionWithFailover.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Test;
+
+/**
+ * Tests corruption of replicas in case of failover.
+ */
+public class TestCorruptionWithFailover {
+
+  @Test
+  public void testCorruptReplicaAfterFailover() throws Exception {
+    Configuration conf = new Configuration();
+    // Allow data to be written to fewer replicas on pipeline failure.
+    conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+        MIN_REPLICATION, 2);
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .nnTopology(MiniDFSNNTopology.simpleHATopology()).numDataNodes(3)
+        .build()) {
+      cluster.transitionToActive(0);
+      cluster.waitActive();
+      DistributedFileSystem dfs = cluster.getFileSystem(0);
+      FSDataOutputStream out = dfs.create(new Path("/dir/file"));
+      // Write some data and flush.
+      for (int i = 0; i < 1024 * 1024; i++) {
+        out.write(i);
+      }
+      out.hsync();
+      // Stop one datanode, so as to trigger a pipeline update.
+      MiniDFSCluster.DataNodeProperties dn = cluster.stopDataNode(0);
+      // Write some more data and close the file.
+      for (int i = 0; i < 1024 * 1024; i++) {
+        out.write(i);
+      }
+      out.close();
+
+      BlockManager bm0 = cluster.getNamesystem(0).getBlockManager();
+      BlockManager bm1 = cluster.getNamesystem(1).getBlockManager();
+      // Mark all datanodes as stale, as they are marked after a namenode
+      // failover, to prevent replica deletion.
+      bm0.getDatanodeManager().markAllDatanodesStale();
+      bm1.getDatanodeManager().markAllDatanodesStale();
+      // Restart the datanode.
+      cluster.restartDataNode(dn);
+      // The replica from the restarted datanode will have an older
+      // genstamp, so it will be marked as CORRUPT.
+      GenericTestUtils.waitFor(() -> bm0.getCorruptBlocks() == 1, 100, 10000);
+
+      // Perform a failover to the other namenode.
+      cluster.transitionToStandby(0);
+      cluster.transitionToActive(1);
+      cluster.waitActive(1);
+      // The corrupt count should be the same as on the first namenode.
+      GenericTestUtils.waitFor(() -> bm1.getCorruptBlocks() == 1, 100, 10000);
+    }
+  }
+}
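
For context, a minimal standalone sketch of the ordering rule that the new
isPreviousMessageProcessed flag enforces in processQueuedMessages: once one
queued incremental block report (IBR) cannot be applied yet, every report
behind it is re-queued unprocessed, so the replay order matches the arrival
order. The sketch is illustrative only and is not part of the patch; apart
from the rule itself, every name in it is hypothetical.

// OrderedReplaySketch.java -- illustrative model, not patch code.
// Reports are reduced to bare generation stamps; tryApply() stands in for
// processAndHandleReportedBlock(), whose new boolean return value the
// patch introduces.
import java.util.ArrayDeque;
import java.util.Queue;

class OrderedReplaySketch {

  // Returns true if the report can be applied now, false if it must be
  // deferred (e.g. its generation stamp is still in the future here).
  static boolean tryApply(long reportGenStamp, long currentGenStamp) {
    return reportGenStamp <= currentGenStamp;
  }

  static Queue<Long> replay(Queue<Long> pending, long currentGenStamp) {
    Queue<Long> requeued = new ArrayDeque<>();
    boolean previousProcessed = true;
    for (long report : pending) {
      if (!previousProcessed) {
        // A report ahead of this one was deferred; defer this one too
        // instead of applying it out of order.
        requeued.add(report);
      } else {
        previousProcessed = tryApply(report, currentGenStamp);
        if (!previousProcessed) {
          requeued.add(report);
        }
      }
    }
    return requeued;
  }

  public static void main(String[] args) {
    Queue<Long> pending = new ArrayDeque<>();
    pending.add(5L); // applicable now
    pending.add(9L); // future genstamp: must be deferred
    pending.add(6L); // applicable, but must stay behind 9
    // With currentGenStamp = 7, only 5 is applied; 9 and 6 are re-queued
    // in their original relative order.
    System.out.println(replay(pending, 7L)); // prints [9, 6]
  }
}

Without the flag, report 6 would be applied while 9 stayed queued, so the
effective processing order could differ between the two namenodes; that
divergence is the CORRUPT replica mismatch this patch addresses.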