HDFS-17052. Improve BlockPlacementPolicyRackFaultTolerant to avoid choose nodes failed when no enough Rack. (#5759). Contributed by Hualong Zhang and Shuyan Zhang.
Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
This commit is contained in:
parent
59a7836d13
commit
750c0fc631
@ -170,7 +170,7 @@ private void chooseEvenlyFromRemainingRacks(Node writer,
|
||||
NotEnoughReplicasException lastException = e;
|
||||
int bestEffortMaxNodesPerRack = maxNodesPerRack;
|
||||
while (results.size() != totalReplicaExpected &&
|
||||
numResultsOflastChoose != results.size()) {
|
||||
bestEffortMaxNodesPerRack < totalReplicaExpected) {
|
||||
// Exclude the chosen nodes
|
||||
final Set<Node> newExcludeNodes = new HashSet<>();
|
||||
for (DatanodeStorageInfo resultStorage : results) {
|
||||
@ -192,11 +192,22 @@ private void chooseEvenlyFromRemainingRacks(Node writer,
|
||||
} finally {
|
||||
excludedNodes.addAll(newExcludeNodes);
|
||||
}
|
||||
// To improve performance, the maximum value of 'bestEffortMaxNodesPerRack'
|
||||
// is calculated only when it is not possible to select a node.
|
||||
if (numResultsOflastChoose == results.size()) {
|
||||
Map<String, Integer> nodesPerRack = new HashMap<>();
|
||||
for (DatanodeStorageInfo dsInfo : results) {
|
||||
String rackName = dsInfo.getDatanodeDescriptor().getNetworkLocation();
|
||||
nodesPerRack.merge(rackName, 1, Integer::sum);
|
||||
}
|
||||
bestEffortMaxNodesPerRack =
|
||||
Math.max(bestEffortMaxNodesPerRack, Collections.max(nodesPerRack.values()));
|
||||
}
|
||||
}
|
||||
|
||||
if (numResultsOflastChoose != totalReplicaExpected) {
|
||||
if (results.size() != totalReplicaExpected) {
|
||||
LOG.debug("Best effort placement failed: expecting {} replicas, only "
|
||||
+ "chose {}.", totalReplicaExpected, numResultsOflastChoose);
|
||||
+ "chose {}.", totalReplicaExpected, results.size());
|
||||
throw lastException;
|
||||
}
|
||||
}
|
||||
|
@ -19,6 +19,7 @@
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
@ -52,6 +53,7 @@
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.BitSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
@ -515,4 +517,63 @@ public void testReconstrutionWithBusyBlock1() throws Exception {
|
||||
assertEquals(9, bm.countNodes(blockInfo).liveReplicas());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReconstructionWithStorageTypeNotEnough() throws Exception {
|
||||
final HdfsConfiguration conf = new HdfsConfiguration();
|
||||
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 1);
|
||||
|
||||
// Nine disk node eleven archive node.
|
||||
int numDn = groupSize * 2 + 2;
|
||||
StorageType[][] storageTypes = new StorageType[numDn][];
|
||||
Arrays.fill(storageTypes, 0, groupSize,
|
||||
new StorageType[]{StorageType.DISK, StorageType.DISK});
|
||||
Arrays.fill(storageTypes, groupSize, numDn,
|
||||
new StorageType[]{StorageType.ARCHIVE, StorageType.ARCHIVE});
|
||||
|
||||
// Nine disk racks and one archive rack.
|
||||
String[] racks = {
|
||||
"/rack1", "/rack2", "/rack3", "/rack4", "/rack5", "/rack6", "/rack7", "/rack8",
|
||||
"/rack9", "/rack0", "/rack0", "/rack0", "/rack0", "/rack0", "/rack0", "/rack0",
|
||||
"/rack0", "/rack0", "/rack0", "/rack0"};
|
||||
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDn)
|
||||
.storageTypes(storageTypes)
|
||||
.racks(racks)
|
||||
.build();
|
||||
cluster.waitActive();
|
||||
DistributedFileSystem fs = cluster.getFileSystem();
|
||||
fs.enableErasureCodingPolicy(
|
||||
StripedFileTestUtil.getDefaultECPolicy().getName());
|
||||
|
||||
try {
|
||||
fs.mkdirs(dirPath);
|
||||
fs.setStoragePolicy(dirPath, "COLD");
|
||||
fs.setErasureCodingPolicy(dirPath,
|
||||
StripedFileTestUtil.getDefaultECPolicy().getName());
|
||||
DFSTestUtil.createFile(fs, filePath,
|
||||
cellSize * dataBlocks * 2, (short) 1, 0L);
|
||||
|
||||
// Stop one dn.
|
||||
LocatedBlocks blks = fs.getClient().getLocatedBlocks(filePath.toString(), 0);
|
||||
LocatedStripedBlock block = (LocatedStripedBlock) blks.getLastLocatedBlock();
|
||||
DatanodeInfo dnToStop = block.getLocations()[0];
|
||||
cluster.stopDataNode(dnToStop.getXferAddr());
|
||||
cluster.setDataNodeDead(dnToStop);
|
||||
|
||||
// Wait for reconstruction to happen.
|
||||
StripedFileTestUtil.waitForReconstructionFinished(filePath, fs, groupSize);
|
||||
blks = fs.getClient().getLocatedBlocks(filePath.toString(), 0);
|
||||
block = (LocatedStripedBlock) blks.getLastLocatedBlock();
|
||||
BitSet bitSet = new BitSet(groupSize);
|
||||
for (byte index : block.getBlockIndices()) {
|
||||
bitSet.set(index);
|
||||
}
|
||||
for (int i = 0; i < groupSize; i++) {
|
||||
Assert.assertTrue(bitSet.get(i));
|
||||
}
|
||||
} finally {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user