diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java index 25726cee51..3a3219447d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java @@ -139,13 +139,20 @@ public static SecureRandom getSecureRandom() { public static class ServiceComparator implements Comparator { @Override public int compare(DatanodeInfo a, DatanodeInfo b) { - // Decommissioned nodes will still be moved to the end of the list + // Decommissioned nodes will be moved to the end of the list. if (a.isDecommissioned()) { return b.isDecommissioned() ? 0 : 1; } else if (b.isDecommissioned()) { return -1; } + // Decommissioning nodes will be placed before decommissioned nodes. + if (a.isDecommissionInProgress()) { + return b.isDecommissionInProgress() ? 0 : 1; + } else if (b.isDecommissionInProgress()) { + return -1; + } + // ENTERING_MAINTENANCE nodes should be after live nodes. if (a.isEnteringMaintenance()) { return b.isEnteringMaintenance() ? 0 : 1; @@ -159,9 +166,9 @@ public int compare(DatanodeInfo a, DatanodeInfo b) { /** * Comparator for sorting DataNodeInfo[] based on - * slow, stale, entering_maintenance and decommissioned states. + * slow, stale, entering_maintenance, decommissioning and decommissioned states. * Order: live {@literal ->} slow {@literal ->} stale {@literal ->} - * entering_maintenance {@literal ->} decommissioned + * entering_maintenance {@literal ->} decommissioning {@literal ->} decommissioned */ @InterfaceAudience.Private public static class StaleAndSlowComparator extends ServiceComparator { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index 2249616702..a0e94a5c1b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -511,7 +511,9 @@ public DatanodeStatistics getDatanodeStatistics() { } private boolean isInactive(DatanodeInfo datanode) { - return datanode.isDecommissioned() || datanode.isEnteringMaintenance() || + return datanode.isDecommissioned() || + datanode.isDecommissionInProgress() || + datanode.isEnteringMaintenance() || (avoidStaleDataNodesForRead && datanode.isStale(staleInterval)); } @@ -540,7 +542,7 @@ public int getMaxSlowpeerCollectNodes() { /** * Sort the non-striped located blocks by the distance to the target host. * - * For striped blocks, it will only move decommissioned/stale/slow + * For striped blocks, it will only move decommissioned/decommissioning/stale/slow * nodes to the bottom. For example, assume we have storage list: * d0, d1, d2, d3, d4, d5, d6, d7, d8, d9 * mapping to block indices: @@ -570,7 +572,7 @@ public void sortLocatedBlocks(final String targetHost, } /** - * Move decommissioned/entering_maintenance/stale/slow + * Move decommissioned/decommissioning/entering_maintenance/stale/slow * datanodes to the bottom. After sorting it will * update block indices and block tokens respectively. * @@ -588,7 +590,8 @@ private void sortLocatedStripedBlock(final LocatedBlock lb, locToIndex.put(di[i], lsb.getBlockIndices()[i]); locToToken.put(di[i], lsb.getBlockTokens()[i]); } - // Move decommissioned/stale datanodes to the bottom + // Arrange the order of datanodes as follows: + // live(in-service) -> stale -> entering_maintenance -> decommissioning -> decommissioned Arrays.sort(di, comparator); // must update cache since we modified locations array @@ -602,7 +605,7 @@ private void sortLocatedStripedBlock(final LocatedBlock lb, } /** - * Move decommissioned/entering_maintenance/stale/slow + * Move decommissioned/decommissioning/entering_maintenance/stale/slow * datanodes to the bottom. Also, sort nodes by network * distance. * @@ -634,8 +637,8 @@ private void sortLocatedBlock(final LocatedBlock lb, String targetHost, } DatanodeInfoWithStorage[] di = lb.getLocations(); - // Move decommissioned/entering_maintenance/stale/slow - // datanodes to the bottom + // Arrange the order of datanodes as follows: + // live(in-service) -> stale -> entering_maintenance -> decommissioning -> decommissioned Arrays.sort(di, comparator); // Sort nodes by network distance only for located blocks diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSortLocatedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSortLocatedBlock.java index 45eedace2c..cbf9938066 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSortLocatedBlock.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSortLocatedBlock.java @@ -64,7 +64,7 @@ public class TestSortLocatedBlock { */ @Test(timeout = 30000) public void testWithStaleDatanodes() throws IOException { - long blockID = Long.MIN_VALUE; + long blockID = Long.MAX_VALUE; int totalDns = 5; DatanodeInfo[] locs = new DatanodeInfo[totalDns]; @@ -125,10 +125,10 @@ public void testWithStaleDatanodes() throws IOException { * * After sorting the expected datanodes list will be: * live -> slow -> stale -> staleAndSlow -> - * entering_maintenance -> decommissioned. + * entering_maintenance -> decommissioning -> decommissioned. * * avoidStaleDataNodesForRead=true && avoidSlowDataNodesForRead=true - * d5 -> d4 -> d3 -> d2 -> d1 -> d0 + * d6 -> d5 -> d4 -> d3 -> d2 -> d1 -> d0 */ @Test(timeout = 30000) public void testAviodStaleAndSlowDatanodes() throws IOException { @@ -137,7 +137,7 @@ public void testAviodStaleAndSlowDatanodes() throws IOException { ArrayList locatedBlocks = new ArrayList<>(); locatedBlocks.add(new LocatedBlock( - new ExtendedBlock("pool", Long.MIN_VALUE, + new ExtendedBlock("pool", Long.MAX_VALUE, 1024L, new Date().getTime()), locs)); // sort located blocks @@ -148,19 +148,21 @@ public void testAviodStaleAndSlowDatanodes() throws IOException { DatanodeInfoWithStorage[] locations = locatedBlock.getLocations(); // assert location order: - // live -> stale -> entering_maintenance -> decommissioned + // live -> stale -> entering_maintenance -> decommissioning -> decommissioned // live - assertEquals(locs[5].getIpAddr(), locations[0].getIpAddr()); + assertEquals(locs[6].getIpAddr(), locations[0].getIpAddr()); // slow - assertEquals(locs[4].getIpAddr(), locations[1].getIpAddr()); + assertEquals(locs[5].getIpAddr(), locations[1].getIpAddr()); // stale - assertEquals(locs[3].getIpAddr(), locations[2].getIpAddr()); + assertEquals(locs[4].getIpAddr(), locations[2].getIpAddr()); // stale and slow - assertEquals(locs[2].getIpAddr(), locations[3].getIpAddr()); + assertEquals(locs[3].getIpAddr(), locations[3].getIpAddr()); // entering_maintenance - assertEquals(locs[1].getIpAddr(), locations[4].getIpAddr()); + assertEquals(locs[2].getIpAddr(), locations[4].getIpAddr()); + // decommissioning + assertEquals(locs[1].getIpAddr(), locations[5].getIpAddr()); // decommissioned - assertEquals(locs[0].getIpAddr(), locations[5].getIpAddr()); + assertEquals(locs[0].getIpAddr(), locations[6].getIpAddr()); } /** @@ -169,10 +171,10 @@ public void testAviodStaleAndSlowDatanodes() throws IOException { * * After sorting the expected datanodes list will be: * (live <-> slow) -> (stale <-> staleAndSlow) -> - * entering_maintenance -> decommissioned. + * entering_maintenance -> decommissioning -> decommissioned. * * avoidStaleDataNodesForRead=true && avoidSlowDataNodesForRead=false - * (d5 <-> d4) -> (d3 <-> d2) -> d1 -> d0 + * (d6 <-> d5) -> (d4 <-> d3) -> d2 -> d1 -> d0 */ @Test(timeout = 30000) public void testAviodStaleDatanodes() throws IOException { @@ -181,7 +183,7 @@ public void testAviodStaleDatanodes() throws IOException { ArrayList locatedBlocks = new ArrayList<>(); locatedBlocks.add(new LocatedBlock( - new ExtendedBlock("pool", Long.MIN_VALUE, + new ExtendedBlock("pool", Long.MAX_VALUE, 1024L, new Date().getTime()), locs)); // sort located blocks @@ -192,21 +194,23 @@ public void testAviodStaleDatanodes() throws IOException { DatanodeInfoWithStorage[] locations = locatedBlock.getLocations(); // assert location order: - // live -> stale -> entering_maintenance -> decommissioned + // live -> stale -> entering_maintenance -> decommissioning -> decommissioned. // live assertTrue((locs[5].getIpAddr() == locations[0].getIpAddr() && - locs[4].getIpAddr() == locations[1].getIpAddr()) || + locs[6].getIpAddr() == locations[1].getIpAddr()) || (locs[5].getIpAddr() == locations[1].getIpAddr() && - locs[4].getIpAddr() == locations[0].getIpAddr())); + locs[6].getIpAddr() == locations[0].getIpAddr())); // stale - assertTrue((locs[3].getIpAddr() == locations[2].getIpAddr() && - locs[2].getIpAddr() == locations[3].getIpAddr()) || - (locs[3].getIpAddr() == locations[3].getIpAddr() && - locs[2].getIpAddr() == locations[2].getIpAddr())); + assertTrue((locs[4].getIpAddr() == locations[3].getIpAddr() && + locs[3].getIpAddr() == locations[2].getIpAddr()) || + (locs[4].getIpAddr() == locations[2].getIpAddr() && + locs[3].getIpAddr() == locations[3].getIpAddr())); // entering_maintenance - assertEquals(locs[1].getIpAddr(), locations[4].getIpAddr()); + assertEquals(locs[2].getIpAddr(), locations[4].getIpAddr()); + // decommissioning + assertEquals(locs[1].getIpAddr(), locations[5].getIpAddr()); // decommissioned - assertEquals(locs[0].getIpAddr(), locations[5].getIpAddr()); + assertEquals(locs[0].getIpAddr(), locations[6].getIpAddr()); } /** @@ -215,10 +219,10 @@ public void testAviodStaleDatanodes() throws IOException { * * After sorting the expected datanodes list will be: * (live <-> stale) -> (slow <-> staleAndSlow) -> - * entering_maintenance -> decommissioned. + * entering_maintenance -> decommissioning -> decommissioned. * * avoidStaleDataNodesForRead=false && avoidSlowDataNodesForRead=true - * (d5 -> d3) -> (d4 <-> d2) -> d1 -> d0 + * (d6 -> d4) -> (d5 <-> d3) -> d2 -> d1 -> d0 */ @Test(timeout = 30000) public void testAviodSlowDatanodes() throws IOException { @@ -227,7 +231,7 @@ public void testAviodSlowDatanodes() throws IOException { ArrayList locatedBlocks = new ArrayList<>(); locatedBlocks.add(new LocatedBlock( - new ExtendedBlock("pool", Long.MIN_VALUE, + new ExtendedBlock("pool", Long.MAX_VALUE, 1024L, new Date().getTime()), locs)); // sort located blocks @@ -238,34 +242,87 @@ public void testAviodSlowDatanodes() throws IOException { DatanodeInfoWithStorage[] locations = locatedBlock.getLocations(); // assert location order: - // live -> slow -> entering_maintenance -> decommissioned + // live -> slow -> entering_maintenance -> decommissioning -> decommissioned. // live - assertTrue((locs[5].getIpAddr() == locations[0].getIpAddr() && - locs[3].getIpAddr() == locations[1].getIpAddr()) || - (locs[5].getIpAddr() == locations[1].getIpAddr() && - locs[3].getIpAddr() == locations[0].getIpAddr())); + assertTrue((locs[6].getIpAddr() == locations[0].getIpAddr() && + locs[4].getIpAddr() == locations[1].getIpAddr()) || + (locs[6].getIpAddr() == locations[1].getIpAddr() && + locs[4].getIpAddr() == locations[0].getIpAddr())); // slow - assertTrue((locs[4].getIpAddr() == locations[2].getIpAddr() && - locs[2].getIpAddr() == locations[3].getIpAddr()) || - (locs[4].getIpAddr() == locations[3].getIpAddr() && - locs[2].getIpAddr() == locations[2].getIpAddr())); + assertTrue((locs[5].getIpAddr() == locations[2].getIpAddr() && + locs[3].getIpAddr() == locations[3].getIpAddr()) || + (locs[5].getIpAddr() == locations[3].getIpAddr() && + locs[3].getIpAddr() == locations[2].getIpAddr())); // entering_maintenance - assertEquals(locs[1].getIpAddr(), locations[4].getIpAddr()); + assertEquals(locs[2].getIpAddr(), locations[4].getIpAddr()); + // decommissioning + assertEquals(locs[1].getIpAddr(), locations[5].getIpAddr()); // decommissioned - assertEquals(locs[0].getIpAddr(), locations[5].getIpAddr()); + assertEquals(locs[0].getIpAddr(), locations[6].getIpAddr()); + } + + /** + * Test to verify sorting with multiple state + * datanodes exists in storage lists. + * + * After sorting the expected datanodes list will be: + * (live <-> stale <-> slow <-> staleAndSlow) -> + * entering_maintenance -> decommissioning -> decommissioned. + * + * avoidStaleDataNodesForRead=false && avoidSlowDataNodesForRead=false + * (d6 <-> d5 <-> d4 <-> d3) -> d2 -> d1 -> d0 + */ + @Test(timeout = 30000) + public void testWithServiceComparator() throws IOException { + DatanodeManager dm = mockDatanodeManager(false, false); + DatanodeInfo[] locs = mockDatanodes(dm); + + // mark live/slow/stale datanodes + ArrayList list = new ArrayList<>(); + for (DatanodeInfo loc : locs) { + list.add(loc); + } + + // generate blocks + ArrayList locatedBlocks = new ArrayList<>(); + locatedBlocks.add(new LocatedBlock( + new ExtendedBlock("pool", Long.MAX_VALUE, + 1024L, new Date().getTime()), locs)); + + // sort located blocks + dm.sortLocatedBlocks(null, locatedBlocks); + + // get locations after sorting + LocatedBlock locatedBlock = locatedBlocks.get(0); + DatanodeInfoWithStorage[] locations = locatedBlock.getLocations(); + + // assert location order: + // live/slow/stale -> entering_maintenance -> decommissioning -> decommissioned. + // live/slow/stale + assertTrue(list.contains(locations[0]) && + list.contains(locations[1]) && + list.contains(locations[2]) && + list.contains(locations[3])); + // entering_maintenance + assertEquals(locs[2].getIpAddr(), locations[4].getIpAddr()); + // decommissioning + assertEquals(locs[1].getIpAddr(), locations[5].getIpAddr()); + // decommissioned + assertEquals(locs[0].getIpAddr(), locations[6].getIpAddr()); } /** * We mock the following list of datanodes, and create LocatedBlock. * d0 - decommissioned - * d1 - entering_maintenance - * d2 - stale and slow - * d3 - stale - * d4 - slow - * d5 - live(in-service) + * d1 - decommissioning + * d2 - entering_maintenance + * d3 - stale and slow + * d4 - stale + * d5 - slow + * d6 - live(in-service) */ private static DatanodeInfo[] mockDatanodes(DatanodeManager dm) { - int totalDns = 6; + int totalDns = 7; DatanodeInfo[] locs = new DatanodeInfo[totalDns]; // create datanodes @@ -276,17 +333,19 @@ private static DatanodeInfo[] mockDatanodes(DatanodeManager dm) { } // set decommissioned state locs[0].setDecommissioned(); + // set decommissioning state + locs[1].startDecommission(); // set entering_maintenance state - locs[1].startMaintenance(); + locs[2].startMaintenance(); // set stale and slow state - locs[2].setLastUpdateMonotonic(Time.monotonicNow() - - DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT * 1000 - 1); - dm.addSlowPeers(locs[2].getDatanodeUuid()); - // set stale state locs[3].setLastUpdateMonotonic(Time.monotonicNow() - DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT * 1000 - 1); + dm.addSlowPeers(locs[3].getDatanodeUuid()); + // set stale state + locs[4].setLastUpdateMonotonic(Time.monotonicNow() - + DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT * 1000 - 1); // set slow state - dm.addSlowPeers(locs[4].getDatanodeUuid()); + dm.addSlowPeers(locs[5].getDatanodeUuid()); return locs; }