diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index bc371ea08a..b9a7bc5eae 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -516,6 +516,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys { // Whether to enable datanode's stale state detection and usage for reads public static final String DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY = "dfs.namenode.avoid.read.stale.datanode"; public static final boolean DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_DEFAULT = false; + public static final String DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY = + "dfs.namenode.avoid.read.slow.datanode"; + public static final boolean + DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_DEFAULT = false; // Whether to enable datanode's stale state detection and usage for writes public static final String DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY = "dfs.namenode.avoid.write.stale.datanode"; public static final boolean DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT = false; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java index 9f7e9d5d7c..3e47e557b0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java @@ -156,23 +156,36 @@ public int compare(DatanodeInfo a, DatanodeInfo b) { /** * Comparator for sorting DataNodeInfo[] based on - * stale, decommissioned and entering_maintenance states. - * Order: live {@literal ->} stale {@literal ->} entering_maintenance - * {@literal ->} decommissioned + * slow, stale, entering_maintenance and decommissioned states. + * Order: live {@literal ->} slow {@literal ->} stale {@literal ->} + * entering_maintenance {@literal ->} decommissioned */ @InterfaceAudience.Private - public static class ServiceAndStaleComparator extends ServiceComparator { + public static class StaleAndSlowComparator extends ServiceComparator { + private final boolean avoidStaleDataNodesForRead; private final long staleInterval; + private final boolean avoidSlowDataNodesForRead; + private final Set slowNodesUuidSet; /** * Constructor of ServiceAndStaleComparator - * + * @param avoidStaleDataNodesForRead + * Whether or not to avoid using stale DataNodes for reading. * @param interval * The time interval for marking datanodes as stale is passed from - * outside, since the interval may be changed dynamically + * outside, since the interval may be changed dynamically. + * @param avoidSlowDataNodesForRead + * Whether or not to avoid using slow DataNodes for reading. + * @param slowNodesUuidSet + * Slow DataNodes UUID set. */ - public ServiceAndStaleComparator(long interval) { + public StaleAndSlowComparator( + boolean avoidStaleDataNodesForRead, long interval, + boolean avoidSlowDataNodesForRead, Set slowNodesUuidSet) { + this.avoidStaleDataNodesForRead = avoidStaleDataNodesForRead; this.staleInterval = interval; + this.avoidSlowDataNodesForRead = avoidSlowDataNodesForRead; + this.slowNodesUuidSet = slowNodesUuidSet; } @Override @@ -183,9 +196,22 @@ public int compare(DatanodeInfo a, DatanodeInfo b) { } // Stale nodes will be moved behind the normal nodes - boolean aStale = a.isStale(staleInterval); - boolean bStale = b.isStale(staleInterval); - return aStale == bStale ? 0 : (aStale ? 1 : -1); + if (avoidStaleDataNodesForRead) { + boolean aStale = a.isStale(staleInterval); + boolean bStale = b.isStale(staleInterval); + ret = aStale == bStale ? 0 : (aStale ? 1 : -1); + if (ret != 0) { + return ret; + } + } + + // Slow nodes will be moved behind the normal nodes + if (avoidSlowDataNodesForRead) { + boolean aSlow = slowNodesUuidSet.contains(a.getDatanodeUuid()); + boolean bSlow = slowNodesUuidSet.contains(b.getDatanodeUuid()); + ret = aSlow == bSlow ? 0 : (aSlow ? 1 : -1); + } + return ret; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java index 9f68c36033..e978089433 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java @@ -1104,8 +1104,8 @@ boolean isGoodDatanode(DatanodeDescriptor node, // check if the target is a slow node if (dataNodePeerStatsEnabled && excludeSlowNodesEnabled) { - Set nodes = DatanodeManager.getSlowNodes(); - if (nodes.contains(node)) { + Set slowNodesUuidSet = DatanodeManager.getSlowNodesUuidSet(); + if (slowNodesUuidSet.contains(node.getDatanodeUuid())) { logNodeIsNotChosen(node, NodeNotChosenReason.NODE_SLOW); return false; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index 2e7b338d85..68ee16ca6f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -140,6 +140,9 @@ public class DatanodeManager { /** Whether or not to avoid using stale DataNodes for reading */ private final boolean avoidStaleDataNodesForRead; + /** Whether or not to avoid using slow DataNodes for reading. */ + private final boolean avoidSlowDataNodesForRead; + /** Whether or not to consider lad for reading. */ private final boolean readConsiderLoad; @@ -210,7 +213,7 @@ public class DatanodeManager { @Nullable private final SlowPeerTracker slowPeerTracker; - private static Set slowNodesSet = Sets.newConcurrentHashSet(); + private static Set slowNodesUuidSet = Sets.newConcurrentHashSet(); private Daemon slowPeerCollectorDaemon; private final long slowPeerCollectionInterval; private final int maxSlowPeerReportNodes; @@ -242,7 +245,6 @@ public class DatanodeManager { } else { networktopology = NetworkTopology.getInstance(conf); } - this.heartbeatManager = new HeartbeatManager(namesystem, blockManager, conf); this.datanodeAdminManager = new DatanodeAdminManager(namesystem, @@ -273,7 +275,6 @@ public class DatanodeManager { } this.slowDiskTracker = dataNodeDiskStatsEnabled ? new SlowDiskTracker(conf, timer) : null; - this.defaultXferPort = NetUtils.createSocketAddr( conf.getTrimmed(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT)).getPort(); @@ -294,11 +295,9 @@ public class DatanodeManager { } catch (IOException e) { LOG.error("error reading hosts files: ", e); } - this.dnsToSwitchMapping = ReflectionUtils.newInstance( conf.getClass(DFSConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, ScriptBasedMapping.class, DNSToSwitchMapping.class), conf); - this.rejectUnresolvedTopologyDN = conf.getBoolean( DFSConfigKeys.DFS_REJECT_UNRESOLVED_DN_TOPOLOGY_MAPPING_KEY, DFSConfigKeys.DFS_REJECT_UNRESOLVED_DN_TOPOLOGY_MAPPING_DEFAULT); @@ -313,7 +312,6 @@ public class DatanodeManager { } dnsToSwitchMapping.resolve(locations); } - heartbeatIntervalSeconds = conf.getTimeDuration( DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS); @@ -322,7 +320,6 @@ public class DatanodeManager { DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval + 10 * 1000 * heartbeatIntervalSeconds; - // Effected block invalidate limit is the bigger value between // value configured in hdfs-site.xml, and 20 * HB interval. final int configuredBlockInvalidateLimit = conf.getInt( @@ -335,16 +332,17 @@ public class DatanodeManager { + ": configured=" + configuredBlockInvalidateLimit + ", counted=" + countedBlockInvalidateLimit + ", effected=" + blockInvalidateLimit); - this.checkIpHostnameInRegistration = conf.getBoolean( DFSConfigKeys.DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY, DFSConfigKeys.DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_DEFAULT); LOG.info(DFSConfigKeys.DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY + "=" + checkIpHostnameInRegistration); - this.avoidStaleDataNodesForRead = conf.getBoolean( DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_DEFAULT); + this.avoidSlowDataNodesForRead = conf.getBoolean( + DFSConfigKeys.DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY, + DFSConfigKeys.DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_DEFAULT); this.readConsiderLoad = conf.getBoolean( DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERLOAD_KEY, DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERLOAD_DEFAULT); @@ -389,7 +387,7 @@ private void startSlowPeerCollector() { public void run() { while (true) { try { - slowNodesSet = getSlowPeers(); + slowNodesUuidSet = getSlowPeersUuidSet(); } catch (Exception e) { LOG.error("Failed to collect slow peers", e); } @@ -509,12 +507,16 @@ private boolean isInactive(DatanodeInfo datanode) { return datanode.isDecommissioned() || datanode.isEnteringMaintenance() || (avoidStaleDataNodesForRead && datanode.isStale(staleInterval)); } + + private boolean isSlowNode(String dnUuid) { + return avoidSlowDataNodesForRead && slowNodesUuidSet.contains(dnUuid); + } /** * Sort the non-striped located blocks by the distance to the target host. * - * For striped blocks, it will only move decommissioned/stale nodes to the - * bottom. For example, assume we have storage list: + * For striped blocks, it will only move decommissioned/stale/slow + * nodes to the bottom. For example, assume we have storage list: * d0, d1, d2, d3, d4, d5, d6, d7, d8, d9 * mapping to block indices: * 0, 1, 2, 3, 4, 5, 6, 7, 8, 2 @@ -526,8 +528,11 @@ private boolean isInactive(DatanodeInfo datanode) { */ public void sortLocatedBlocks(final String targetHost, final List locatedBlocks) { - Comparator comparator = avoidStaleDataNodesForRead ? - new DFSUtil.ServiceAndStaleComparator(staleInterval) : + Comparator comparator = + avoidStaleDataNodesForRead || avoidSlowDataNodesForRead ? + new DFSUtil.StaleAndSlowComparator( + avoidStaleDataNodesForRead, staleInterval, + avoidSlowDataNodesForRead, slowNodesUuidSet) : new DFSUtil.ServiceComparator(); // sort located block for (LocatedBlock lb : locatedBlocks) { @@ -540,7 +545,8 @@ public void sortLocatedBlocks(final String targetHost, } /** - * Move decommissioned/stale datanodes to the bottom. After sorting it will + * Move decommissioned/entering_maintenance/stale/slow + * datanodes to the bottom. After sorting it will * update block indices and block tokens respectively. * * @param lb located striped block @@ -571,8 +577,9 @@ private void sortLocatedStripedBlock(final LocatedBlock lb, } /** - * Move decommissioned/entering_maintenance/stale datanodes to the bottom. - * Also, sort nodes by network distance. + * Move decommissioned/entering_maintenance/stale/slow + * datanodes to the bottom. Also, sort nodes by network + * distance. * * @param lb located block * @param targetHost target host @@ -602,12 +609,15 @@ private void sortLocatedBlock(final LocatedBlock lb, String targetHost, } DatanodeInfoWithStorage[] di = lb.getLocations(); - // Move decommissioned/entering_maintenance/stale datanodes to the bottom + // Move decommissioned/entering_maintenance/stale/slow + // datanodes to the bottom Arrays.sort(di, comparator); // Sort nodes by network distance only for located blocks int lastActiveIndex = di.length - 1; - while (lastActiveIndex > 0 && isInactive(di[lastActiveIndex])) { + while (lastActiveIndex > 0 && ( + isSlowNode(di[lastActiveIndex].getDatanodeUuid()) || + isInactive(di[lastActiveIndex]))) { --lastActiveIndex; } int activeLen = lastActiveIndex + 1; @@ -2083,10 +2093,10 @@ public String getSlowPeersReport() { * Returns all tracking slow peers. * @return */ - public Set getSlowPeers() { - Set slowPeersSet = Sets.newConcurrentHashSet(); + public Set getSlowPeersUuidSet() { + Set slowPeersUuidSet = Sets.newConcurrentHashSet(); if (slowPeerTracker == null) { - return slowPeersSet; + return slowPeersUuidSet; } ArrayList slowNodes = slowPeerTracker.getSlowNodes(maxSlowPeerReportNodes); @@ -2099,18 +2109,18 @@ public Set getSlowPeers() { DatanodeDescriptor datanodeByHost = host2DatanodeMap.getDatanodeByHost(ipAddr); if (datanodeByHost != null) { - slowPeersSet.add(datanodeByHost); + slowPeersUuidSet.add(datanodeByHost.getDatanodeUuid()); } } - return slowPeersSet; + return slowPeersUuidSet; } /** - * Returns all tracking slow peers. + * Returns all tracking slow datanodes uuids. * @return */ - public static Set getSlowNodes() { - return slowNodesSet; + public static Set getSlowNodesUuidSet() { + return slowNodesUuidSet; } /** @@ -2128,6 +2138,12 @@ public SlowPeerTracker getSlowPeerTracker() { public SlowDiskTracker getSlowDiskTracker() { return slowDiskTracker; } + + @VisibleForTesting + public void addSlowPeers(String dnUuid) { + slowNodesUuidSet.add(dnUuid); + } + /** * Retrieve information about slow disks as a JSON. * Returns null if we are not tracking slow disks. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index e540a677e0..78d8e033ec 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -2110,6 +2110,16 @@ + + dfs.namenode.avoid.read.slow.datanode + false + + Indicate whether or not to avoid reading from "slow" datanodes. + Slow datanodes will be moved to the end of the node list returned + for reading. + + + dfs.namenode.avoid.write.stale.datanode false diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyExcludeSlowNodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyExcludeSlowNodes.java index f40317d8e7..f2c24a646b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyExcludeSlowNodes.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyExcludeSlowNodes.java @@ -22,7 +22,6 @@ import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.TestBlockStoragePolicy; import org.apache.hadoop.hdfs.server.namenode.NameNode; -import org.apache.hadoop.net.Node; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -100,12 +99,12 @@ public void testChooseTargetExcludeSlowNodes() throws Exception { Thread.sleep(3000); // fetch slow nodes - Set slowPeers = dnManager.getSlowPeers(); + Set slowPeers = dnManager.getSlowPeersUuidSet(); // assert slow nodes assertEquals(3, slowPeers.size()); for (int i = 0; i < slowPeers.size(); i++) { - assertTrue(slowPeers.contains(dataNodes[i])); + assertTrue(slowPeers.contains(dataNodes[i].getDatanodeUuid())); } // mock writer @@ -120,7 +119,8 @@ public void testChooseTargetExcludeSlowNodes() throws Exception { // assert targets assertEquals(3, targets.length); for (int i = 0; i < targets.length; i++) { - assertTrue(!slowPeers.contains(targets[i].getDatanodeDescriptor())); + assertTrue(!slowPeers.contains(targets[i].getDatanodeDescriptor() + .getDatanodeUuid())); } } finally { namenode.getNamesystem().writeUnlock(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSortLocatedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSortLocatedBlock.java index 6b96fdf32b..45eedace2c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSortLocatedBlock.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSortLocatedBlock.java @@ -27,34 +27,24 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.util.Time; -import org.junit.BeforeClass; import org.junit.Test; import org.mockito.Mockito; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; import java.util.Date; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; /** * This class tests the sorting of located blocks based on * multiple states. */ public class TestSortLocatedBlock { - static final Logger LOG = LoggerFactory - .getLogger(TestSortLocatedBlock.class); - private static DatanodeManager dm; private static final long STALE_INTERVAL = 30 * 1000 * 60; - @BeforeClass - public static void setup() throws IOException { - dm = mockDatanodeManager(); - } - /** * Test to verify sorting with multiple state * datanodes exists in storage lists. @@ -73,8 +63,7 @@ public static void setup() throws IOException { * (d4 -> d3 -> d1 -> d2 -> d0). */ @Test(timeout = 30000) - public void testWithMultipleStateDatanodes() { - LOG.info("Starting test testWithMultipleStateDatanodes"); + public void testWithStaleDatanodes() throws IOException { long blockID = Long.MIN_VALUE; int totalDns = 5; DatanodeInfo[] locs = new DatanodeInfo[totalDns]; @@ -106,6 +95,7 @@ public void testWithMultipleStateDatanodes() { 1024L, new Date().getTime()), locs)); // sort located blocks + DatanodeManager dm = mockDatanodeManager(true, false); dm.sortLocatedBlocks(null, locatedBlocks); // get locations after sorting @@ -114,6 +104,9 @@ public void testWithMultipleStateDatanodes() { // assert location order: // live -> stale -> entering_maintenance -> decommissioned + // (d4 -> d3 -> d1 -> d0 -> d2) + // or + // (d4 -> d3 -> d1 -> d2 -> d0). // live assertEquals(locs[4].getIpAddr(), locations[0].getIpAddr()); // stale @@ -126,11 +119,188 @@ public void testWithMultipleStateDatanodes() { && decommissionedNodes.contains(locations[4])); } - private static DatanodeManager mockDatanodeManager() throws IOException { + /** + * Test to verify sorting with multiple state + * datanodes exists in storage lists. + * + * After sorting the expected datanodes list will be: + * live -> slow -> stale -> staleAndSlow -> + * entering_maintenance -> decommissioned. + * + * avoidStaleDataNodesForRead=true && avoidSlowDataNodesForRead=true + * d5 -> d4 -> d3 -> d2 -> d1 -> d0 + */ + @Test(timeout = 30000) + public void testAviodStaleAndSlowDatanodes() throws IOException { + DatanodeManager dm = mockDatanodeManager(true, true); + DatanodeInfo[] locs = mockDatanodes(dm); + + ArrayList locatedBlocks = new ArrayList<>(); + locatedBlocks.add(new LocatedBlock( + new ExtendedBlock("pool", Long.MIN_VALUE, + 1024L, new Date().getTime()), locs)); + + // sort located blocks + dm.sortLocatedBlocks(null, locatedBlocks); + + // get locations after sorting + LocatedBlock locatedBlock = locatedBlocks.get(0); + DatanodeInfoWithStorage[] locations = locatedBlock.getLocations(); + + // assert location order: + // live -> stale -> entering_maintenance -> decommissioned + // live + assertEquals(locs[5].getIpAddr(), locations[0].getIpAddr()); + // slow + assertEquals(locs[4].getIpAddr(), locations[1].getIpAddr()); + // stale + assertEquals(locs[3].getIpAddr(), locations[2].getIpAddr()); + // stale and slow + assertEquals(locs[2].getIpAddr(), locations[3].getIpAddr()); + // entering_maintenance + assertEquals(locs[1].getIpAddr(), locations[4].getIpAddr()); + // decommissioned + assertEquals(locs[0].getIpAddr(), locations[5].getIpAddr()); + } + + /** + * Test to verify sorting with multiple state + * datanodes exists in storage lists. + * + * After sorting the expected datanodes list will be: + * (live <-> slow) -> (stale <-> staleAndSlow) -> + * entering_maintenance -> decommissioned. + * + * avoidStaleDataNodesForRead=true && avoidSlowDataNodesForRead=false + * (d5 <-> d4) -> (d3 <-> d2) -> d1 -> d0 + */ + @Test(timeout = 30000) + public void testAviodStaleDatanodes() throws IOException { + DatanodeManager dm = mockDatanodeManager(true, false); + DatanodeInfo[] locs = mockDatanodes(dm); + + ArrayList locatedBlocks = new ArrayList<>(); + locatedBlocks.add(new LocatedBlock( + new ExtendedBlock("pool", Long.MIN_VALUE, + 1024L, new Date().getTime()), locs)); + + // sort located blocks + dm.sortLocatedBlocks(null, locatedBlocks); + + // get locations after sorting + LocatedBlock locatedBlock = locatedBlocks.get(0); + DatanodeInfoWithStorage[] locations = locatedBlock.getLocations(); + + // assert location order: + // live -> stale -> entering_maintenance -> decommissioned + // live + assertTrue((locs[5].getIpAddr() == locations[0].getIpAddr() && + locs[4].getIpAddr() == locations[1].getIpAddr()) || + (locs[5].getIpAddr() == locations[1].getIpAddr() && + locs[4].getIpAddr() == locations[0].getIpAddr())); + // stale + assertTrue((locs[3].getIpAddr() == locations[2].getIpAddr() && + locs[2].getIpAddr() == locations[3].getIpAddr()) || + (locs[3].getIpAddr() == locations[3].getIpAddr() && + locs[2].getIpAddr() == locations[2].getIpAddr())); + // entering_maintenance + assertEquals(locs[1].getIpAddr(), locations[4].getIpAddr()); + // decommissioned + assertEquals(locs[0].getIpAddr(), locations[5].getIpAddr()); + } + + /** + * Test to verify sorting with multiple state + * datanodes exists in storage lists. + * + * After sorting the expected datanodes list will be: + * (live <-> stale) -> (slow <-> staleAndSlow) -> + * entering_maintenance -> decommissioned. + * + * avoidStaleDataNodesForRead=false && avoidSlowDataNodesForRead=true + * (d5 -> d3) -> (d4 <-> d2) -> d1 -> d0 + */ + @Test(timeout = 30000) + public void testAviodSlowDatanodes() throws IOException { + DatanodeManager dm = mockDatanodeManager(false, true); + DatanodeInfo[] locs = mockDatanodes(dm); + + ArrayList locatedBlocks = new ArrayList<>(); + locatedBlocks.add(new LocatedBlock( + new ExtendedBlock("pool", Long.MIN_VALUE, + 1024L, new Date().getTime()), locs)); + + // sort located blocks + dm.sortLocatedBlocks(null, locatedBlocks); + + // get locations after sorting + LocatedBlock locatedBlock = locatedBlocks.get(0); + DatanodeInfoWithStorage[] locations = locatedBlock.getLocations(); + + // assert location order: + // live -> slow -> entering_maintenance -> decommissioned + // live + assertTrue((locs[5].getIpAddr() == locations[0].getIpAddr() && + locs[3].getIpAddr() == locations[1].getIpAddr()) || + (locs[5].getIpAddr() == locations[1].getIpAddr() && + locs[3].getIpAddr() == locations[0].getIpAddr())); + // slow + assertTrue((locs[4].getIpAddr() == locations[2].getIpAddr() && + locs[2].getIpAddr() == locations[3].getIpAddr()) || + (locs[4].getIpAddr() == locations[3].getIpAddr() && + locs[2].getIpAddr() == locations[2].getIpAddr())); + // entering_maintenance + assertEquals(locs[1].getIpAddr(), locations[4].getIpAddr()); + // decommissioned + assertEquals(locs[0].getIpAddr(), locations[5].getIpAddr()); + } + + /** + * We mock the following list of datanodes, and create LocatedBlock. + * d0 - decommissioned + * d1 - entering_maintenance + * d2 - stale and slow + * d3 - stale + * d4 - slow + * d5 - live(in-service) + */ + private static DatanodeInfo[] mockDatanodes(DatanodeManager dm) { + int totalDns = 6; + DatanodeInfo[] locs = new DatanodeInfo[totalDns]; + + // create datanodes + for (int i = 0; i < totalDns; i++) { + String ip = i + "." + i + "." + i + "." + i; + locs[i] = DFSTestUtil.getDatanodeInfo(ip); + locs[i].setLastUpdateMonotonic(Time.monotonicNow()); + } + // set decommissioned state + locs[0].setDecommissioned(); + // set entering_maintenance state + locs[1].startMaintenance(); + // set stale and slow state + locs[2].setLastUpdateMonotonic(Time.monotonicNow() - + DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT * 1000 - 1); + dm.addSlowPeers(locs[2].getDatanodeUuid()); + // set stale state + locs[3].setLastUpdateMonotonic(Time.monotonicNow() - + DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT * 1000 - 1); + // set slow state + dm.addSlowPeers(locs[4].getDatanodeUuid()); + + return locs; + } + + private static DatanodeManager mockDatanodeManager( + boolean avoidStaleDNForRead, boolean avoidSlowDNForRead) + throws IOException { Configuration conf = new Configuration(); conf.setBoolean( DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, - true); + avoidStaleDNForRead); + conf.setBoolean( + DFSConfigKeys.DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY, + avoidSlowDNForRead); conf.setLong(DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY, STALE_INTERVAL); FSNamesystem fsn = Mockito.mock(FSNamesystem.class);