HDFS-4099. Clean up replication code and add more javadoc.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1400986 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2012-10-22 18:30:16 +00:00
parent b69605eff5
commit 75cdb5bb49
3 changed files with 34 additions and 44 deletions

View File

@ -408,6 +408,8 @@ Release 2.0.3-alpha - Unreleased
HDFS-4088. Remove "throws QuotaExceededException" from an HDFS-4088. Remove "throws QuotaExceededException" from an
INodeDirectoryWithQuota constructor. (szetszwo) INodeDirectoryWithQuota constructor. (szetszwo)
HDFS-4099. Clean up replication code and add more javadoc. (szetszwo)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -17,6 +17,8 @@
*/ */
package org.apache.hadoop.hdfs.server.blockmanagement; package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.apache.hadoop.util.ExitUtil.terminate;
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.util.ArrayList; import java.util.ArrayList;
@ -49,14 +51,11 @@
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
import static org.apache.hadoop.util.ExitUtil.terminate;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.namenode.FSClusterStats; import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
@ -2833,28 +2832,32 @@ private void updateNeededReplications(final Block block,
} }
} }
public void checkReplication(Block block, short numExpectedReplicas) { /**
// filter out containingNodes that are marked for decommission. * Check replication of the blocks in the collection.
NumberReplicas number = countNodes(block); * If any block is needed replication, insert it into the replication queue.
if (isNeededReplication(block, numExpectedReplicas, number.liveReplicas())) { * Otherwise, if the block is more than the expected replication factor,
neededReplications.add(block, * process it as an over replicated block.
number.liveReplicas(), */
number.decommissionedReplicas(), public void checkReplication(BlockCollection bc) {
numExpectedReplicas); final short expected = bc.getBlockReplication();
return; for (Block block : bc.getBlocks()) {
} final NumberReplicas n = countNodes(block);
if (number.liveReplicas() > numExpectedReplicas) { if (isNeededReplication(block, expected, n.liveReplicas())) {
processOverReplicatedBlock(block, numExpectedReplicas, null, null); neededReplications.add(block, n.liveReplicas(),
n.decommissionedReplicas(), expected);
} else if (n.liveReplicas() > expected) {
processOverReplicatedBlock(block, expected, null, null);
}
} }
} }
/* get replication factor of a block */ /**
* @return 0 if the block is not found;
* otherwise, return the replication factor of the block.
*/
private int getReplication(Block block) { private int getReplication(Block block) {
BlockCollection bc = blocksMap.getBlockCollection(block); final BlockCollection bc = blocksMap.getBlockCollection(block);
if (bc == null) { // block does not belong to any file return bc == null? 0: bc.getBlockReplication();
return 0;
}
return bc.getBlockReplication();
} }
@ -2929,12 +2932,12 @@ boolean blockHasEnoughRacks(Block b) {
return enoughRacks; return enoughRacks;
} }
boolean isNeededReplication(Block b, int expectedReplication, int curReplicas) { /**
if ((curReplicas >= expectedReplication) && (blockHasEnoughRacks(b))) { * A block needs replication if the number of replicas is less than expected
return false; * or if it does not have enough racks.
} else { */
return true; private boolean isNeededReplication(Block b, int expected, int current) {
} return current < expected || !blockHasEnoughRacks(b);
} }
public long getMissingBlocksCount() { public long getMissingBlocksCount() {

View File

@ -2428,21 +2428,6 @@ private boolean completeFileInternal(String src,
return true; return true;
} }
/**
* Check all blocks of a file. If any blocks are lower than their intended
* replication factor, then insert them into neededReplication and if
* the blocks are more than the intended replication factor then insert
* them into invalidateBlocks.
*/
private void checkReplicationFactor(INodeFile file) {
short numExpectedReplicas = file.getBlockReplication();
Block[] pendingBlocks = file.getBlocks();
int nrBlocks = pendingBlocks.length;
for (int i = 0; i < nrBlocks; i++) {
blockManager.checkReplication(pendingBlocks[i], numExpectedReplicas);
}
}
/** /**
* Allocate a block at the given pending filename * Allocate a block at the given pending filename
* *
@ -3175,7 +3160,7 @@ private void finalizeINodeFileUnderConstruction(String src,
// close file and persist block allocations for this file // close file and persist block allocations for this file
dir.closeFile(src, newFile); dir.closeFile(src, newFile);
checkReplicationFactor(newFile); blockManager.checkReplication(newFile);
} }
void commitBlockSynchronization(ExtendedBlock lastblock, void commitBlockSynchronization(ExtendedBlock lastblock,