HDFS-16456. EC: Decommission a rack with only on dn will fail when the rack number is equal with replication (#4126)

This commit is contained in:
caozhiqiang 2022-04-14 17:42:39 +08:00 committed by GitHub
parent c65c383b7e
commit cee8c62498
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 297 additions and 41 deletions

View File

@ -101,6 +101,13 @@ protected NetworkTopology init(InnerNode.Factory factory) {
private int depthOfAllLeaves = -1; private int depthOfAllLeaves = -1;
/** rack counter */ /** rack counter */
protected int numOfRacks = 0; protected int numOfRacks = 0;
/** empty rack map, rackname->nodenumber. */
private HashMap<String, Set<String>> rackMap =
new HashMap<String, Set<String>>();
/** decommission nodes, contained stoped nodes. */
private HashSet<String> decommissionNodes = new HashSet<>();
/** empty rack counter. */
private int numOfEmptyRacks = 0;
/** /**
* Whether or not this cluster has ever consisted of more than 1 rack, * Whether or not this cluster has ever consisted of more than 1 rack,
@ -150,6 +157,7 @@ public void add(Node node) {
if (rack == null) { if (rack == null) {
incrementRacks(); incrementRacks();
} }
interAddNodeWithEmptyRack(node);
if (depthOfAllLeaves == -1) { if (depthOfAllLeaves == -1) {
depthOfAllLeaves = node.getLevel(); depthOfAllLeaves = node.getLevel();
} }
@ -224,6 +232,7 @@ public void remove(Node node) {
if (rack == null) { if (rack == null) {
numOfRacks--; numOfRacks--;
} }
interRemoveNodeWithEmptyRack(node);
} }
LOG.debug("NetworkTopology became:\n{}", this); LOG.debug("NetworkTopology became:\n{}", this);
} finally { } finally {
@ -1015,4 +1024,108 @@ protected static boolean isNodeInScope(Node node, String scope) {
String nodeLocation = NodeBase.getPath(node) + NodeBase.PATH_SEPARATOR_STR; String nodeLocation = NodeBase.getPath(node) + NodeBase.PATH_SEPARATOR_STR;
return nodeLocation.startsWith(scope); return nodeLocation.startsWith(scope);
} }
/** @return the number of nonempty racks */
public int getNumOfNonEmptyRacks() {
return numOfRacks - numOfEmptyRacks;
}
/**
* Update empty rack number when add a node like recommission.
* @param node node to be added; can be null
*/
public void recommissionNode(Node node) {
if (node == null) {
return;
}
if (node instanceof InnerNode) {
throw new IllegalArgumentException(
"Not allow to remove an inner node: " + NodeBase.getPath(node));
}
netlock.writeLock().lock();
try {
decommissionNodes.remove(node.getName());
interAddNodeWithEmptyRack(node);
} finally {
netlock.writeLock().unlock();
}
}
/**
* Update empty rack number when remove a node like decommission.
* @param node node to be added; can be null
*/
public void decommissionNode(Node node) {
if (node == null) {
return;
}
if (node instanceof InnerNode) {
throw new IllegalArgumentException(
"Not allow to remove an inner node: " + NodeBase.getPath(node));
}
netlock.writeLock().lock();
try {
decommissionNodes.add(node.getName());
interRemoveNodeWithEmptyRack(node);
} finally {
netlock.writeLock().unlock();
}
}
/**
* Internal function for update empty rack number
* for add or recommission a node.
* @param node node to be added; can be null
*/
private void interAddNodeWithEmptyRack(Node node) {
if (node == null) {
return;
}
String rackname = node.getNetworkLocation();
Set<String> nodes = rackMap.get(rackname);
if (nodes == null) {
nodes = new HashSet<String>();
}
if (!decommissionNodes.contains(node.getName())) {
nodes.add(node.getName());
}
rackMap.put(rackname, nodes);
countEmptyRacks();
}
/**
* Internal function for update empty rack number
* for remove or decommission a node.
* @param node node to be removed; can be null
*/
private void interRemoveNodeWithEmptyRack(Node node) {
if (node == null) {
return;
}
String rackname = node.getNetworkLocation();
Set<String> nodes = rackMap.get(rackname);
if (nodes != null) {
InnerNode rack = (InnerNode) getNode(node.getNetworkLocation());
if (rack == null) {
// this node and its rack are both removed.
rackMap.remove(rackname);
} else if (nodes.contains(node.getName())) {
// this node is decommissioned or removed.
nodes.remove(node.getName());
rackMap.put(rackname, nodes);
}
countEmptyRacks();
}
}
private void countEmptyRacks() {
int count = 0;
for (Set<String> nodes : rackMap.values()) {
if (nodes != null && nodes.isEmpty()) {
count++;
}
}
numOfEmptyRacks = count;
LOG.debug("Current numOfEmptyRacks is {}", numOfEmptyRacks);
}
} }

