HDFS-15187. CORRUPT replica mismatch between namenodes after failover. Contributed by Ayush Saxena.

This commit is contained in:
Ayush Saxena 2020-02-24 20:38:04 +05:30
parent 93b8f453b9
commit 7f8685f476
2 changed files with 104 additions and 9 deletions

View File

@ -3142,6 +3142,7 @@ public void processQueuedMessagesForBlock(Block b) throws IOException {
private void processQueuedMessages(Iterable<ReportedBlockInfo> rbis)
throws IOException {
boolean isPreviousMessageProcessed = true;
for (ReportedBlockInfo rbi : rbis) {
LOG.debug("Processing previouly queued message {}", rbi);
if (rbi.getReportedState() == null) {
@ -3149,9 +3150,15 @@ private void processQueuedMessages(Iterable<ReportedBlockInfo> rbis)
DatanodeStorageInfo storageInfo = rbi.getStorageInfo();
removeStoredBlock(getStoredBlock(rbi.getBlock()),
storageInfo.getDatanodeDescriptor());
} else if (!isPreviousMessageProcessed) {
// if the previous IBR processing was skipped, skip processing all
// further IBR's so as to ensure same sequence of processing.
queueReportedBlock(rbi.getStorageInfo(), rbi.getBlock(),
rbi.getReportedState(), QUEUE_REASON_FUTURE_GENSTAMP);
} else {
processAndHandleReportedBlock(rbi.getStorageInfo(),
rbi.getBlock(), rbi.getReportedState(), null);
isPreviousMessageProcessed =
processAndHandleReportedBlock(rbi.getStorageInfo(), rbi.getBlock(),
rbi.getReportedState(), null);
}
}
}
@ -4098,8 +4105,14 @@ public void addBlock(DatanodeStorageInfo storageInfo, Block block,
processAndHandleReportedBlock(storageInfo, block, ReplicaState.FINALIZED,
delHintNode);
}
private void processAndHandleReportedBlock(
/**
* Process a reported block.
* @return true if the block is processed, or false if the block is queued
* to be processed later.
* @throws IOException
*/
private boolean processAndHandleReportedBlock(
DatanodeStorageInfo storageInfo, Block block,
ReplicaState reportedState, DatanodeDescriptor delHintNode)
throws IOException {
@ -4113,7 +4126,7 @@ private void processAndHandleReportedBlock(
isGenStampInFuture(block)) {
queueReportedBlock(storageInfo, block, reportedState,
QUEUE_REASON_FUTURE_GENSTAMP);
return;
return false;
}
// find block by blockId
@ -4124,7 +4137,7 @@ private void processAndHandleReportedBlock(
blockLog.debug("BLOCK* addBlock: block {} on node {} size {} does not " +
"belong to any file", block, node, block.getNumBytes());
addToInvalidates(new Block(block), node);
return;
return true;
}
BlockUCState ucState = storedBlock.getBlockUCState();
@ -4133,7 +4146,7 @@ private void processAndHandleReportedBlock(
// Ignore replicas already scheduled to be removed from the DN
if(invalidateBlocks.contains(node, block)) {
return;
return true;
}
BlockToMarkCorrupt c = checkReplicaCorrupt(
@ -4151,14 +4164,14 @@ private void processAndHandleReportedBlock(
} else {
markBlockAsCorrupt(c, storageInfo, node);
}
return;
return true;
}
if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
addStoredBlockUnderConstruction(
new StatefulBlockInfo(storedBlock, new Block(block), reportedState),
storageInfo);
return;
return true;
}
// Add replica if appropriate. If the replica was previously corrupt
@ -4168,6 +4181,7 @@ private void processAndHandleReportedBlock(
corruptReplicas.isReplicaCorrupt(storedBlock, node))) {
addStoredBlock(storedBlock, block, storageInfo, delHintNode, true);
}
return true;
}
/**

View File

@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Test;
/**
* Tests corruption of replicas in case of failover.
*/
public class TestCorruptionWithFailover {
@Test
public void testCorruptReplicaAfterFailover() throws Exception {
Configuration conf = new Configuration();
// Enable data to be written, to less replicas in case of pipeline failure.
conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
MIN_REPLICATION, 2);
try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHATopology()).numDataNodes(3)
.build()) {
cluster.transitionToActive(0);
cluster.waitActive();
DistributedFileSystem dfs = cluster.getFileSystem(0);
FSDataOutputStream out = dfs.create(new Path("/dir/file"));
// Write some data and flush.
for (int i = 0; i < 1024 * 1024; i++) {
out.write(i);
}
out.hsync();
// Stop one datanode, so as to trigger update pipeline.
MiniDFSCluster.DataNodeProperties dn = cluster.stopDataNode(0);
// Write some more data and close the file.
for (int i = 0; i < 1024 * 1024; i++) {
out.write(i);
}
out.close();
BlockManager bm0 = cluster.getNamesystem(0).getBlockManager();
BlockManager bm1 = cluster.getNamesystem(1).getBlockManager();
// Mark datanodes as stale, as are marked if a namenode went through a
// failover, to prevent replica deletion.
bm0.getDatanodeManager().markAllDatanodesStale();
bm1.getDatanodeManager().markAllDatanodesStale();
// Restart the datanode
cluster.restartDataNode(dn);
// The replica from the datanode will be having lesser genstamp, so
// would be marked as CORRUPT.
GenericTestUtils.waitFor(() -> bm0.getCorruptBlocks() == 1, 100, 10000);
// Perform failover to other namenode
cluster.transitionToStandby(0);
cluster.transitionToActive(1);
cluster.waitActive(1);
// The corrupt count should be same as first namenode.
GenericTestUtils.waitFor(() -> bm1.getCorruptBlocks() == 1, 100, 10000);
}
}
}