HDFS-7864. Erasure Coding: Update safemode calculation for striped blocks. Contributed by GAO Rui.
This commit is contained in:
parent
544f75d651
commit
46dac3595f
@ -234,6 +234,12 @@ public static boolean isStripedBlockID(long id) {
|
||||
return id < 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* The last 4 bits of HdfsConstants.BLOCK_GROUP_INDEX_MASK(15) is 1111,
|
||||
* so the last 4 bits of (~HdfsConstants.BLOCK_GROUP_INDEX_MASK) is 0000
|
||||
* and the other 60 bits are 1. Group ID is the first 60 bits of any
|
||||
* data/parity block id in the same striped block group.
|
||||
*/
|
||||
public static long convertToStripedID(long id) {
|
||||
return id & (~HdfsConstants.BLOCK_GROUP_INDEX_MASK);
|
||||
}
|
||||
|
@ -687,8 +687,10 @@ private BlockInfo completeBlock(final BlockCollection bc,
|
||||
// a "forced" completion when a file is getting closed by an
|
||||
// OP_CLOSE edit on the standby).
|
||||
namesystem.adjustSafeModeBlockTotals(0, 1);
|
||||
final int minStorage = curBlock.isStriped() ?
|
||||
((BlockInfoStriped) curBlock).getDataBlockNum() : minReplication;
|
||||
namesystem.incrementSafeBlockCount(
|
||||
Math.min(numNodes, minReplication));
|
||||
Math.min(numNodes, minStorage), curBlock);
|
||||
|
||||
// replace block in the blocksMap
|
||||
return blocksMap.replaceBlock(completeBlock);
|
||||
@ -2234,7 +2236,7 @@ private void processFirstBlockReport(
|
||||
// refer HDFS-5283
|
||||
if (namesystem.isInSnapshot(storedBlock.getBlockCollection())) {
|
||||
int numOfReplicas = BlockInfo.getNumExpectedLocations(storedBlock);
|
||||
namesystem.incrementSafeBlockCount(numOfReplicas);
|
||||
namesystem.incrementSafeBlockCount(numOfReplicas, storedBlock);
|
||||
}
|
||||
//and fall through to next clause
|
||||
}
|
||||
@ -2622,14 +2624,14 @@ && checkMinStorage(storedBlock, numCurrentReplica)) {
|
||||
// only complete blocks are counted towards that.
|
||||
// In the case that the block just became complete above, completeBlock()
|
||||
// handles the safe block count maintenance.
|
||||
namesystem.incrementSafeBlockCount(numCurrentReplica);
|
||||
namesystem.incrementSafeBlockCount(numCurrentReplica, storedBlock);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Modify (block-->datanode) map. Remove block from set of
|
||||
* needed replications if this takes care of the problem.
|
||||
* @return the block that is stored in blockMap.
|
||||
* @return the block that is stored in blocksMap.
|
||||
*/
|
||||
private Block addStoredBlock(final BlockInfo block,
|
||||
final Block reportedBlock,
|
||||
@ -2698,7 +2700,7 @@ private Block addStoredBlock(final BlockInfo block,
|
||||
// Is no-op if not in safe mode.
|
||||
// In the case that the block just became complete above, completeBlock()
|
||||
// handles the safe block count maintenance.
|
||||
namesystem.incrementSafeBlockCount(numCurrentReplica);
|
||||
namesystem.incrementSafeBlockCount(numCurrentReplica, storedBlock);
|
||||
}
|
||||
|
||||
// if file is under construction, then done for now
|
||||
|
@ -140,7 +140,7 @@ void removeBlock(Block block) {
|
||||
}
|
||||
}
|
||||
|
||||
/** Returns the block object it it exists in the map. */
|
||||
/** Returns the block object if it exists in the map. */
|
||||
BlockInfo getStoredBlock(Block b) {
|
||||
return blocks.get(b);
|
||||
}
|
||||
|
@ -214,6 +214,7 @@
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStatistics;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;
|
||||
@ -4822,10 +4823,16 @@ private synchronized void setBlockTotal(int total) {
|
||||
/**
|
||||
* Increment number of safe blocks if current block has
|
||||
* reached minimal replication.
|
||||
* @param replication current replication
|
||||
* @param storageNum current number of replicas or number of internal blocks
|
||||
* of a striped block group
|
||||
* @param storedBlock current storedBlock which is either a
|
||||
* BlockInfoContiguous or a BlockInfoStriped
|
||||
*/
|
||||
private synchronized void incrementSafeBlockCount(short replication) {
|
||||
if (replication == safeReplication) {
|
||||
private synchronized void incrementSafeBlockCount(short storageNum,
|
||||
BlockInfo storedBlock) {
|
||||
final int safe = storedBlock.isStriped() ?
|
||||
((BlockInfoStriped) storedBlock).getDataBlockNum() : safeReplication;
|
||||
if (storageNum == safe) {
|
||||
this.blockSafe++;
|
||||
|
||||
// Report startup progress only if we haven't completed startup yet.
|
||||
@ -5118,12 +5125,12 @@ private boolean shouldPopulateReplQueues() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incrementSafeBlockCount(int replication) {
|
||||
public void incrementSafeBlockCount(int storageNum, BlockInfo storedBlock) {
|
||||
// safeMode is volatile, and may be set to null at any time
|
||||
SafeModeInfo safeMode = this.safeMode;
|
||||
if (safeMode == null)
|
||||
return;
|
||||
safeMode.incrementSafeBlockCount((short)replication);
|
||||
safeMode.incrementSafeBlockCount((short) storageNum, storedBlock);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -45,9 +45,10 @@ public interface SafeMode {
|
||||
|
||||
/**
|
||||
* Increment number of blocks that reached minimal replication.
|
||||
* @param replication current replication
|
||||
* @param replication current replication
|
||||
* @param storedBlock current stored Block
|
||||
*/
|
||||
public void incrementSafeBlockCount(int replication);
|
||||
public void incrementSafeBlockCount(int replication, BlockInfo storedBlock);
|
||||
|
||||
/** Decrement number of blocks that reached minimal replication. */
|
||||
public void decrementSafeBlockCount(BlockInfo b);
|
||||
|
@ -552,7 +552,18 @@ public void testSafeModeWhenZeroBlockLocations() throws IOException {
|
||||
if(cluster!= null) cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
//TODO : test should be added to check safeMode with stripedBloks after stripedBlock related functions have been added in class MiniDFSCluster
|
||||
@Test
|
||||
public void testSafeModeWithCorruptSripedBlock() throws IOException {
|
||||
try {
|
||||
|
||||
} finally {
|
||||
if(fs != null) fs.close();
|
||||
if(cluster!= null) cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
void checkGetBlockLocationsWorks(FileSystem fs, Path fileName) throws IOException {
|
||||
FileStatus stat = fs.getFileStatus(fileName);
|
||||
try {
|
||||
@ -560,7 +571,7 @@ void checkGetBlockLocationsWorks(FileSystem fs, Path fileName) throws IOExceptio
|
||||
} catch (SafeModeException e) {
|
||||
assertTrue("Should have not got safemode exception", false);
|
||||
} catch (RemoteException re) {
|
||||
assertTrue("Should have not got safemode exception", false);
|
||||
assertTrue("Should have not got remote exception", false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user