HDFS-17091. Blocks on DECOMMISSIONING DNs should be sorted properly in LocatedBlocks. (#5849)

This commit is contained in:
WangYuanben 2023-07-19 09:51:49 +08:00 committed by GitHub
parent b3130056f5
commit 0fe1863094
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 128 additions and 59 deletions

View File

@ -139,13 +139,20 @@ public static SecureRandom getSecureRandom() {
public static class ServiceComparator implements Comparator<DatanodeInfo> {
@Override
public int compare(DatanodeInfo a, DatanodeInfo b) {
// Decommissioned nodes will still be moved to the end of the list
// Decommissioned nodes will be moved to the end of the list.
if (a.isDecommissioned()) {
return b.isDecommissioned() ? 0 : 1;
} else if (b.isDecommissioned()) {
return -1;
}
// Decommissioning nodes will be placed before decommissioned nodes.
if (a.isDecommissionInProgress()) {
return b.isDecommissionInProgress() ? 0 : 1;
} else if (b.isDecommissionInProgress()) {
return -1;
}
// ENTERING_MAINTENANCE nodes should be after live nodes.
if (a.isEnteringMaintenance()) {
return b.isEnteringMaintenance() ? 0 : 1;
@ -159,9 +166,9 @@ public int compare(DatanodeInfo a, DatanodeInfo b) {
/**
* Comparator for sorting DataNodeInfo[] based on
* slow, stale, entering_maintenance and decommissioned states.
* slow, stale, entering_maintenance, decommissioning and decommissioned states.
* Order: live {@literal ->} slow {@literal ->} stale {@literal ->}
* entering_maintenance {@literal ->} decommissioned
* entering_maintenance {@literal ->} decommissioning {@literal ->} decommissioned
*/
@InterfaceAudience.Private
public static class StaleAndSlowComparator extends ServiceComparator {

View File

@ -511,7 +511,9 @@ public DatanodeStatistics getDatanodeStatistics() {
}
private boolean isInactive(DatanodeInfo datanode) {
return datanode.isDecommissioned() || datanode.isEnteringMaintenance() ||
return datanode.isDecommissioned() ||
datanode.isDecommissionInProgress() ||
datanode.isEnteringMaintenance() ||
(avoidStaleDataNodesForRead && datanode.isStale(staleInterval));
}
@ -540,7 +542,7 @@ public int getMaxSlowpeerCollectNodes() {
/**
* Sort the non-striped located blocks by the distance to the target host.
*
* For striped blocks, it will only move decommissioned/stale/slow
* For striped blocks, it will only move decommissioned/decommissioning/stale/slow
* nodes to the bottom. For example, assume we have storage list:
* d0, d1, d2, d3, d4, d5, d6, d7, d8, d9
* mapping to block indices:
@ -570,7 +572,7 @@ public void sortLocatedBlocks(final String targetHost,
}
/**
* Move decommissioned/entering_maintenance/stale/slow
* Move decommissioned/decommissioning/entering_maintenance/stale/slow
* datanodes to the bottom. After sorting it will
* update block indices and block tokens respectively.
*
@ -588,7 +590,8 @@ private void sortLocatedStripedBlock(final LocatedBlock lb,
locToIndex.put(di[i], lsb.getBlockIndices()[i]);
locToToken.put(di[i], lsb.getBlockTokens()[i]);
}
// Move decommissioned/stale datanodes to the bottom
// Arrange the order of datanodes as follows:
// live(in-service) -> stale -> entering_maintenance -> decommissioning -> decommissioned
Arrays.sort(di, comparator);
// must update cache since we modified locations array
@ -602,7 +605,7 @@ private void sortLocatedStripedBlock(final LocatedBlock lb,
}
/**
* Move decommissioned/entering_maintenance/stale/slow
* Move decommissioned/decommissioning/entering_maintenance/stale/slow
* datanodes to the bottom. Also, sort nodes by network
* distance.
*
@ -634,8 +637,8 @@ private void sortLocatedBlock(final LocatedBlock lb, String targetHost,
}
DatanodeInfoWithStorage[] di = lb.getLocations();
// Move decommissioned/entering_maintenance/stale/slow
// datanodes to the bottom
// Arrange the order of datanodes as follows:
// live(in-service) -> stale -> entering_maintenance -> decommissioning -> decommissioned
Arrays.sort(di, comparator);
// Sort nodes by network distance only for located blocks

View File

@ -64,7 +64,7 @@ public class TestSortLocatedBlock {
*/
@Test(timeout = 30000)
public void testWithStaleDatanodes() throws IOException {
long blockID = Long.MIN_VALUE;
long blockID = Long.MAX_VALUE;
int totalDns = 5;
DatanodeInfo[] locs = new DatanodeInfo[totalDns];
@ -125,10 +125,10 @@ public void testWithStaleDatanodes() throws IOException {
*
* After sorting the expected datanodes list will be:
* live -> slow -> stale -> staleAndSlow ->
* entering_maintenance -> decommissioned.
* entering_maintenance -> decommissioning -> decommissioned.
*
* avoidStaleDataNodesForRead=true && avoidSlowDataNodesForRead=true
* d5 -> d4 -> d3 -> d2 -> d1 -> d0
* d6 -> d5 -> d4 -> d3 -> d2 -> d1 -> d0
*/
@Test(timeout = 30000)
public void testAviodStaleAndSlowDatanodes() throws IOException {
@ -137,7 +137,7 @@ public void testAviodStaleAndSlowDatanodes() throws IOException {
ArrayList<LocatedBlock> locatedBlocks = new ArrayList<>();
locatedBlocks.add(new LocatedBlock(
new ExtendedBlock("pool", Long.MIN_VALUE,
new ExtendedBlock("pool", Long.MAX_VALUE,
1024L, new Date().getTime()), locs));
// sort located blocks
@ -148,19 +148,21 @@ public void testAviodStaleAndSlowDatanodes() throws IOException {
DatanodeInfoWithStorage[] locations = locatedBlock.getLocations();
// assert location order:
// live -> stale -> entering_maintenance -> decommissioned
// live -> stale -> entering_maintenance -> decommissioning -> decommissioned
// live
assertEquals(locs[5].getIpAddr(), locations[0].getIpAddr());
assertEquals(locs[6].getIpAddr(), locations[0].getIpAddr());
// slow
assertEquals(locs[4].getIpAddr(), locations[1].getIpAddr());
assertEquals(locs[5].getIpAddr(), locations[1].getIpAddr());
// stale
assertEquals(locs[3].getIpAddr(), locations[2].getIpAddr());
assertEquals(locs[4].getIpAddr(), locations[2].getIpAddr());
// stale and slow
assertEquals(locs[2].getIpAddr(), locations[3].getIpAddr());
assertEquals(locs[3].getIpAddr(), locations[3].getIpAddr());
// entering_maintenance
assertEquals(locs[1].getIpAddr(), locations[4].getIpAddr());
assertEquals(locs[2].getIpAddr(), locations[4].getIpAddr());
// decommissioning
assertEquals(locs[1].getIpAddr(), locations[5].getIpAddr());
// decommissioned
assertEquals(locs[0].getIpAddr(), locations[5].getIpAddr());
assertEquals(locs[0].getIpAddr(), locations[6].getIpAddr());
}
/**
@ -169,10 +171,10 @@ public void testAviodStaleAndSlowDatanodes() throws IOException {
*
* After sorting the expected datanodes list will be:
* (live <-> slow) -> (stale <-> staleAndSlow) ->
* entering_maintenance -> decommissioned.
* entering_maintenance -> decommissioning -> decommissioned.
*
* avoidStaleDataNodesForRead=true && avoidSlowDataNodesForRead=false
* (d5 <-> d4) -> (d3 <-> d2) -> d1 -> d0
* (d6 <-> d5) -> (d4 <-> d3) -> d2 -> d1 -> d0
*/
@Test(timeout = 30000)
public void testAviodStaleDatanodes() throws IOException {
@ -181,7 +183,7 @@ public void testAviodStaleDatanodes() throws IOException {
ArrayList<LocatedBlock> locatedBlocks = new ArrayList<>();
locatedBlocks.add(new LocatedBlock(
new ExtendedBlock("pool", Long.MIN_VALUE,
new ExtendedBlock("pool", Long.MAX_VALUE,
1024L, new Date().getTime()), locs));
// sort located blocks
@ -192,21 +194,23 @@ public void testAviodStaleDatanodes() throws IOException {
DatanodeInfoWithStorage[] locations = locatedBlock.getLocations();
// assert location order:
// live -> stale -> entering_maintenance -> decommissioned
// live -> stale -> entering_maintenance -> decommissioning -> decommissioned.
// live
assertTrue((locs[5].getIpAddr() == locations[0].getIpAddr() &&
locs[4].getIpAddr() == locations[1].getIpAddr()) ||
locs[6].getIpAddr() == locations[1].getIpAddr()) ||
(locs[5].getIpAddr() == locations[1].getIpAddr() &&
locs[4].getIpAddr() == locations[0].getIpAddr()));
locs[6].getIpAddr() == locations[0].getIpAddr()));
// stale
assertTrue((locs[3].getIpAddr() == locations[2].getIpAddr() &&
locs[2].getIpAddr() == locations[3].getIpAddr()) ||
(locs[3].getIpAddr() == locations[3].getIpAddr() &&
locs[2].getIpAddr() == locations[2].getIpAddr()));
assertTrue((locs[4].getIpAddr() == locations[3].getIpAddr() &&
locs[3].getIpAddr() == locations[2].getIpAddr()) ||
(locs[4].getIpAddr() == locations[2].getIpAddr() &&
locs[3].getIpAddr() == locations[3].getIpAddr()));
// entering_maintenance
assertEquals(locs[1].getIpAddr(), locations[4].getIpAddr());
assertEquals(locs[2].getIpAddr(), locations[4].getIpAddr());
// decommissioning
assertEquals(locs[1].getIpAddr(), locations[5].getIpAddr());
// decommissioned
assertEquals(locs[0].getIpAddr(), locations[5].getIpAddr());
assertEquals(locs[0].getIpAddr(), locations[6].getIpAddr());
}
/**
@ -215,10 +219,10 @@ public void testAviodStaleDatanodes() throws IOException {
*
* After sorting the expected datanodes list will be:
* (live <-> stale) -> (slow <-> staleAndSlow) ->
* entering_maintenance -> decommissioned.
* entering_maintenance -> decommissioning -> decommissioned.
*
* avoidStaleDataNodesForRead=false && avoidSlowDataNodesForRead=true
* (d5 -> d3) -> (d4 <-> d2) -> d1 -> d0
* (d6 -> d4) -> (d5 <-> d3) -> d2 -> d1 -> d0
*/
@Test(timeout = 30000)
public void testAviodSlowDatanodes() throws IOException {
@ -227,7 +231,7 @@ public void testAviodSlowDatanodes() throws IOException {
ArrayList<LocatedBlock> locatedBlocks = new ArrayList<>();
locatedBlocks.add(new LocatedBlock(
new ExtendedBlock("pool", Long.MIN_VALUE,
new ExtendedBlock("pool", Long.MAX_VALUE,
1024L, new Date().getTime()), locs));
// sort located blocks
@ -238,34 +242,87 @@ public void testAviodSlowDatanodes() throws IOException {
DatanodeInfoWithStorage[] locations = locatedBlock.getLocations();
// assert location order:
// live -> slow -> entering_maintenance -> decommissioned
// live -> slow -> entering_maintenance -> decommissioning -> decommissioned.
// live
assertTrue((locs[5].getIpAddr() == locations[0].getIpAddr() &&
locs[3].getIpAddr() == locations[1].getIpAddr()) ||
(locs[5].getIpAddr() == locations[1].getIpAddr() &&
locs[3].getIpAddr() == locations[0].getIpAddr()));
assertTrue((locs[6].getIpAddr() == locations[0].getIpAddr() &&
locs[4].getIpAddr() == locations[1].getIpAddr()) ||
(locs[6].getIpAddr() == locations[1].getIpAddr() &&
locs[4].getIpAddr() == locations[0].getIpAddr()));
// slow
assertTrue((locs[4].getIpAddr() == locations[2].getIpAddr() &&
locs[2].getIpAddr() == locations[3].getIpAddr()) ||
(locs[4].getIpAddr() == locations[3].getIpAddr() &&
locs[2].getIpAddr() == locations[2].getIpAddr()));
assertTrue((locs[5].getIpAddr() == locations[2].getIpAddr() &&
locs[3].getIpAddr() == locations[3].getIpAddr()) ||
(locs[5].getIpAddr() == locations[3].getIpAddr() &&
locs[3].getIpAddr() == locations[2].getIpAddr()));
// entering_maintenance
assertEquals(locs[1].getIpAddr(), locations[4].getIpAddr());
assertEquals(locs[2].getIpAddr(), locations[4].getIpAddr());
// decommissioning
assertEquals(locs[1].getIpAddr(), locations[5].getIpAddr());
// decommissioned
assertEquals(locs[0].getIpAddr(), locations[5].getIpAddr());
assertEquals(locs[0].getIpAddr(), locations[6].getIpAddr());
}
/**
* Test to verify sorting with multiple state
* datanodes exists in storage lists.
*
* After sorting the expected datanodes list will be:
* (live <-> stale <-> slow <-> staleAndSlow) ->
* entering_maintenance -> decommissioning -> decommissioned.
*
* avoidStaleDataNodesForRead=false && avoidSlowDataNodesForRead=false
* (d6 <-> d5 <-> d4 <-> d3) -> d2 -> d1 -> d0
*/
@Test(timeout = 30000)
public void testWithServiceComparator() throws IOException {
DatanodeManager dm = mockDatanodeManager(false, false);
DatanodeInfo[] locs = mockDatanodes(dm);
// mark live/slow/stale datanodes
ArrayList<DatanodeInfo> list = new ArrayList<>();
for (DatanodeInfo loc : locs) {
list.add(loc);
}
// generate blocks
ArrayList<LocatedBlock> locatedBlocks = new ArrayList<>();
locatedBlocks.add(new LocatedBlock(
new ExtendedBlock("pool", Long.MAX_VALUE,
1024L, new Date().getTime()), locs));
// sort located blocks
dm.sortLocatedBlocks(null, locatedBlocks);
// get locations after sorting
LocatedBlock locatedBlock = locatedBlocks.get(0);
DatanodeInfoWithStorage[] locations = locatedBlock.getLocations();
// assert location order:
// live/slow/stale -> entering_maintenance -> decommissioning -> decommissioned.
// live/slow/stale
assertTrue(list.contains(locations[0]) &&
list.contains(locations[1]) &&
list.contains(locations[2]) &&
list.contains(locations[3]));
// entering_maintenance
assertEquals(locs[2].getIpAddr(), locations[4].getIpAddr());
// decommissioning
assertEquals(locs[1].getIpAddr(), locations[5].getIpAddr());
// decommissioned
assertEquals(locs[0].getIpAddr(), locations[6].getIpAddr());
}
/**
* We mock the following list of datanodes, and create LocatedBlock.
* d0 - decommissioned
* d1 - entering_maintenance
* d2 - stale and slow
* d3 - stale
* d4 - slow
* d5 - live(in-service)
* d1 - decommissioning
* d2 - entering_maintenance
* d3 - stale and slow
* d4 - stale
* d5 - slow
* d6 - live(in-service)
*/
private static DatanodeInfo[] mockDatanodes(DatanodeManager dm) {
int totalDns = 6;
int totalDns = 7;
DatanodeInfo[] locs = new DatanodeInfo[totalDns];
// create datanodes
@ -276,17 +333,19 @@ private static DatanodeInfo[] mockDatanodes(DatanodeManager dm) {
}
// set decommissioned state
locs[0].setDecommissioned();
// set decommissioning state
locs[1].startDecommission();
// set entering_maintenance state
locs[1].startMaintenance();
locs[2].startMaintenance();
// set stale and slow state
locs[2].setLastUpdateMonotonic(Time.monotonicNow() -
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT * 1000 - 1);
dm.addSlowPeers(locs[2].getDatanodeUuid());
// set stale state
locs[3].setLastUpdateMonotonic(Time.monotonicNow() -
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT * 1000 - 1);
dm.addSlowPeers(locs[3].getDatanodeUuid());
// set stale state
locs[4].setLastUpdateMonotonic(Time.monotonicNow() -
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT * 1000 - 1);
// set slow state
dm.addSlowPeers(locs[4].getDatanodeUuid());
dm.addSlowPeers(locs[5].getDatanodeUuid());
return locs;
}