HDFS-14699. Erasure Coding: Storage not considered in live replica when replication streams hard limit reached to threshold. Contributed by Zhao Yi Ming.

This commit is contained in:
Surendra Singh Lilhore 2019-09-12 19:11:50 +05:30
parent f4f9f0fe4f
commit d1c303a497
2 changed files with 90 additions and 8 deletions

View File

@ -2351,6 +2351,22 @@ DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block,
&& node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams) { && node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams) {
continue; // already reached replication limit continue; // already reached replication limit
} }
// for EC here need to make sure the numReplicas replicates state correct
// because in the scheduleReconstruction it need the numReplicas to check
// whether need to reconstruct the ec internal block
byte blockIndex = -1;
if (isStriped) {
blockIndex = ((BlockInfoStriped) block)
.getStorageBlockIndex(storage);
if (!bitSet.get(blockIndex)) {
bitSet.set(blockIndex);
} else if (state == StoredReplicaState.LIVE) {
numReplicas.subtract(StoredReplicaState.LIVE, 1);
numReplicas.add(StoredReplicaState.REDUNDANT, 1);
}
}
if (node.getNumberOfBlocksToBeReplicated() >= replicationStreamsHardLimit) { if (node.getNumberOfBlocksToBeReplicated() >= replicationStreamsHardLimit) {
continue; continue;
} }
@ -2358,15 +2374,7 @@ DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block,
if(isStriped || srcNodes.isEmpty()) { if(isStriped || srcNodes.isEmpty()) {
srcNodes.add(node); srcNodes.add(node);
if (isStriped) { if (isStriped) {
byte blockIndex = ((BlockInfoStriped) block).
getStorageBlockIndex(storage);
liveBlockIndices.add(blockIndex); liveBlockIndices.add(blockIndex);
if (!bitSet.get(blockIndex)) {
bitSet.set(blockIndex);
} else if (state == StoredReplicaState.LIVE) {
numReplicas.subtract(StoredReplicaState.LIVE, 1);
numReplicas.add(StoredReplicaState.REDUNDANT, 1);
}
} }
continue; continue;
} }

View File

@ -40,6 +40,7 @@
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockType; import org.apache.hadoop.hdfs.protocol.BlockType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@ -67,6 +68,7 @@
import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.NetworkTopology;
@ -685,6 +687,67 @@ public void testHighestPriReplSrcChosenDespiteMaxReplLimit() throws Exception {
LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY).length); LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY).length);
} }
@Test
public void testChooseSrcDatanodesWithDupEC() throws Exception {
bm.maxReplicationStreams = 4;
long blockId = -9223372036854775776L; // real ec block id
Block aBlock = new Block(blockId, 0, 0);
// ec policy
ECSchema rsSchema = new ECSchema("rs", 3, 2);
String policyName = "RS-3-2-128k";
int cellSize = 128 * 1024;
ErasureCodingPolicy ecPolicy =
new ErasureCodingPolicy(policyName, rsSchema, cellSize, (byte) -1);
// striped blockInfo
BlockInfoStriped aBlockInfoStriped = new BlockInfoStriped(aBlock, ecPolicy);
// ec storageInfo
DatanodeStorageInfo ds1 = DFSTestUtil.createDatanodeStorageInfo(
"storage1", "1.1.1.1", "rack1", "host1");
DatanodeStorageInfo ds2 = DFSTestUtil.createDatanodeStorageInfo(
"storage2", "2.2.2.2", "rack2", "host2");
DatanodeStorageInfo ds3 = DFSTestUtil.createDatanodeStorageInfo(
"storage3", "3.3.3.3", "rack3", "host3");
DatanodeStorageInfo ds4 = DFSTestUtil.createDatanodeStorageInfo(
"storage4", "4.4.4.4", "rack4", "host4");
DatanodeStorageInfo ds5 = DFSTestUtil.createDatanodeStorageInfo(
"storage5", "5.5.5.5", "rack5", "host5");
// link block with storage
aBlockInfoStriped.addStorage(ds1, aBlock);
aBlockInfoStriped.addStorage(ds2, new Block(blockId + 1, 0, 0));
aBlockInfoStriped.addStorage(ds3, new Block(blockId + 2, 0, 0));
// dup internal block
aBlockInfoStriped.addStorage(ds4, new Block(blockId + 3, 0, 0));
aBlockInfoStriped.addStorage(ds5, new Block(blockId + 3, 0, 0));
// simulate the node 2 arrive maxReplicationStreams
for(int i = 0; i < 4; i++){
ds4.getDatanodeDescriptor().incrementPendingReplicationWithoutTargets();
}
addEcBlockToBM(blockId, ecPolicy);
List<DatanodeDescriptor> cntNodes = new LinkedList<DatanodeDescriptor>();
List<DatanodeStorageInfo> liveNodes = new LinkedList<DatanodeStorageInfo>();
NumberReplicas numReplicas = new NumberReplicas();
List<Byte> liveBlockIndices = new ArrayList<>();
bm.chooseSourceDatanodes(
aBlockInfoStriped,
cntNodes,
liveNodes,
numReplicas, liveBlockIndices,
LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY);
assertEquals("Choose the source node for reconstruction with one node reach"
+ " the MAX maxReplicationStreams, the numReplicas still return the"
+ " correct live replicas.", 4,
numReplicas.liveReplicas());
assertEquals("Choose the source node for reconstruction with one node reach"
+ " the MAX maxReplicationStreams, the numReplicas should return"
+ " the correct redundant Internal Blocks.", 1,
numReplicas.redundantInternalBlocks());
}
@Test @Test
public void testFavorDecomUntilHardLimit() throws Exception { public void testFavorDecomUntilHardLimit() throws Exception {
bm.maxReplicationStreams = 0; bm.maxReplicationStreams = 0;
@ -979,6 +1042,17 @@ public void testUCBlockNotConsideredMissing() throws Exception {
bm.setInitializedReplQueues(false); bm.setInitializedReplQueues(false);
} }
private BlockInfo addEcBlockToBM(long blkId, ErasureCodingPolicy ecPolicy) {
Block block = new Block(blkId);
BlockInfo blockInfo = new BlockInfoStriped(block, ecPolicy);
long inodeId = ++mockINodeId;
final INodeFile bc = TestINodeFile.createINodeFile(inodeId);
bm.blocksMap.addBlockCollection(blockInfo, bc);
blockInfo.setBlockCollectionId(inodeId);
doReturn(bc).when(fsn).getBlockCollection(inodeId);
return blockInfo;
}
private BlockInfo addBlockToBM(long blkId) { private BlockInfo addBlockToBM(long blkId) {
Block block = new Block(blkId); Block block = new Block(blkId);
BlockInfo blockInfo = new BlockInfoContiguous(block, (short) 3); BlockInfo blockInfo = new BlockInfoContiguous(block, (short) 3);