HDFS-8623. Refactor NameNode handling of invalid, corrupt, and under-recovery blocks. Contributed by Zhe Zhang.
parent 1b764a01fd
commit de480d6c89
CHANGES.txt
@@ -679,6 +679,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-8651. Make hadoop-hdfs-project Native code -Wall-clean (Alan Burlison
     via Colin P. McCabe)
 
+    HDFS-8623. Refactor NameNode handling of invalid, corrupt, and under-recovery
+    blocks. (Zhe Zhang via jing9)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
BlockInfo.java
@@ -172,19 +172,23 @@ public int getCapacity() {
   public abstract int numNodes();
 
   /**
-   * Add a {@link DatanodeStorageInfo} location for a block.
+   * Add a {@link DatanodeStorageInfo} location for a block
+   * @param storage The storage to add
+   * @param reportedBlock The block reported from the datanode. This is only
+   *                      used by erasure coded blocks, this block's id contains
+   *                      information indicating the index of the block in the
+   *                      corresponding block group.
    */
-  abstract boolean addStorage(DatanodeStorageInfo storage);
+  abstract boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock);
 
   /**
    * Remove {@link DatanodeStorageInfo} location for a block
    */
   abstract boolean removeStorage(DatanodeStorageInfo storage);
 
   /**
    * Replace the current BlockInfo with the new one in corresponding
-   * DatanodeStorageInfo's linked list
+   * DatanodeStorageInfo's linked list.
    */
   abstract void replaceBlock(BlockInfo newBlock);
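The widened signature is motivated by the javadoc above: for erasure-coded blocks, the block id reported by the datanode encodes the member's index within its block group, which the NameNode's stored BlockInfo cannot supply on its own. A minimal self-contained sketch of that encoding idea (the 4-bit mask and the helper names are assumptions for illustration, not the HDFS constants):

```java
// Toy model: a striped block group reserves the low bits of each member's
// block id to encode that member's index within the group.
public class BlockIdSketch {
  static final long GROUP_INDEX_MASK = 0xF; // hypothetical 4-bit index field

  static long groupId(long blockId) { return blockId & ~GROUP_INDEX_MASK; }

  static int indexInGroup(long blockId) { return (int) (blockId & GROUP_INDEX_MASK); }

  public static void main(String[] args) {
    long reportedId = (42L << 4) | 3; // member #3 of block group 42
    System.out.println("group id = " + groupId(reportedId)
        + ", index in group = " + indexInGroup(reportedId));
  }
}
```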
BlockInfoContiguous.java
@@ -45,7 +45,7 @@ protected BlockInfoContiguous(BlockInfo from) {
   }
 
   @Override
-  boolean addStorage(DatanodeStorageInfo storage) {
+  boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) {
     return ContiguousBlockStorageOp.addStorage(this, storage);
   }
 
BlockInfoContiguousUnderConstruction.java
@@ -69,7 +69,7 @@ public BlockInfoContiguous convertToCompleteBlock() {
   }
 
   @Override
-  boolean addStorage(DatanodeStorageInfo storage) {
+  boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) {
     return ContiguousBlockStorageOp.addStorage(this, storage);
   }
 
BlockManager.java
(File diff suppressed because it is too large.)
DatanodeStorageInfo.java
@@ -24,6 +24,7 @@
 import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
@@ -226,7 +227,7 @@ long getBlockPoolUsed() {
     return blockPoolUsed;
   }
 
-  public AddBlockResult addBlock(BlockInfo b) {
+  public AddBlockResult addBlock(BlockInfo b, Block reportedBlock) {
     // First check whether the block belongs to a different storage
     // on the same DN.
     AddBlockResult result = AddBlockResult.ADDED;
@@ -245,10 +246,18 @@ public AddBlockResult addBlock(BlockInfo b) {
     }
 
     // add to the head of the data-node list
-    b.addStorage(this);
+    b.addStorage(this, reportedBlock);
+    insertToList(b);
+    return result;
+  }
+
+  AddBlockResult addBlock(BlockInfo b) {
+    return addBlock(b, b);
+  }
+
+  public void insertToList(BlockInfo b) {
     blockList = b.listInsert(blockList, this);
     numBlocks++;
-    return result;
   }
 
   public boolean removeBlock(BlockInfo b) {
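The contiguous code paths gain nothing from the extra parameter, so the legacy single-argument addBlock survives by passing the stored block as its own reported block. A compilable stand-in for this overload-and-delegate pattern (stub types, not the real BlockInfo/DatanodeStorageInfo):

```java
import java.util.ArrayList;
import java.util.List;

class DelegationSketch {
  enum AddBlockResult { ADDED, REPLACED, ALREADY_EXIST }

  static class Block { final long id; Block(long id) { this.id = id; } }

  static class StorageStub {
    final List<Block> blockList = new ArrayList<>();

    AddBlockResult addBlock(Block b, Block reportedBlock) {
      // reportedBlock would matter for erasure-coded blocks; contiguous
      // callers pass the stored block itself, so it is unused here.
      blockList.add(0, b); // insert at the head of the storage's block list
      return AddBlockResult.ADDED;
    }

    AddBlockResult addBlock(Block b) {
      return addBlock(b, b); // legacy form: the block reports itself
    }
  }

  public static void main(String[] args) {
    StorageStub storage = new StorageStub();
    System.out.println(storage.addBlock(new Block(1L))); // ADDED
  }
}
```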
FSNamesystem.java
@@ -143,7 +143,6 @@
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsServerDefaults;
@@ -3136,7 +3135,7 @@ void removeBlocksAndUpdateSafemodeTotal(BlocksMapUpdateInfo blocks) {
       if (trackBlockCounts) {
         if (b.isComplete()) {
           numRemovedComplete++;
-          if (blockManager.checkMinReplication(b)) {
+          if (blockManager.hasMinStorage(b)) {
             numRemovedSafe++;
           }
         }
@@ -3368,7 +3367,7 @@ boolean internalReleaseLease(Lease lease, String src, INodesInPath iip,
       curBlock = blocks[nrCompleteBlocks];
       if(!curBlock.isComplete())
         break;
-      assert blockManager.checkMinReplication(curBlock) :
+      assert blockManager.hasMinStorage(curBlock) :
               "A COMPLETE block is not minimally replicated in " + src;
     }
 
@@ -3404,7 +3403,7 @@ boolean internalReleaseLease(Lease lease, String src, INodesInPath iip,
 
     // If penultimate block doesn't exist then its minReplication is met
     boolean penultimateBlockMinReplication = penultimateBlock == null ? true :
-        blockManager.checkMinReplication(penultimateBlock);
+        blockManager.hasMinStorage(penultimateBlock);
 
     switch(lastBlockState) {
     case COMPLETE:
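The rename from checkMinReplication to hasMinStorage reflects what the predicate actually verifies: the block is live on at least the configured minimum number of storages, a formulation that stays meaningful for striped blocks, where "replication" is the wrong word. A toy version under that reading (the counting rule here is an assumption for illustration, not the BlockManager code):

```java
import java.util.Set;

class MinStorageSketch {
  // Analogue of dfs.namenode.replication.min (default 1).
  static final int MIN_STORAGES = 1;

  // Assumed reading: a block meets the minimum once it is live on at least
  // MIN_STORAGES distinct storages, whether replicated or striped.
  static boolean hasMinStorage(Set<String> liveStorageIds) {
    return liveStorageIds.size() >= MIN_STORAGES;
  }

  public static void main(String[] args) {
    System.out.println(hasMinStorage(Set.of("DS-1"))); // true
    System.out.println(hasMinStorage(Set.of()));       // false
  }
}
```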
@@ -3413,7 +3412,7 @@ boolean internalReleaseLease(Lease lease, String src, INodesInPath iip,
     case COMMITTED:
       // Close file if committed blocks are minimally replicated
       if(penultimateBlockMinReplication &&
-          blockManager.checkMinReplication(lastBlock)) {
+          blockManager.hasMinStorage(lastBlock)) {
         finalizeINodeFileUnderConstruction(src, pendingFile,
             iip.getLatestSnapshotId());
         NameNode.stateChangeLog.warn("BLOCK*"
@@ -3705,9 +3704,9 @@ void commitBlockSynchronization(ExtendedBlock oldBlock,
           trimmedTargets.get(i).getStorageInfo(trimmedStorages.get(i));
         if (storageInfo != null) {
           if(copyTruncate) {
-            storageInfo.addBlock(truncatedBlock);
+            storageInfo.addBlock(truncatedBlock, truncatedBlock);
           } else {
-            storageInfo.addBlock(storedBlock);
+            storageInfo.addBlock(storedBlock, storedBlock);
           }
         }
       }
@@ -3723,8 +3722,9 @@ void commitBlockSynchronization(ExtendedBlock oldBlock,
       } else {
         iFile.setLastBlock(storedBlock, trimmedStorageInfos);
         if (closeFile) {
-          blockManager.markBlockReplicasAsCorrupt(storedBlock,
-              oldGenerationStamp, oldNumBytes, trimmedStorageInfos);
+          blockManager.markBlockReplicasAsCorrupt(oldBlock.getLocalBlock(),
+              storedBlock, oldGenerationStamp, oldNumBytes,
+              trimmedStorageInfos);
         }
       }
     }
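markBlockReplicasAsCorrupt now receives the block as reported by the datanode (oldBlock.getLocalBlock()) in addition to the NameNode's stored BlockInfo, so a corruption record can be keyed by the reported id even when it differs from the stored one, as it will for members of a striped group. A stub illustrating the widened call shape (toy types; the real method lives in BlockManager):

```java
class CorruptMarkSketch {
  static class Block { final long id; Block(long id) { this.id = id; } }
  static class BlockInfo extends Block { BlockInfo(long id) { super(id); } }

  // Toy mirror of the widened signature: the datanode-reported block travels
  // alongside the NameNode's stored BlockInfo instead of being assumed equal.
  static void markBlockReplicasAsCorrupt(Block reported, BlockInfo stored,
      long oldGenStamp, long oldNumBytes, String[] storages) {
    for (String storage : storages) {
      System.out.println("mark reported block " + reported.id + " (stored "
          + stored.id + ", oldGS=" + oldGenStamp + ", oldLen=" + oldNumBytes
          + ") corrupt on " + storage);
    }
  }

  public static void main(String[] args) {
    markBlockReplicasAsCorrupt(new Block(1003L), new BlockInfo(1000L),
        7L, 1024L, new String[] { "DS-dn1-disk0" });
  }
}
```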
NamenodeFsck.java
@@ -647,7 +647,7 @@ private void collectBlocksSummary(String parent, HdfsFileStatus file, Result res
             .getStorageType()));
       }
       if (showReplicaDetails) {
-        LightWeightLinkedSet<Block> blocksExcess =
+        LightWeightLinkedSet<BlockInfo> blocksExcess =
           bm.excessReplicateMap.get(dnDesc.getDatanodeUuid());
         Collection<DatanodeDescriptor> corruptReplicas =
             bm.getCorruptReplicas(block.getLocalBlock());
TestBlockInfo.java
@@ -63,7 +63,7 @@ public void testAddStorage() throws Exception {
 
     final DatanodeStorageInfo storage = DFSTestUtil.createDatanodeStorageInfo("storageID", "127.0.0.1");
 
-    boolean added = blockInfo.addStorage(storage);
+    boolean added = blockInfo.addStorage(storage, blockInfo);
 
     Assert.assertTrue(added);
     Assert.assertEquals(storage, blockInfo.getStorageInfo(0));
TestBlockManager.java
@@ -383,7 +383,7 @@ private void fulfillPipeline(BlockInfo blockInfo,
     for (int i = 1; i < pipeline.length; i++) {
       DatanodeStorageInfo storage = pipeline[i];
       bm.addBlock(storage, blockInfo, null);
-      blockInfo.addStorage(storage);
+      blockInfo.addStorage(storage, blockInfo);
     }
   }
 
@@ -393,7 +393,7 @@ private BlockInfo blockOnNodes(long blkId, List<DatanodeDescriptor> nodes) {
 
     for (DatanodeDescriptor dn : nodes) {
       for (DatanodeStorageInfo storage : dn.getStorageInfos()) {
-        blockInfo.addStorage(storage);
+        blockInfo.addStorage(storage, blockInfo);
       }
     }
     return blockInfo;
TestNodeCount.java
@@ -100,7 +100,7 @@ public void testNodeCount() throws Exception {
       DatanodeDescriptor nonExcessDN = null;
       for(DatanodeStorageInfo storage : bm.blocksMap.getStorages(block.getLocalBlock())) {
         final DatanodeDescriptor dn = storage.getDatanodeDescriptor();
-        Collection<Block> blocks = bm.excessReplicateMap.get(dn.getDatanodeUuid());
+        Collection<BlockInfo> blocks = bm.excessReplicateMap.get(dn.getDatanodeUuid());
         if (blocks == null || !blocks.contains(block.getLocalBlock()) ) {
           nonExcessDN = dn;
           break;
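This test keeps compiling because excessReplicateMap's values changed from Block to BlockInfo while the lookup still passes a plain Block: BlockInfo extends Block and block equality is id-based, so contains() matches across the two types. A self-contained sketch of that property (toy classes mirroring the id-based equals, not the HDFS ones):

```java
import java.util.ArrayList;
import java.util.Collection;

class ExcessMapSketch {
  static class Block {
    final long id;
    Block(long id) { this.id = id; }
    @Override public boolean equals(Object o) {
      return o instanceof Block && ((Block) o).id == id; // id-based equality
    }
    @Override public int hashCode() { return Long.hashCode(id); }
  }

  static class BlockInfo extends Block { BlockInfo(long id) { super(id); } }

  public static void main(String[] args) {
    Collection<BlockInfo> excess = new ArrayList<>();
    excess.add(new BlockInfo(7L));
    // A plain Block with the same id still matches after the generics change,
    // which is why lookups like blocks.contains(block.getLocalBlock())
    // pass unchanged.
    System.out.println(excess.contains(new Block(7L))); // true
  }
}
```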
TestOverReplicatedBlocks.java
@@ -34,7 +34,6 @@
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -42,7 +41,6 @@
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
-import org.apache.hadoop.util.Time;
 import org.junit.Test;
 
 public class TestOverReplicatedBlocks {
@@ -185,7 +183,7 @@ public void testChooseReplicaToDelete() throws Exception {
       // All replicas for deletion should be scheduled on lastDN.
       // And should not actually be deleted, because lastDN does not heartbeat.
       namesystem.readLock();
-      Collection<Block> dnBlocks =
+      Collection<BlockInfo> dnBlocks =
         namesystem.getBlockManager().excessReplicateMap.get(lastDNid);
       assertEquals("Replicas on node " + lastDNid + " should have been deleted",
           SMALL_FILE_LENGTH / SMALL_BLOCK_SIZE, dnBlocks.size());
TestReplicationPolicy.java
@@ -1250,7 +1250,7 @@ public void testAddStoredBlockDoesNotCauseSkippedReplication()
     when(storage.removeBlock(any(BlockInfo.class))).thenReturn(true);
     when(storage.addBlock(any(BlockInfo.class))).thenReturn
         (DatanodeStorageInfo.AddBlockResult.ADDED);
-    ucBlock.addStorage(storage);
+    ucBlock.addStorage(storage, ucBlock);
 
     when(mbc.setLastBlock((BlockInfo) any(), (DatanodeStorageInfo[]) any()))
         .thenReturn(ucBlock);