HDFS-16076. Avoid using slow DataNodes for reading by sorting locations (#3117)
This commit is contained in:
parent
ef5dbc7283
commit
fdef2b4cca
@ -516,6 +516,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||
// Whether to enable datanode's stale state detection and usage for reads
|
||||
public static final String DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY = "dfs.namenode.avoid.read.stale.datanode";
|
||||
public static final boolean DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_DEFAULT = false;
|
||||
public static final String DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY =
|
||||
"dfs.namenode.avoid.read.slow.datanode";
|
||||
public static final boolean
|
||||
DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_DEFAULT = false;
|
||||
// Whether to enable datanode's stale state detection and usage for writes
|
||||
public static final String DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY = "dfs.namenode.avoid.write.stale.datanode";
|
||||
public static final boolean DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT = false;
|
||||
|
@ -156,23 +156,36 @@ public int compare(DatanodeInfo a, DatanodeInfo b) {
|
||||
|
||||
/**
|
||||
* Comparator for sorting DataNodeInfo[] based on
|
||||
* stale, decommissioned and entering_maintenance states.
|
||||
* Order: live {@literal ->} stale {@literal ->} entering_maintenance
|
||||
* {@literal ->} decommissioned
|
||||
* slow, stale, entering_maintenance and decommissioned states.
|
||||
* Order: live {@literal ->} slow {@literal ->} stale {@literal ->}
|
||||
* entering_maintenance {@literal ->} decommissioned
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public static class ServiceAndStaleComparator extends ServiceComparator {
|
||||
public static class StaleAndSlowComparator extends ServiceComparator {
|
||||
private final boolean avoidStaleDataNodesForRead;
|
||||
private final long staleInterval;
|
||||
private final boolean avoidSlowDataNodesForRead;
|
||||
private final Set<String> slowNodesUuidSet;
|
||||
|
||||
/**
|
||||
* Constructor of ServiceAndStaleComparator
|
||||
*
|
||||
* @param avoidStaleDataNodesForRead
|
||||
* Whether or not to avoid using stale DataNodes for reading.
|
||||
* @param interval
|
||||
* The time interval for marking datanodes as stale is passed from
|
||||
* outside, since the interval may be changed dynamically
|
||||
* outside, since the interval may be changed dynamically.
|
||||
* @param avoidSlowDataNodesForRead
|
||||
* Whether or not to avoid using slow DataNodes for reading.
|
||||
* @param slowNodesUuidSet
|
||||
* Slow DataNodes UUID set.
|
||||
*/
|
||||
public ServiceAndStaleComparator(long interval) {
|
||||
public StaleAndSlowComparator(
|
||||
boolean avoidStaleDataNodesForRead, long interval,
|
||||
boolean avoidSlowDataNodesForRead, Set<String> slowNodesUuidSet) {
|
||||
this.avoidStaleDataNodesForRead = avoidStaleDataNodesForRead;
|
||||
this.staleInterval = interval;
|
||||
this.avoidSlowDataNodesForRead = avoidSlowDataNodesForRead;
|
||||
this.slowNodesUuidSet = slowNodesUuidSet;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -183,9 +196,22 @@ public int compare(DatanodeInfo a, DatanodeInfo b) {
|
||||
}
|
||||
|
||||
// Stale nodes will be moved behind the normal nodes
|
||||
boolean aStale = a.isStale(staleInterval);
|
||||
boolean bStale = b.isStale(staleInterval);
|
||||
return aStale == bStale ? 0 : (aStale ? 1 : -1);
|
||||
if (avoidStaleDataNodesForRead) {
|
||||
boolean aStale = a.isStale(staleInterval);
|
||||
boolean bStale = b.isStale(staleInterval);
|
||||
ret = aStale == bStale ? 0 : (aStale ? 1 : -1);
|
||||
if (ret != 0) {
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
// Slow nodes will be moved behind the normal nodes
|
||||
if (avoidSlowDataNodesForRead) {
|
||||
boolean aSlow = slowNodesUuidSet.contains(a.getDatanodeUuid());
|
||||
boolean bSlow = slowNodesUuidSet.contains(b.getDatanodeUuid());
|
||||
ret = aSlow == bSlow ? 0 : (aSlow ? 1 : -1);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1104,8 +1104,8 @@ boolean isGoodDatanode(DatanodeDescriptor node,
|
||||
|
||||
// check if the target is a slow node
|
||||
if (dataNodePeerStatsEnabled && excludeSlowNodesEnabled) {
|
||||
Set<Node> nodes = DatanodeManager.getSlowNodes();
|
||||
if (nodes.contains(node)) {
|
||||
Set<String> slowNodesUuidSet = DatanodeManager.getSlowNodesUuidSet();
|
||||
if (slowNodesUuidSet.contains(node.getDatanodeUuid())) {
|
||||
logNodeIsNotChosen(node, NodeNotChosenReason.NODE_SLOW);
|
||||
return false;
|
||||
}
|
||||
|
@ -140,6 +140,9 @@ public class DatanodeManager {
|
||||
/** Whether or not to avoid using stale DataNodes for reading */
|
||||
private final boolean avoidStaleDataNodesForRead;
|
||||
|
||||
/** Whether or not to avoid using slow DataNodes for reading. */
|
||||
private final boolean avoidSlowDataNodesForRead;
|
||||
|
||||
/** Whether or not to consider lad for reading. */
|
||||
private final boolean readConsiderLoad;
|
||||
|
||||
@ -210,7 +213,7 @@ public class DatanodeManager {
|
||||
|
||||
@Nullable
|
||||
private final SlowPeerTracker slowPeerTracker;
|
||||
private static Set<Node> slowNodesSet = Sets.newConcurrentHashSet();
|
||||
private static Set<String> slowNodesUuidSet = Sets.newConcurrentHashSet();
|
||||
private Daemon slowPeerCollectorDaemon;
|
||||
private final long slowPeerCollectionInterval;
|
||||
private final int maxSlowPeerReportNodes;
|
||||
@ -242,7 +245,6 @@ public class DatanodeManager {
|
||||
} else {
|
||||
networktopology = NetworkTopology.getInstance(conf);
|
||||
}
|
||||
|
||||
this.heartbeatManager = new HeartbeatManager(namesystem,
|
||||
blockManager, conf);
|
||||
this.datanodeAdminManager = new DatanodeAdminManager(namesystem,
|
||||
@ -273,7 +275,6 @@ public class DatanodeManager {
|
||||
}
|
||||
this.slowDiskTracker = dataNodeDiskStatsEnabled ?
|
||||
new SlowDiskTracker(conf, timer) : null;
|
||||
|
||||
this.defaultXferPort = NetUtils.createSocketAddr(
|
||||
conf.getTrimmed(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY,
|
||||
DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT)).getPort();
|
||||
@ -294,11 +295,9 @@ public class DatanodeManager {
|
||||
} catch (IOException e) {
|
||||
LOG.error("error reading hosts files: ", e);
|
||||
}
|
||||
|
||||
this.dnsToSwitchMapping = ReflectionUtils.newInstance(
|
||||
conf.getClass(DFSConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
|
||||
ScriptBasedMapping.class, DNSToSwitchMapping.class), conf);
|
||||
|
||||
this.rejectUnresolvedTopologyDN = conf.getBoolean(
|
||||
DFSConfigKeys.DFS_REJECT_UNRESOLVED_DN_TOPOLOGY_MAPPING_KEY,
|
||||
DFSConfigKeys.DFS_REJECT_UNRESOLVED_DN_TOPOLOGY_MAPPING_DEFAULT);
|
||||
@ -313,7 +312,6 @@ public class DatanodeManager {
|
||||
}
|
||||
dnsToSwitchMapping.resolve(locations);
|
||||
}
|
||||
|
||||
heartbeatIntervalSeconds = conf.getTimeDuration(
|
||||
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
|
||||
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS);
|
||||
@ -322,7 +320,6 @@ public class DatanodeManager {
|
||||
DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes
|
||||
this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval
|
||||
+ 10 * 1000 * heartbeatIntervalSeconds;
|
||||
|
||||
// Effected block invalidate limit is the bigger value between
|
||||
// value configured in hdfs-site.xml, and 20 * HB interval.
|
||||
final int configuredBlockInvalidateLimit = conf.getInt(
|
||||
@ -335,16 +332,17 @@ public class DatanodeManager {
|
||||
+ ": configured=" + configuredBlockInvalidateLimit
|
||||
+ ", counted=" + countedBlockInvalidateLimit
|
||||
+ ", effected=" + blockInvalidateLimit);
|
||||
|
||||
this.checkIpHostnameInRegistration = conf.getBoolean(
|
||||
DFSConfigKeys.DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY,
|
||||
DFSConfigKeys.DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_DEFAULT);
|
||||
LOG.info(DFSConfigKeys.DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY
|
||||
+ "=" + checkIpHostnameInRegistration);
|
||||
|
||||
this.avoidStaleDataNodesForRead = conf.getBoolean(
|
||||
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY,
|
||||
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_DEFAULT);
|
||||
this.avoidSlowDataNodesForRead = conf.getBoolean(
|
||||
DFSConfigKeys.DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY,
|
||||
DFSConfigKeys.DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_DEFAULT);
|
||||
this.readConsiderLoad = conf.getBoolean(
|
||||
DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERLOAD_KEY,
|
||||
DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERLOAD_DEFAULT);
|
||||
@ -389,7 +387,7 @@ private void startSlowPeerCollector() {
|
||||
public void run() {
|
||||
while (true) {
|
||||
try {
|
||||
slowNodesSet = getSlowPeers();
|
||||
slowNodesUuidSet = getSlowPeersUuidSet();
|
||||
} catch (Exception e) {
|
||||
LOG.error("Failed to collect slow peers", e);
|
||||
}
|
||||
@ -510,11 +508,15 @@ private boolean isInactive(DatanodeInfo datanode) {
|
||||
(avoidStaleDataNodesForRead && datanode.isStale(staleInterval));
|
||||
}
|
||||
|
||||
private boolean isSlowNode(String dnUuid) {
|
||||
return avoidSlowDataNodesForRead && slowNodesUuidSet.contains(dnUuid);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sort the non-striped located blocks by the distance to the target host.
|
||||
*
|
||||
* For striped blocks, it will only move decommissioned/stale nodes to the
|
||||
* bottom. For example, assume we have storage list:
|
||||
* For striped blocks, it will only move decommissioned/stale/slow
|
||||
* nodes to the bottom. For example, assume we have storage list:
|
||||
* d0, d1, d2, d3, d4, d5, d6, d7, d8, d9
|
||||
* mapping to block indices:
|
||||
* 0, 1, 2, 3, 4, 5, 6, 7, 8, 2
|
||||
@ -526,8 +528,11 @@ private boolean isInactive(DatanodeInfo datanode) {
|
||||
*/
|
||||
public void sortLocatedBlocks(final String targetHost,
|
||||
final List<LocatedBlock> locatedBlocks) {
|
||||
Comparator<DatanodeInfo> comparator = avoidStaleDataNodesForRead ?
|
||||
new DFSUtil.ServiceAndStaleComparator(staleInterval) :
|
||||
Comparator<DatanodeInfo> comparator =
|
||||
avoidStaleDataNodesForRead || avoidSlowDataNodesForRead ?
|
||||
new DFSUtil.StaleAndSlowComparator(
|
||||
avoidStaleDataNodesForRead, staleInterval,
|
||||
avoidSlowDataNodesForRead, slowNodesUuidSet) :
|
||||
new DFSUtil.ServiceComparator();
|
||||
// sort located block
|
||||
for (LocatedBlock lb : locatedBlocks) {
|
||||
@ -540,7 +545,8 @@ public void sortLocatedBlocks(final String targetHost,
|
||||
}
|
||||
|
||||
/**
|
||||
* Move decommissioned/stale datanodes to the bottom. After sorting it will
|
||||
* Move decommissioned/entering_maintenance/stale/slow
|
||||
* datanodes to the bottom. After sorting it will
|
||||
* update block indices and block tokens respectively.
|
||||
*
|
||||
* @param lb located striped block
|
||||
@ -571,8 +577,9 @@ private void sortLocatedStripedBlock(final LocatedBlock lb,
|
||||
}
|
||||
|
||||
/**
|
||||
* Move decommissioned/entering_maintenance/stale datanodes to the bottom.
|
||||
* Also, sort nodes by network distance.
|
||||
* Move decommissioned/entering_maintenance/stale/slow
|
||||
* datanodes to the bottom. Also, sort nodes by network
|
||||
* distance.
|
||||
*
|
||||
* @param lb located block
|
||||
* @param targetHost target host
|
||||
@ -602,12 +609,15 @@ private void sortLocatedBlock(final LocatedBlock lb, String targetHost,
|
||||
}
|
||||
|
||||
DatanodeInfoWithStorage[] di = lb.getLocations();
|
||||
// Move decommissioned/entering_maintenance/stale datanodes to the bottom
|
||||
// Move decommissioned/entering_maintenance/stale/slow
|
||||
// datanodes to the bottom
|
||||
Arrays.sort(di, comparator);
|
||||
|
||||
// Sort nodes by network distance only for located blocks
|
||||
int lastActiveIndex = di.length - 1;
|
||||
while (lastActiveIndex > 0 && isInactive(di[lastActiveIndex])) {
|
||||
while (lastActiveIndex > 0 && (
|
||||
isSlowNode(di[lastActiveIndex].getDatanodeUuid()) ||
|
||||
isInactive(di[lastActiveIndex]))) {
|
||||
--lastActiveIndex;
|
||||
}
|
||||
int activeLen = lastActiveIndex + 1;
|
||||
@ -2083,10 +2093,10 @@ public String getSlowPeersReport() {
|
||||
* Returns all tracking slow peers.
|
||||
* @return
|
||||
*/
|
||||
public Set<Node> getSlowPeers() {
|
||||
Set<Node> slowPeersSet = Sets.newConcurrentHashSet();
|
||||
public Set<String> getSlowPeersUuidSet() {
|
||||
Set<String> slowPeersUuidSet = Sets.newConcurrentHashSet();
|
||||
if (slowPeerTracker == null) {
|
||||
return slowPeersSet;
|
||||
return slowPeersUuidSet;
|
||||
}
|
||||
ArrayList<String> slowNodes =
|
||||
slowPeerTracker.getSlowNodes(maxSlowPeerReportNodes);
|
||||
@ -2099,18 +2109,18 @@ public Set<Node> getSlowPeers() {
|
||||
DatanodeDescriptor datanodeByHost =
|
||||
host2DatanodeMap.getDatanodeByHost(ipAddr);
|
||||
if (datanodeByHost != null) {
|
||||
slowPeersSet.add(datanodeByHost);
|
||||
slowPeersUuidSet.add(datanodeByHost.getDatanodeUuid());
|
||||
}
|
||||
}
|
||||
return slowPeersSet;
|
||||
return slowPeersUuidSet;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns all tracking slow peers.
|
||||
* Returns all tracking slow datanodes uuids.
|
||||
* @return
|
||||
*/
|
||||
public static Set<Node> getSlowNodes() {
|
||||
return slowNodesSet;
|
||||
public static Set<String> getSlowNodesUuidSet() {
|
||||
return slowNodesUuidSet;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -2128,6 +2138,12 @@ public SlowPeerTracker getSlowPeerTracker() {
|
||||
public SlowDiskTracker getSlowDiskTracker() {
|
||||
return slowDiskTracker;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void addSlowPeers(String dnUuid) {
|
||||
slowNodesUuidSet.add(dnUuid);
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieve information about slow disks as a JSON.
|
||||
* Returns null if we are not tracking slow disks.
|
||||
|
@ -2110,6 +2110,16 @@
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.namenode.avoid.read.slow.datanode</name>
|
||||
<value>false</value>
|
||||
<description>
|
||||
Indicate whether or not to avoid reading from "slow" datanodes.
|
||||
Slow datanodes will be moved to the end of the node list returned
|
||||
for reading.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.namenode.avoid.write.stale.datanode</name>
|
||||
<value>false</value>
|
||||
|
@ -22,7 +22,6 @@
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.net.Node;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
@ -100,12 +99,12 @@ public void testChooseTargetExcludeSlowNodes() throws Exception {
|
||||
Thread.sleep(3000);
|
||||
|
||||
// fetch slow nodes
|
||||
Set<Node> slowPeers = dnManager.getSlowPeers();
|
||||
Set<String> slowPeers = dnManager.getSlowPeersUuidSet();
|
||||
|
||||
// assert slow nodes
|
||||
assertEquals(3, slowPeers.size());
|
||||
for (int i = 0; i < slowPeers.size(); i++) {
|
||||
assertTrue(slowPeers.contains(dataNodes[i]));
|
||||
assertTrue(slowPeers.contains(dataNodes[i].getDatanodeUuid()));
|
||||
}
|
||||
|
||||
// mock writer
|
||||
@ -120,7 +119,8 @@ public void testChooseTargetExcludeSlowNodes() throws Exception {
|
||||
// assert targets
|
||||
assertEquals(3, targets.length);
|
||||
for (int i = 0; i < targets.length; i++) {
|
||||
assertTrue(!slowPeers.contains(targets[i].getDatanodeDescriptor()));
|
||||
assertTrue(!slowPeers.contains(targets[i].getDatanodeDescriptor()
|
||||
.getDatanodeUuid()));
|
||||
}
|
||||
} finally {
|
||||
namenode.getNamesystem().writeUnlock();
|
||||
|
@ -27,34 +27,24 @@
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Date;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
/**
|
||||
* This class tests the sorting of located blocks based on
|
||||
* multiple states.
|
||||
*/
|
||||
public class TestSortLocatedBlock {
|
||||
static final Logger LOG = LoggerFactory
|
||||
.getLogger(TestSortLocatedBlock.class);
|
||||
|
||||
private static DatanodeManager dm;
|
||||
private static final long STALE_INTERVAL = 30 * 1000 * 60;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() throws IOException {
|
||||
dm = mockDatanodeManager();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test to verify sorting with multiple state
|
||||
* datanodes exists in storage lists.
|
||||
@ -73,8 +63,7 @@ public static void setup() throws IOException {
|
||||
* (d4 -> d3 -> d1 -> d2 -> d0).
|
||||
*/
|
||||
@Test(timeout = 30000)
|
||||
public void testWithMultipleStateDatanodes() {
|
||||
LOG.info("Starting test testWithMultipleStateDatanodes");
|
||||
public void testWithStaleDatanodes() throws IOException {
|
||||
long blockID = Long.MIN_VALUE;
|
||||
int totalDns = 5;
|
||||
DatanodeInfo[] locs = new DatanodeInfo[totalDns];
|
||||
@ -106,6 +95,7 @@ public void testWithMultipleStateDatanodes() {
|
||||
1024L, new Date().getTime()), locs));
|
||||
|
||||
// sort located blocks
|
||||
DatanodeManager dm = mockDatanodeManager(true, false);
|
||||
dm.sortLocatedBlocks(null, locatedBlocks);
|
||||
|
||||
// get locations after sorting
|
||||
@ -114,6 +104,9 @@ public void testWithMultipleStateDatanodes() {
|
||||
|
||||
// assert location order:
|
||||
// live -> stale -> entering_maintenance -> decommissioned
|
||||
// (d4 -> d3 -> d1 -> d0 -> d2)
|
||||
// or
|
||||
// (d4 -> d3 -> d1 -> d2 -> d0).
|
||||
// live
|
||||
assertEquals(locs[4].getIpAddr(), locations[0].getIpAddr());
|
||||
// stale
|
||||
@ -126,11 +119,188 @@ public void testWithMultipleStateDatanodes() {
|
||||
&& decommissionedNodes.contains(locations[4]));
|
||||
}
|
||||
|
||||
private static DatanodeManager mockDatanodeManager() throws IOException {
|
||||
/**
|
||||
* Test to verify sorting with multiple state
|
||||
* datanodes exists in storage lists.
|
||||
*
|
||||
* After sorting the expected datanodes list will be:
|
||||
* live -> slow -> stale -> staleAndSlow ->
|
||||
* entering_maintenance -> decommissioned.
|
||||
*
|
||||
* avoidStaleDataNodesForRead=true && avoidSlowDataNodesForRead=true
|
||||
* d5 -> d4 -> d3 -> d2 -> d1 -> d0
|
||||
*/
|
||||
@Test(timeout = 30000)
|
||||
public void testAviodStaleAndSlowDatanodes() throws IOException {
|
||||
DatanodeManager dm = mockDatanodeManager(true, true);
|
||||
DatanodeInfo[] locs = mockDatanodes(dm);
|
||||
|
||||
ArrayList<LocatedBlock> locatedBlocks = new ArrayList<>();
|
||||
locatedBlocks.add(new LocatedBlock(
|
||||
new ExtendedBlock("pool", Long.MIN_VALUE,
|
||||
1024L, new Date().getTime()), locs));
|
||||
|
||||
// sort located blocks
|
||||
dm.sortLocatedBlocks(null, locatedBlocks);
|
||||
|
||||
// get locations after sorting
|
||||
LocatedBlock locatedBlock = locatedBlocks.get(0);
|
||||
DatanodeInfoWithStorage[] locations = locatedBlock.getLocations();
|
||||
|
||||
// assert location order:
|
||||
// live -> stale -> entering_maintenance -> decommissioned
|
||||
// live
|
||||
assertEquals(locs[5].getIpAddr(), locations[0].getIpAddr());
|
||||
// slow
|
||||
assertEquals(locs[4].getIpAddr(), locations[1].getIpAddr());
|
||||
// stale
|
||||
assertEquals(locs[3].getIpAddr(), locations[2].getIpAddr());
|
||||
// stale and slow
|
||||
assertEquals(locs[2].getIpAddr(), locations[3].getIpAddr());
|
||||
// entering_maintenance
|
||||
assertEquals(locs[1].getIpAddr(), locations[4].getIpAddr());
|
||||
// decommissioned
|
||||
assertEquals(locs[0].getIpAddr(), locations[5].getIpAddr());
|
||||
}
|
||||
|
||||
/**
|
||||
* Test to verify sorting with multiple state
|
||||
* datanodes exists in storage lists.
|
||||
*
|
||||
* After sorting the expected datanodes list will be:
|
||||
* (live <-> slow) -> (stale <-> staleAndSlow) ->
|
||||
* entering_maintenance -> decommissioned.
|
||||
*
|
||||
* avoidStaleDataNodesForRead=true && avoidSlowDataNodesForRead=false
|
||||
* (d5 <-> d4) -> (d3 <-> d2) -> d1 -> d0
|
||||
*/
|
||||
@Test(timeout = 30000)
|
||||
public void testAviodStaleDatanodes() throws IOException {
|
||||
DatanodeManager dm = mockDatanodeManager(true, false);
|
||||
DatanodeInfo[] locs = mockDatanodes(dm);
|
||||
|
||||
ArrayList<LocatedBlock> locatedBlocks = new ArrayList<>();
|
||||
locatedBlocks.add(new LocatedBlock(
|
||||
new ExtendedBlock("pool", Long.MIN_VALUE,
|
||||
1024L, new Date().getTime()), locs));
|
||||
|
||||
// sort located blocks
|
||||
dm.sortLocatedBlocks(null, locatedBlocks);
|
||||
|
||||
// get locations after sorting
|
||||
LocatedBlock locatedBlock = locatedBlocks.get(0);
|
||||
DatanodeInfoWithStorage[] locations = locatedBlock.getLocations();
|
||||
|
||||
// assert location order:
|
||||
// live -> stale -> entering_maintenance -> decommissioned
|
||||
// live
|
||||
assertTrue((locs[5].getIpAddr() == locations[0].getIpAddr() &&
|
||||
locs[4].getIpAddr() == locations[1].getIpAddr()) ||
|
||||
(locs[5].getIpAddr() == locations[1].getIpAddr() &&
|
||||
locs[4].getIpAddr() == locations[0].getIpAddr()));
|
||||
// stale
|
||||
assertTrue((locs[3].getIpAddr() == locations[2].getIpAddr() &&
|
||||
locs[2].getIpAddr() == locations[3].getIpAddr()) ||
|
||||
(locs[3].getIpAddr() == locations[3].getIpAddr() &&
|
||||
locs[2].getIpAddr() == locations[2].getIpAddr()));
|
||||
// entering_maintenance
|
||||
assertEquals(locs[1].getIpAddr(), locations[4].getIpAddr());
|
||||
// decommissioned
|
||||
assertEquals(locs[0].getIpAddr(), locations[5].getIpAddr());
|
||||
}
|
||||
|
||||
/**
|
||||
* Test to verify sorting with multiple state
|
||||
* datanodes exists in storage lists.
|
||||
*
|
||||
* After sorting the expected datanodes list will be:
|
||||
* (live <-> stale) -> (slow <-> staleAndSlow) ->
|
||||
* entering_maintenance -> decommissioned.
|
||||
*
|
||||
* avoidStaleDataNodesForRead=false && avoidSlowDataNodesForRead=true
|
||||
* (d5 -> d3) -> (d4 <-> d2) -> d1 -> d0
|
||||
*/
|
||||
@Test(timeout = 30000)
|
||||
public void testAviodSlowDatanodes() throws IOException {
|
||||
DatanodeManager dm = mockDatanodeManager(false, true);
|
||||
DatanodeInfo[] locs = mockDatanodes(dm);
|
||||
|
||||
ArrayList<LocatedBlock> locatedBlocks = new ArrayList<>();
|
||||
locatedBlocks.add(new LocatedBlock(
|
||||
new ExtendedBlock("pool", Long.MIN_VALUE,
|
||||
1024L, new Date().getTime()), locs));
|
||||
|
||||
// sort located blocks
|
||||
dm.sortLocatedBlocks(null, locatedBlocks);
|
||||
|
||||
// get locations after sorting
|
||||
LocatedBlock locatedBlock = locatedBlocks.get(0);
|
||||
DatanodeInfoWithStorage[] locations = locatedBlock.getLocations();
|
||||
|
||||
// assert location order:
|
||||
// live -> slow -> entering_maintenance -> decommissioned
|
||||
// live
|
||||
assertTrue((locs[5].getIpAddr() == locations[0].getIpAddr() &&
|
||||
locs[3].getIpAddr() == locations[1].getIpAddr()) ||
|
||||
(locs[5].getIpAddr() == locations[1].getIpAddr() &&
|
||||
locs[3].getIpAddr() == locations[0].getIpAddr()));
|
||||
// slow
|
||||
assertTrue((locs[4].getIpAddr() == locations[2].getIpAddr() &&
|
||||
locs[2].getIpAddr() == locations[3].getIpAddr()) ||
|
||||
(locs[4].getIpAddr() == locations[3].getIpAddr() &&
|
||||
locs[2].getIpAddr() == locations[2].getIpAddr()));
|
||||
// entering_maintenance
|
||||
assertEquals(locs[1].getIpAddr(), locations[4].getIpAddr());
|
||||
// decommissioned
|
||||
assertEquals(locs[0].getIpAddr(), locations[5].getIpAddr());
|
||||
}
|
||||
|
||||
/**
|
||||
* We mock the following list of datanodes, and create LocatedBlock.
|
||||
* d0 - decommissioned
|
||||
* d1 - entering_maintenance
|
||||
* d2 - stale and slow
|
||||
* d3 - stale
|
||||
* d4 - slow
|
||||
* d5 - live(in-service)
|
||||
*/
|
||||
private static DatanodeInfo[] mockDatanodes(DatanodeManager dm) {
|
||||
int totalDns = 6;
|
||||
DatanodeInfo[] locs = new DatanodeInfo[totalDns];
|
||||
|
||||
// create datanodes
|
||||
for (int i = 0; i < totalDns; i++) {
|
||||
String ip = i + "." + i + "." + i + "." + i;
|
||||
locs[i] = DFSTestUtil.getDatanodeInfo(ip);
|
||||
locs[i].setLastUpdateMonotonic(Time.monotonicNow());
|
||||
}
|
||||
// set decommissioned state
|
||||
locs[0].setDecommissioned();
|
||||
// set entering_maintenance state
|
||||
locs[1].startMaintenance();
|
||||
// set stale and slow state
|
||||
locs[2].setLastUpdateMonotonic(Time.monotonicNow() -
|
||||
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT * 1000 - 1);
|
||||
dm.addSlowPeers(locs[2].getDatanodeUuid());
|
||||
// set stale state
|
||||
locs[3].setLastUpdateMonotonic(Time.monotonicNow() -
|
||||
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT * 1000 - 1);
|
||||
// set slow state
|
||||
dm.addSlowPeers(locs[4].getDatanodeUuid());
|
||||
|
||||
return locs;
|
||||
}
|
||||
|
||||
private static DatanodeManager mockDatanodeManager(
|
||||
boolean avoidStaleDNForRead, boolean avoidSlowDNForRead)
|
||||
throws IOException {
|
||||
Configuration conf = new Configuration();
|
||||
conf.setBoolean(
|
||||
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY,
|
||||
true);
|
||||
avoidStaleDNForRead);
|
||||
conf.setBoolean(
|
||||
DFSConfigKeys.DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY,
|
||||
avoidSlowDNForRead);
|
||||
conf.setLong(DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
|
||||
STALE_INTERVAL);
|
||||
FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
|
||||
|
Loading…
Reference in New Issue
Block a user