View File

@ -304,7 +304,7 @@ private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
&& stats.isAvoidingStaleDataNodesForWrite()); && stats.isAvoidingStaleDataNodesForWrite());
boolean avoidLocalRack = (addBlockFlags != null boolean avoidLocalRack = (addBlockFlags != null
&& addBlockFlags.contains(AddBlockFlag.NO_LOCAL_RACK) && writer != null && addBlockFlags.contains(AddBlockFlag.NO_LOCAL_RACK) && writer != null
&& clusterMap.getNumOfRacks() > 2); && clusterMap.getNumOfNonEmptyRacks() > 2);
boolean avoidLocalNode = (addBlockFlags != null boolean avoidLocalNode = (addBlockFlags != null
&& addBlockFlags.contains(AddBlockFlag.NO_LOCAL_WRITE) && addBlockFlags.contains(AddBlockFlag.NO_LOCAL_WRITE)
&& writer != null && writer != null
@ -385,7 +385,7 @@ protected int[] getMaxNodesPerRack(int numOfChosen, int numOfReplicas) {
totalNumOfReplicas = clusterSize; totalNumOfReplicas = clusterSize;
} }
// No calculation needed when there is only one rack or picking one node. // No calculation needed when there is only one rack or picking one node.
int numOfRacks = clusterMap.getNumOfRacks(); int numOfRacks = clusterMap.getNumOfNonEmptyRacks();
// HDFS-14527 return default when numOfRacks = 0 to avoid // HDFS-14527 return default when numOfRacks = 0 to avoid
// ArithmeticException when calc maxNodesPerRack at following logic. // ArithmeticException when calc maxNodesPerRack at following logic.
if (numOfRacks <= 1 || totalNumOfReplicas <= 1) { if (numOfRacks <= 1 || totalNumOfReplicas <= 1) {
@ -1173,7 +1173,7 @@ public BlockPlacementStatus verifyBlockPlacement(DatanodeInfo[] locs,
.map(dn -> dn.getNetworkLocation()).distinct().count(); .map(dn -> dn.getNetworkLocation()).distinct().count();
return new BlockPlacementStatusDefault(Math.toIntExact(rackCount), return new BlockPlacementStatusDefault(Math.toIntExact(rackCount),
minRacks, clusterMap.getNumOfRacks()); minRacks, clusterMap.getNumOfNonEmptyRacks());
} }
/** /**
@ -1370,4 +1370,3 @@ public boolean getExcludeSlowNodesEnabled() {
return excludeSlowNodesEnabled; return excludeSlowNodesEnabled;
} }
} }

View File

@ -42,7 +42,7 @@ protected int[] getMaxNodesPerRack(int numOfChosen, int numOfReplicas) {
totalNumOfReplicas = clusterSize; totalNumOfReplicas = clusterSize;
} }
// No calculation needed when there is only one rack or picking one node. // No calculation needed when there is only one rack or picking one node.
int numOfRacks = clusterMap.getNumOfRacks(); int numOfRacks = clusterMap.getNumOfNonEmptyRacks();
// HDFS-14527 return default when numOfRacks = 0 to avoid // HDFS-14527 return default when numOfRacks = 0 to avoid
// ArithmeticException when calc maxNodesPerRack at following logic. // ArithmeticException when calc maxNodesPerRack at following logic.
if (numOfRacks <= 1 || totalNumOfReplicas <= 1) { if (numOfRacks <= 1 || totalNumOfReplicas <= 1) {
@ -90,7 +90,9 @@ protected Node chooseTargetInOrder(int numOfReplicas,
EnumMap<StorageType, Integer> storageTypes) EnumMap<StorageType, Integer> storageTypes)
throws NotEnoughReplicasException { throws NotEnoughReplicasException {
int totalReplicaExpected = results.size() + numOfReplicas; int totalReplicaExpected = results.size() + numOfReplicas;
int numOfRacks = clusterMap.getNumOfRacks(); int numOfRacks = clusterMap.getNumOfNonEmptyRacks();
try {
if (totalReplicaExpected < numOfRacks || if (totalReplicaExpected < numOfRacks ||
totalReplicaExpected % numOfRacks == 0) { totalReplicaExpected % numOfRacks == 0) {
writer = chooseOnce(numOfReplicas, writer, excludedNodes, blocksize, writer = chooseOnce(numOfReplicas, writer, excludedNodes, blocksize,
@ -121,7 +123,6 @@ protected Node chooseTargetInOrder(int numOfReplicas,
numOfReplicas = Math.min(totalReplicaExpected - results.size(), numOfReplicas = Math.min(totalReplicaExpected - results.size(),
(maxNodesPerRack -1) * numOfRacks - (results.size() - excess)); (maxNodesPerRack -1) * numOfRacks - (results.size() - excess));
try {
// Try to spread the replicas as evenly as possible across racks. // Try to spread the replicas as evenly as possible across racks.
// This is done by first placing with (maxNodesPerRack-1), then spreading // This is done by first placing with (maxNodesPerRack-1), then spreading
// the remainder by calling again with maxNodesPerRack. // the remainder by calling again with maxNodesPerRack.
@ -243,7 +244,7 @@ public BlockPlacementStatus verifyBlockPlacement(DatanodeInfo[] locs,
racks.add(dn.getNetworkLocation()); racks.add(dn.getNetworkLocation());
} }
return new BlockPlacementStatusDefault(racks.size(), numberOfReplicas, return new BlockPlacementStatusDefault(racks.size(), numberOfReplicas,
clusterMap.getNumOfRacks()); clusterMap.getNumOfNonEmptyRacks());
} }
@Override @Override

View File

@ -177,6 +177,8 @@ public void startDecommission(DatanodeDescriptor node) {
if (!node.isDecommissionInProgress() && !node.isDecommissioned()) { if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
// Update DN stats maintained by HeartbeatManager // Update DN stats maintained by HeartbeatManager
hbManager.startDecommission(node); hbManager.startDecommission(node);
// Update cluster's emptyRack
blockManager.getDatanodeManager().getNetworkTopology().decommissionNode(node);
// hbManager.startDecommission will set dead node to decommissioned. // hbManager.startDecommission will set dead node to decommissioned.
if (node.isDecommissionInProgress()) { if (node.isDecommissionInProgress()) {
for (DatanodeStorageInfo storage : node.getStorageInfos()) { for (DatanodeStorageInfo storage : node.getStorageInfos()) {
@ -201,6 +203,8 @@ public void stopDecommission(DatanodeDescriptor node) {
if (node.isDecommissionInProgress() || node.isDecommissioned()) { if (node.isDecommissionInProgress() || node.isDecommissioned()) {
// Update DN stats maintained by HeartbeatManager // Update DN stats maintained by HeartbeatManager
hbManager.stopDecommission(node); hbManager.stopDecommission(node);
// Update cluster's emptyRack
blockManager.getDatanodeManager().getNetworkTopology().recommissionNode(node);
// extra redundancy blocks will be detected and processed when // extra redundancy blocks will be detected and processed when
// the dead node comes back and send in its full block report. // the dead node comes back and send in its full block report.
if (node.isAlive()) { if (node.isAlive()) {

View File

@ -8405,7 +8405,7 @@ public ECTopologyVerifierResult getECTopologyResultForPolicies(
getBlockManager().getDatanodeManager().getNumOfDataNodes(); getBlockManager().getDatanodeManager().getNumOfDataNodes();
int numOfRacks = int numOfRacks =
getBlockManager().getDatanodeManager().getNetworkTopology() getBlockManager().getDatanodeManager().getNetworkTopology()
.getNumOfRacks(); .getNumOfNonEmptyRacks();
result = ECTopologyVerifier result = ECTopologyVerifier
.getECTopologyVerifierResult(numOfRacks, numOfDataNodes, policies); .getECTopologyVerifierResult(numOfRacks, numOfDataNodes, policies);
} }
@ -8950,7 +8950,7 @@ private ECTopologyVerifierResult getEcTopologyVerifierResultForEnabledPolicies()
int numOfDataNodes = int numOfDataNodes =
getBlockManager().getDatanodeManager().getNumOfDataNodes(); getBlockManager().getDatanodeManager().getNumOfDataNodes();
int numOfRacks = getBlockManager().getDatanodeManager().getNetworkTopology() int numOfRacks = getBlockManager().getDatanodeManager().getNetworkTopology()
.getNumOfRacks(); .getNumOfNonEmptyRacks();
ErasureCodingPolicy[] enabledEcPolicies = ErasureCodingPolicy[] enabledEcPolicies =
getErasureCodingPolicyManager().getCopyOfEnabledPolicies(); getErasureCodingPolicyManager().getCopyOfEnabledPolicies();
return ECTopologyVerifier return ECTopologyVerifier
@ -9012,4 +9012,3 @@ public void checkErasureCodingSupported(String operationName)
} }
} }
} }

View File

@ -19,24 +19,35 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.net.DFSNetworkTopology;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerant; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerant;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.net.StaticMapping; import org.apache.hadoop.net.StaticMapping;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After; import org.junit.After;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.io.IOException; import java.io.IOException;
import java.util.*; import java.util.*;
import java.util.function.Supplier;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@ -167,6 +178,108 @@ private void doTestChooseTargetSpecialCase() throws Exception {
} }
} }
/**
* Verify decommission a dn which is an only node in its rack.
*/
@Test
public void testPlacementWithOnlyOneNodeInRackDecommission() throws Exception {
Configuration conf = new HdfsConfiguration();
final String[] racks = {"/RACK0", "/RACK0", "/RACK2", "/RACK3", "/RACK4", "/RACK5", "/RACK2"};
final String[] hosts = {"/host0", "/host1", "/host2", "/host3", "/host4", "/host5", "/host6"};
// enables DFSNetworkTopology
conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
BlockPlacementPolicyRackFaultTolerant.class,
BlockPlacementPolicy.class);
conf.setBoolean(DFSConfigKeys.DFS_USE_DFS_NETWORK_TOPOLOGY_KEY, true);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
DEFAULT_BLOCK_SIZE / 2);
if (cluster != null) {
cluster.shutdown();
}
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(7).racks(racks)
.hosts(hosts).build();
cluster.waitActive();
nameNodeRpc = cluster.getNameNodeRpc();
namesystem = cluster.getNamesystem();
DistributedFileSystem fs = cluster.getFileSystem();
fs.enableErasureCodingPolicy("RS-3-2-1024k");
fs.setErasureCodingPolicy(new Path("/"), "RS-3-2-1024k");
final BlockManager bm = cluster.getNamesystem().getBlockManager();
final DatanodeManager dm = bm.getDatanodeManager();
assertTrue(dm.getNetworkTopology() instanceof DFSNetworkTopology);
String clientMachine = "/host4";
String clientRack = "/RACK4";
String src = "/test";
final DatanodeManager dnm = namesystem.getBlockManager().getDatanodeManager();
DatanodeDescriptor dnd4 = dnm.getDatanode(cluster.getDataNodes().get(4).getDatanodeId());
assertEquals(dnd4.getNetworkLocation(), clientRack);
dnm.getDatanodeAdminManager().startDecommission(dnd4);
short replication = 5;
short additionalReplication = 1;
try {
// Create the file with client machine
HdfsFileStatus fileStatus = namesystem.startFile(src, perm,
clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true,
replication, DEFAULT_BLOCK_SIZE * 1024 * 10, null, null, null, false);
//test chooseTarget for new file
LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine,
null, null, fileStatus.getFileId(), null, null);
HashMap<String, Integer> racksCount = new HashMap<String, Integer>();
doTestLocatedBlockRacks(racksCount, replication, 4, locatedBlock);
//test chooseTarget for existing file.
LocatedBlock additionalLocatedBlock =
nameNodeRpc.getAdditionalDatanode(src, fileStatus.getFileId(),
locatedBlock.getBlock(), locatedBlock.getLocations(),
locatedBlock.getStorageIDs(), DatanodeInfo.EMPTY_ARRAY,
additionalReplication, clientMachine);
racksCount.clear();
doTestLocatedBlockRacks(racksCount, additionalReplication + replication,
4, additionalLocatedBlock);
assertEquals(racksCount.get("/RACK0"), (Integer)2);
assertEquals(racksCount.get("/RACK2"), (Integer)2);
} finally {
dnm.getDatanodeAdminManager().stopDecommission(dnd4);
}
//test if decommission succeeded
DatanodeDescriptor dnd3 = dnm.getDatanode(cluster.getDataNodes().get(3).getDatanodeId());
cluster.getNamesystem().writeLock();
try {
dm.getDatanodeAdminManager().startDecommission(dnd3);
} finally {
cluster.getNamesystem().writeUnlock();
}
// make sure the decommission finishes and the block in on 4 racks
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return dnd3.isDecommissioned();
}
}, 1000, 10 * 1000);
LocatedBlocks locatedBlocks =
cluster.getFileSystem().getClient().getLocatedBlocks(
src, 0, DEFAULT_BLOCK_SIZE);
assertEquals(4, bm.getDatanodeManager().
getNetworkTopology().getNumOfNonEmptyRacks());
for (LocatedBlock block : locatedBlocks.getLocatedBlocks()) {
BlockPlacementStatus status = bm.getStriptedBlockPlacementPolicy()
.verifyBlockPlacement(block.getLocations(), 5);
Assert.assertTrue(status.isPlacementPolicySatisfied());
}
}
private void shuffle(DatanodeInfo[] locs, String[] storageIDs) { private void shuffle(DatanodeInfo[] locs, String[] storageIDs) {
int length = locs.length; int length = locs.length;
Object[][] pairs = new Object[length][]; Object[][] pairs = new Object[length][];
@ -198,6 +311,17 @@ private void doTestLocatedBlock(int replication, LocatedBlock locatedBlock) {
assertTrue(maxCount - minCount <= 1); assertTrue(maxCount - minCount <= 1);
} }
private void doTestLocatedBlockRacks(HashMap<String, Integer> racksCount, int replication,
int validracknum, LocatedBlock locatedBlock) {
assertEquals(replication, locatedBlock.getLocations().length);
for (DatanodeInfo node :
locatedBlock.getLocations()) {
addToRacksCount(node.getNetworkLocation(), racksCount);
}
assertEquals(validracknum, racksCount.size());
}
private void addToRacksCount(String rack, HashMap<String, Integer> racksCount) { private void addToRacksCount(String rack, HashMap<String, Integer> racksCount) {
Integer count = racksCount.get(rack); Integer count = racksCount.get(rack);
if (count == null) { if (count == null) {

View File

@ -633,4 +633,20 @@ public void testCountNumOfAvailableNodes() {
numNodes = cluster.countNumOfAvailableNodes(NodeBase.ROOT, excludedNodes); numNodes = cluster.countNumOfAvailableNodes(NodeBase.ROOT, excludedNodes);
assertEquals(12, numNodes); assertEquals(12, numNodes);
} }
@Test
public void testAddAndRemoveNodeWithEmptyRack() {
DatanodeDescriptor n1 = DFSTestUtil.getDatanodeDescriptor("6.6.6.6", "/d2/r3");
DatanodeDescriptor n2 = DFSTestUtil.getDatanodeDescriptor("7.7.7.7", "/d2/r3");
DatanodeDescriptor n3 = DFSTestUtil.getDatanodeDescriptor("8.8.8.8", "/d2/r3");
cluster.decommissionNode(n1);
assertEquals(6, cluster.getNumOfNonEmptyRacks());
cluster.decommissionNode(n2);
cluster.decommissionNode(n3);
assertEquals(5, cluster.getNumOfNonEmptyRacks());
cluster.recommissionNode(n1);
assertEquals(6, cluster.getNumOfNonEmptyRacks());
}
} }