HDFS-14283. DFSInputStream to prefer cached replica. Contributed by Lisheng Sun.

Ayush Saxena 2020-05-06 16:55:04 +05:30
parent 78c97907cb
commit 4e0d99c4d6
6 changed files with 112 additions and 3 deletions

DFSInputStream.java

@@ -1054,10 +1054,21 @@ protected DNAddrPair getBestNodeDNAddrPair(LocatedBlock block,
     StorageType[] storageTypes = block.getStorageTypes();
     DatanodeInfo chosenNode = null;
     StorageType storageType = null;
-    if (nodes != null) {
+    if (dfsClient.getConf().isReadUseCachePriority()) {
+      DatanodeInfo[] cachedLocs = block.getCachedLocations();
+      if (cachedLocs != null) {
+        for (int i = 0; i < cachedLocs.length; i++) {
+          if (isValidNode(cachedLocs[i], ignoredNodes)) {
+            chosenNode = cachedLocs[i];
+            break;
+          }
+        }
+      }
+    }
+    if (chosenNode == null && nodes != null) {
       for (int i = 0; i < nodes.length; i++) {
-        if (!dfsClient.getDeadNodes(this).containsKey(nodes[i])
-            && (ignoredNodes == null || !ignoredNodes.contains(nodes[i]))) {
+        if (isValidNode(nodes[i], ignoredNodes)) {
           chosenNode = nodes[i];
           // Storage types are ordered to correspond with nodes, so use the same
           // index to get storage type.
@@ -1090,6 +1101,15 @@ protected void reportLostBlock(LocatedBlock lostBlock,
         ", ignoredNodes = " + ignoredNodes);
   }
 
+  private boolean isValidNode(DatanodeInfo node,
+      Collection<DatanodeInfo> ignoredNodes) {
+    if (!dfsClient.getDeadNodes(this).containsKey(node)
+        && (ignoredNodes == null || !ignoredNodes.contains(node))) {
+      return true;
+    }
+    return false;
+  }
+
   private static String getBestNodeDNAddrPairErrorString(
       DatanodeInfo nodes[], AbstractMap<DatanodeInfo,
       DatanodeInfo> deadNodes, Collection<DatanodeInfo> ignoredNodes) {
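Net effect of the two hunks above: when dfs.client.read.use.cache.priority is true, getBestNodeDNAddrPair scans the block's cached locations first and falls back to the regular, NameNode-ordered locations only if no cached replica passes the dead/ignored-node checks. The standalone sketch below restates that selection order with simplified types; CachedFirstSelection, pickReplica, and isValid are hypothetical names for this illustration, not part of the patch.

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;

/** Simplified, standalone illustration of the cached-first selection order. */
public class CachedFirstSelection {

  /** Returns the first live, non-ignored node, preferring cached replicas when enabled. */
  static String pickReplica(boolean readUseCachePriority, List<String> cachedLocs,
      List<String> locs, Set<String> deadNodes, Set<String> ignoredNodes) {
    if (readUseCachePriority) {
      for (String node : cachedLocs) {      // pass 1: replicas cached in datanode memory
        if (isValid(node, deadNodes, ignoredNodes)) {
          return node;
        }
      }
    }
    for (String node : locs) {              // pass 2: ordinary replicas, in NameNode order
      if (isValid(node, deadNodes, ignoredNodes)) {
        return node;
      }
    }
    return null;                            // caller would refetch locations or fail
  }

  static boolean isValid(String node, Set<String> dead, Set<String> ignored) {
    return !dead.contains(node) && (ignored == null || !ignored.contains(node));
  }

  public static void main(String[] args) {
    List<String> cached = Arrays.asList("dn2");
    List<String> locs = Arrays.asList("dn1", "dn2", "dn3");
    Set<String> none = Collections.emptySet();
    // With the flag on, the cached replica on dn2 wins over the closer dn1.
    System.out.println(pickReplica(true, cached, locs, none, none));   // dn2
    System.out.println(pickReplica(false, cached, locs, none, none));  // dn1
  }
}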

HdfsClientConfigKeys.java

@@ -108,6 +108,9 @@ public interface HdfsClientConfigKeys {
   String DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL =
       "dfs.client.use.legacy.blockreader.local";
   boolean DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT = false;
+  String DFS_CLIENT_READ_USE_CACHE_PRIORITY =
+      "dfs.client.read.use.cache.priority";
+  boolean DFS_CLIENT_READ_USE_CACHE_PRIORITY_DEFAULT = false;
   String DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY =
       "dfs.client.datanode-restart.timeout";
   long DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT = 30;

DfsClientConf.java

@@ -60,6 +60,8 @@
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_MS;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_READ_USE_CACHE_PRIORITY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_READ_USE_CACHE_PRIORITY_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT;
@@ -150,6 +152,8 @@ public class DfsClientConf {
   private final boolean dataTransferTcpNoDelay;
+  private final boolean readUseCachePriority;
   private final boolean deadNodeDetectionEnabled;
   private final long leaseHardLimitPeriod;
@@ -260,6 +264,8 @@ public DfsClientConf(Configuration conf) {
     slowIoWarningThresholdMs = conf.getLong(
         DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY,
         DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT);
+    readUseCachePriority = conf.getBoolean(DFS_CLIENT_READ_USE_CACHE_PRIORITY,
+        DFS_CLIENT_READ_USE_CACHE_PRIORITY_DEFAULT);
     refreshReadBlockLocationsMS = conf.getLong(
         HdfsClientConfigKeys.DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_KEY,
@@ -630,6 +636,13 @@ public long getleaseHardLimitPeriod() {
     return leaseHardLimitPeriod;
   }
 
+  /**
+   * @return the readUseCachePriority
+   */
+  public boolean isReadUseCachePriority() {
+    return readUseCachePriority;
+  }
+
   /**
    * @return the replicaAccessorBuilderClasses
    */

LocatedBlock.java

@@ -268,6 +268,7 @@ public String toString() {
         + "; corrupt=" + corrupt
         + "; offset=" + offset
         + "; locs=" + Arrays.asList(locs)
+        + "; cachedLocs=" + Arrays.asList(cachedLocs)
         + "}";
   }
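Purely for illustration (leading fields elided, addresses made up), a block with one cached replica would now print along these lines, with the cached locations appended after the regular locations:

...; corrupt=false; offset=0; locs=[127.0.0.1:9866, 127.0.0.1:9867]; cachedLocs=[127.0.0.1:9866]}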

hdfs-default.xml

@@ -2978,6 +2978,15 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.client.read.use.cache.priority</name>
+  <value>false</value>
+  <description>
+    If true, the cached replica of the datanode is preferred;
+    otherwise, the replica closest to the client is preferred.
+  </description>
+</property>
+
 <property>
   <name>dfs.block.local-path-access.user</name>
   <value></value>
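For a single client application, the same switch can also be set programmatically rather than cluster-wide in hdfs-site.xml. Below is a minimal sketch; the class name, NameNode URI, and file path are placeholders, and a cached replica only exists where the path has been cached through HDFS centralized cache management (e.g. hdfs cacheadmin -addDirective).

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CachedReadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Prefer replicas cached in datanode memory over the network-closest replica.
    // The default is false, matching DFS_CLIENT_READ_USE_CACHE_PRIORITY_DEFAULT.
    conf.setBoolean("dfs.client.read.use.cache.priority", true);

    // "hdfs://nn-host:8020" and "/data/file" are placeholders for this sketch.
    try (FileSystem fs = FileSystem.get(URI.create("hdfs://nn-host:8020"), conf);
         FSDataInputStream in = fs.open(new Path("/data/file"))) {
      byte[] buf = new byte[4096];
      int n = in.read(buf);  // served from a cached replica when one is live and not ignored
      System.out.println("read " + n + " bytes");
    }
  }
}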

TestDFSInputStream.java

@@ -17,11 +17,14 @@
  */
 package org.apache.hadoop.hdfs;
 
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_READ_USE_CACHE_PRIORITY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.io.File;
 import java.io.IOException;
@@ -33,7 +36,9 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.net.unix.TemporarySocketDirectory;
@@ -219,4 +224,62 @@ public void testNullCheckSumWhenDNRestarted()
       cluster.shutdown();
     }
   }
+
+  @Test
+  public void testReadWithPreferredCachingReplica() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(DFS_CLIENT_READ_USE_CACHE_PRIORITY, true);
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    cluster.waitActive();
+    DistributedFileSystem fs = null;
+    Path filePath = new Path("/testReadPreferredCachingReplica");
+    try {
+      fs = cluster.getFileSystem();
+      FSDataOutputStream out = fs.create(filePath, true, 4096, (short) 3, 512);
+      DFSInputStream dfsInputStream =
+          (DFSInputStream) fs.open(filePath).getWrappedStream();
+      LocatedBlock lb = mock(LocatedBlock.class);
+      when(lb.getCachedLocations()).thenReturn(new DatanodeInfo[0]);
+      DatanodeID nodeId = new DatanodeID("localhost", "localhost", "dn0", 1111,
+          1112, 1113, 1114);
+      DatanodeInfo dnInfo = new DatanodeDescriptor(nodeId);
+      when(lb.getCachedLocations()).thenReturn(new DatanodeInfo[] {dnInfo});
+      DatanodeInfo retDNInfo =
+          dfsInputStream.getBestNodeDNAddrPair(lb, null).info;
+      assertEquals(dnInfo, retDNInfo);
+    } finally {
+      fs.delete(filePath, true);
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testReadWithoutPreferredCachingReplica() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(DFS_CLIENT_READ_USE_CACHE_PRIORITY, false);
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    cluster.waitActive();
+    DistributedFileSystem fs = null;
+    Path filePath = new Path("/testReadWithoutPreferredCachingReplica");
+    try {
+      fs = cluster.getFileSystem();
+      FSDataOutputStream out = fs.create(filePath, true, 4096, (short) 3, 512);
+      DFSInputStream dfsInputStream =
+          (DFSInputStream) fs.open(filePath).getWrappedStream();
+      LocatedBlock lb = mock(LocatedBlock.class);
+      when(lb.getCachedLocations()).thenReturn(new DatanodeInfo[0]);
+      DatanodeID nodeId = new DatanodeID("localhost", "localhost", "dn0", 1111,
+          1112, 1113, 1114);
+      DatanodeInfo dnInfo = new DatanodeDescriptor(nodeId);
+      when(lb.getLocations()).thenReturn(new DatanodeInfo[] {dnInfo});
+      DatanodeInfo retDNInfo =
+          dfsInputStream.getBestNodeDNAddrPair(lb, null).info;
+      assertEquals(dnInfo, retDNInfo);
+    } finally {
+      fs.delete(filePath, true);
+      cluster.shutdown();
+    }
+  }
 }