HDFS-11445. FSCK shows overall health status as corrupt even if only one replica is corrupt. Contributed by Brahma Reddy Battula.
This commit is contained in:
parent
8bf0e2d6b3
commit
2e41f8803d
@ -27,7 +27,6 @@
|
|||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockType;
|
import org.apache.hadoop.hdfs.protocol.BlockType;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
||||||
import org.apache.hadoop.util.LightWeightGSet;
|
import org.apache.hadoop.util.LightWeightGSet;
|
||||||
|
|
||||||
import static org.apache.hadoop.hdfs.server.namenode.INodeId.INVALID_INODE_ID;
|
import static org.apache.hadoop.hdfs.server.namenode.INodeId.INVALID_INODE_ID;
|
||||||
@ -286,28 +285,25 @@ assert getBlockUCState() != BlockUCState.COMPLETE :
|
|||||||
* Process the recorded replicas. When about to commit or finish the
|
* Process the recorded replicas. When about to commit or finish the
|
||||||
* pipeline recovery sort out bad replicas.
|
* pipeline recovery sort out bad replicas.
|
||||||
* @param genStamp The final generation stamp for the block.
|
* @param genStamp The final generation stamp for the block.
|
||||||
|
* @return the list of stale replicas.
|
||||||
*/
|
*/
|
||||||
public void setGenerationStampAndVerifyReplicas(long genStamp) {
|
public List<ReplicaUnderConstruction> setGenerationStampAndVerifyReplicas(
|
||||||
|
long genStamp) {
|
||||||
Preconditions.checkState(uc != null && !isComplete());
|
Preconditions.checkState(uc != null && !isComplete());
|
||||||
// Set the generation stamp for the block.
|
// Set the generation stamp for the block.
|
||||||
setGenerationStamp(genStamp);
|
setGenerationStamp(genStamp);
|
||||||
|
|
||||||
// Remove the replicas with wrong gen stamp
|
return uc.getStaleReplicas(genStamp);
|
||||||
List<ReplicaUnderConstruction> staleReplicas = uc.getStaleReplicas(genStamp);
|
|
||||||
for (ReplicaUnderConstruction r : staleReplicas) {
|
|
||||||
r.getExpectedStorageLocation().removeBlock(this);
|
|
||||||
NameNode.blockStateChangeLog.debug("BLOCK* Removing stale replica {}"
|
|
||||||
+ " of {}", r, Block.toString(r));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Commit block's length and generation stamp as reported by the client.
|
* Commit block's length and generation stamp as reported by the client.
|
||||||
* Set block state to {@link BlockUCState#COMMITTED}.
|
* Set block state to {@link BlockUCState#COMMITTED}.
|
||||||
* @param block - contains client reported block length and generation
|
* @param block - contains client reported block length and generation
|
||||||
|
* @return the list of stale replicas.
|
||||||
* @throws IOException if block ids are inconsistent.
|
* @throws IOException if block ids are inconsistent.
|
||||||
*/
|
*/
|
||||||
void commitBlock(Block block) throws IOException {
|
List<ReplicaUnderConstruction> commitBlock(Block block) throws IOException {
|
||||||
if (getBlockId() != block.getBlockId()) {
|
if (getBlockId() != block.getBlockId()) {
|
||||||
throw new IOException("Trying to commit inconsistent block: id = "
|
throw new IOException("Trying to commit inconsistent block: id = "
|
||||||
+ block.getBlockId() + ", expected id = " + getBlockId());
|
+ block.getBlockId() + ", expected id = " + getBlockId());
|
||||||
@ -316,6 +312,6 @@ void commitBlock(Block block) throws IOException {
|
|||||||
uc.commit();
|
uc.commit();
|
||||||
this.setNumBytes(block.getNumBytes());
|
this.setNumBytes(block.getNumBytes());
|
||||||
// Sort out invalid replicas.
|
// Sort out invalid replicas.
|
||||||
setGenerationStampAndVerifyReplicas(block.getGenerationStamp());
|
return setGenerationStampAndVerifyReplicas(block.getGenerationStamp());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -848,7 +848,7 @@ public boolean hasMinStorage(BlockInfo block, int liveNum) {
|
|||||||
* @throws IOException if the block does not have at least a minimal number
|
* @throws IOException if the block does not have at least a minimal number
|
||||||
* of replicas reported from data-nodes.
|
* of replicas reported from data-nodes.
|
||||||
*/
|
*/
|
||||||
private static boolean commitBlock(final BlockInfo block,
|
private boolean commitBlock(final BlockInfo block,
|
||||||
final Block commitBlock) throws IOException {
|
final Block commitBlock) throws IOException {
|
||||||
if (block.getBlockUCState() == BlockUCState.COMMITTED)
|
if (block.getBlockUCState() == BlockUCState.COMMITTED)
|
||||||
return false;
|
return false;
|
||||||
@ -859,7 +859,9 @@ private static boolean commitBlock(final BlockInfo block,
|
|||||||
throw new IOException("Commit block with mismatching GS. NN has " +
|
throw new IOException("Commit block with mismatching GS. NN has " +
|
||||||
block + ", client submits " + commitBlock);
|
block + ", client submits " + commitBlock);
|
||||||
}
|
}
|
||||||
block.commitBlock(commitBlock);
|
List<ReplicaUnderConstruction> staleReplicas =
|
||||||
|
block.commitBlock(commitBlock);
|
||||||
|
removeStaleReplicas(staleReplicas, block);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -993,7 +995,8 @@ private void convertToCompleteBlock(BlockInfo curBlock, INodesInPath iip)
|
|||||||
* when tailing edit logs as a Standby.
|
* when tailing edit logs as a Standby.
|
||||||
*/
|
*/
|
||||||
public void forceCompleteBlock(final BlockInfo block) throws IOException {
|
public void forceCompleteBlock(final BlockInfo block) throws IOException {
|
||||||
block.commitBlock(block);
|
List<ReplicaUnderConstruction> staleReplicas = block.commitBlock(block);
|
||||||
|
removeStaleReplicas(staleReplicas, block);
|
||||||
completeBlock(block, null, true);
|
completeBlock(block, null, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -3638,6 +3641,16 @@ public void removeStoredBlock(BlockInfo storedBlock, DatanodeDescriptor node) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void removeStaleReplicas(List<ReplicaUnderConstruction> staleReplicas,
|
||||||
|
BlockInfo block) {
|
||||||
|
for (ReplicaUnderConstruction r : staleReplicas) {
|
||||||
|
removeStoredBlock(block,
|
||||||
|
r.getExpectedStorageLocation().getDatanodeDescriptor());
|
||||||
|
NameNode.blockStateChangeLog
|
||||||
|
.debug("BLOCK* Removing stale replica {}" + " of {}", r,
|
||||||
|
Block.toString(r));
|
||||||
|
}
|
||||||
|
}
|
||||||
/**
|
/**
|
||||||
* Get all valid locations of the block & add the block to results
|
* Get all valid locations of the block & add the block to results
|
||||||
* @return the length of the added block; 0 if the block is not added. If the
|
* @return the length of the added block; 0 if the block is not added. If the
|
||||||
@ -4090,6 +4103,13 @@ public BlockInfo getStoredBlock(Block block) {
|
|||||||
new Block(BlockIdManager.convertToStripedID(block.getBlockId())));
|
new Block(BlockIdManager.convertToStripedID(block.getBlockId())));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void updateLastBlock(BlockInfo lastBlock, ExtendedBlock newBlock) {
|
||||||
|
lastBlock.setNumBytes(newBlock.getNumBytes());
|
||||||
|
List<ReplicaUnderConstruction> staleReplicas = lastBlock
|
||||||
|
.setGenerationStampAndVerifyReplicas(newBlock.getGenerationStamp());
|
||||||
|
removeStaleReplicas(staleReplicas, lastBlock);
|
||||||
|
}
|
||||||
|
|
||||||
/** updates a block in needed reconstruction queue. */
|
/** updates a block in needed reconstruction queue. */
|
||||||
private void updateNeededReconstructions(final BlockInfo block,
|
private void updateNeededReconstructions(final BlockInfo block,
|
||||||
final int curReplicasDelta, int expectedReplicasDelta) {
|
final int curReplicasDelta, int expectedReplicasDelta) {
|
||||||
|
@ -4962,8 +4962,7 @@ private void updatePipelineInternal(String clientName, ExtendedBlock oldBlock,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Update old block with the new generation stamp and new length
|
// Update old block with the new generation stamp and new length
|
||||||
lastBlock.setNumBytes(newBlock.getNumBytes());
|
blockManager.updateLastBlock(lastBlock, newBlock);
|
||||||
lastBlock.setGenerationStampAndVerifyReplicas(newBlock.getGenerationStamp());
|
|
||||||
|
|
||||||
// find the DatanodeDescriptor objects
|
// find the DatanodeDescriptor objects
|
||||||
final DatanodeStorageInfo[] storages = blockManager.getDatanodeManager()
|
final DatanodeStorageInfo[] storages = blockManager.getDatanodeManager()
|
||||||
|
@ -80,6 +80,7 @@
|
|||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||||
import org.apache.hadoop.fs.StorageType;
|
import org.apache.hadoop.fs.StorageType;
|
||||||
import org.apache.hadoop.hdfs.StripedFileTestUtil;
|
import org.apache.hadoop.hdfs.StripedFileTestUtil;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
@ -107,6 +108,7 @@
|
|||||||
import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck.ReplicationResult;
|
import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck.ReplicationResult;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
|
import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck.ErasureCodingResult;
|
import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck.ErasureCodingResult;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
||||||
import org.apache.hadoop.hdfs.tools.DFSck;
|
import org.apache.hadoop.hdfs.tools.DFSck;
|
||||||
import org.apache.hadoop.hdfs.util.HostsFileWriter;
|
import org.apache.hadoop.hdfs.util.HostsFileWriter;
|
||||||
@ -2424,4 +2426,34 @@ public Boolean get() {
|
|||||||
}
|
}
|
||||||
}, 1000, 60000);
|
}, 1000, 60000);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 300000)
|
||||||
|
public void testFsckCorruptWhenOneReplicaIsCorrupt()
|
||||||
|
throws Exception {
|
||||||
|
try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||||
|
.nnTopology(MiniDFSNNTopology.simpleHATopology()).numDataNodes(2)
|
||||||
|
.build()) {
|
||||||
|
cluster.waitActive();
|
||||||
|
FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
|
||||||
|
cluster.transitionToActive(0);
|
||||||
|
String filePath = "/appendTest";
|
||||||
|
Path fileName = new Path(filePath);
|
||||||
|
DFSTestUtil.createFile(fs, fileName, 512, (short) 2, 0);
|
||||||
|
DFSTestUtil.waitReplication(fs, fileName, (short) 2);
|
||||||
|
Assert.assertTrue("File not created", fs.exists(fileName));
|
||||||
|
cluster.getDataNodes().get(1).shutdown();
|
||||||
|
DFSTestUtil.appendFile(fs, fileName, "appendCorruptBlock");
|
||||||
|
cluster.restartDataNode(1, true);
|
||||||
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
|
@Override public Boolean get() {
|
||||||
|
return (
|
||||||
|
cluster.getNameNode(0).getNamesystem().getCorruptReplicaBlocks()
|
||||||
|
> 0);
|
||||||
|
}
|
||||||
|
}, 100, 5000);
|
||||||
|
|
||||||
|
DFSTestUtil.appendFile(fs, fileName, "appendCorruptBlock");
|
||||||
|
runFsck(cluster.getConfiguration(0), 0, true, "/");
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user