diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index af9891a52f..0676cf91a7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -1054,10 +1054,21 @@ protected DNAddrPair getBestNodeDNAddrPair(LocatedBlock block,
     StorageType[] storageTypes = block.getStorageTypes();
     DatanodeInfo chosenNode = null;
     StorageType storageType = null;
-    if (nodes != null) {
+    if (dfsClient.getConf().isReadUseCachePriority()) {
+      DatanodeInfo[] cachedLocs = block.getCachedLocations();
+      if (cachedLocs != null) {
+        for (int i = 0; i < cachedLocs.length; i++) {
+          if (isValidNode(cachedLocs[i], ignoredNodes)) {
+            chosenNode = cachedLocs[i];
+            break;
+          }
+        }
+      }
+    }
+
+    if (chosenNode == null && nodes != null) {
       for (int i = 0; i < nodes.length; i++) {
-        if (!dfsClient.getDeadNodes(this).containsKey(nodes[i])
-            && (ignoredNodes == null || !ignoredNodes.contains(nodes[i]))) {
+        if (isValidNode(nodes[i], ignoredNodes)) {
           chosenNode = nodes[i];
           // Storage types are ordered to correspond with nodes, so use the same
           // index to get storage type.
@@ -1090,6 +1101,15 @@ protected void reportLostBlock(LocatedBlock lostBlock,
         ", ignoredNodes = " + ignoredNodes);
   }
 
+  private boolean isValidNode(DatanodeInfo node,
+      Collection<DatanodeInfo> ignoredNodes) {
+    if (!dfsClient.getDeadNodes(this).containsKey(node)
+        && (ignoredNodes == null || !ignoredNodes.contains(node))) {
+      return true;
+    }
+    return false;
+  }
+
   private static String getBestNodeDNAddrPairErrorString(
       DatanodeInfo nodes[], AbstractMap<DatanodeInfo,
       DatanodeInfo> deadNodes, Collection<DatanodeInfo> ignoredNodes) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index 407462c6e7..efc2766e9e 100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -108,6 +108,9 @@ public interface HdfsClientConfigKeys {
   String DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL =
       "dfs.client.use.legacy.blockreader.local";
   boolean DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT = false;
+  String DFS_CLIENT_READ_USE_CACHE_PRIORITY =
+      "dfs.client.read.use.cache.priority";
+  boolean DFS_CLIENT_READ_USE_CACHE_PRIORITY_DEFAULT = false;
   String DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY =
       "dfs.client.datanode-restart.timeout";
   long DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT = 30;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
index 07f0eee8dd..918fef7e50 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
@@ -60,6 +60,8 @@
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_MS;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_READ_USE_CACHE_PRIORITY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_READ_USE_CACHE_PRIORITY_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT;
@@ -150,6 +152,8 @@ public class DfsClientConf {
 
   private final boolean dataTransferTcpNoDelay;
 
+  private final boolean readUseCachePriority;
+
   private final boolean deadNodeDetectionEnabled;
 
   private final long leaseHardLimitPeriod;
@@ -260,6 +264,8 @@ public DfsClientConf(Configuration conf) {
     slowIoWarningThresholdMs = conf.getLong(
         DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY,
         DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT);
+    readUseCachePriority = conf.getBoolean(DFS_CLIENT_READ_USE_CACHE_PRIORITY,
+        DFS_CLIENT_READ_USE_CACHE_PRIORITY_DEFAULT);
 
     refreshReadBlockLocationsMS = conf.getLong(
         HdfsClientConfigKeys.DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_KEY,
@@ -630,6 +636,13 @@ public long getleaseHardLimitPeriod() {
     return leaseHardLimitPeriod;
   }
 
+  /**
+   * @return the readUseCachePriority
+   */
+  public boolean isReadUseCachePriority() {
+    return readUseCachePriority;
+  }
+
   /**
    * @return the replicaAccessorBuilderClasses
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
index 29f1b6da6b..c1b6a544b1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
@@ -268,6 +268,7 @@ public String toString() {
         + "; corrupt=" + corrupt
         + "; offset=" + offset
         + "; locs=" + Arrays.asList(locs)
+        + "; cachedLocs=" + Arrays.asList(cachedLocs)
         + "}";
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 913e47b75f..bf18f3a0be 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2978,6 +2978,15 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.client.read.use.cache.priority</name>
+  <value>false</value>
+  <description>
+    If true, the cached replica of the datanode is preferred;
+    otherwise the replica closest to the client is preferred.
+  </description>
+</property>
+
 <property>
   <name>dfs.block.local-path-access.user</name>
   <value></value>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java
index 0d322daebb..bdc342ad3d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java
@@ -17,11 +17,14 @@
  */
 package org.apache.hadoop.hdfs;
 
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_READ_USE_CACHE_PRIORITY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.io.File;
 import java.io.IOException;
@@ -33,7 +36,9 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.net.unix.TemporarySocketDirectory;
@@ -219,4 +224,62 @@ public void testNullCheckSumWhenDNRestarted()
       cluster.shutdown();
     }
   }
+
+  @Test
+  public void testReadWithPreferredCachingReplica() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(DFS_CLIENT_READ_USE_CACHE_PRIORITY, true);
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    cluster.waitActive();
+    DistributedFileSystem fs = null;
+    Path filePath = new Path("/testReadPreferredCachingReplica");
+    try {
+      fs = cluster.getFileSystem();
+      FSDataOutputStream out = fs.create(filePath, true, 4096, (short) 3, 512);
+      DFSInputStream dfsInputStream =
+          (DFSInputStream) fs.open(filePath).getWrappedStream();
+      LocatedBlock lb = mock(LocatedBlock.class);
+      when(lb.getCachedLocations()).thenReturn(new DatanodeInfo[0]);
+      DatanodeID nodeId = new DatanodeID("localhost", "localhost", "dn0", 1111,
+          1112, 1113, 1114);
+      DatanodeInfo dnInfo = new DatanodeDescriptor(nodeId);
+      when(lb.getCachedLocations()).thenReturn(new DatanodeInfo[] {dnInfo});
+      DatanodeInfo retDNInfo =
+          dfsInputStream.getBestNodeDNAddrPair(lb, null).info;
+      assertEquals(dnInfo, retDNInfo);
+    } finally {
+      fs.delete(filePath, true);
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testReadWithoutPreferredCachingReplica() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(DFS_CLIENT_READ_USE_CACHE_PRIORITY, false);
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    cluster.waitActive();
+    DistributedFileSystem fs = null;
+    Path filePath = new Path("/testReadWithoutPreferredCachingReplica");
+    try {
+      fs = cluster.getFileSystem();
+      FSDataOutputStream out = fs.create(filePath, true, 4096, (short) 3, 512);
+      DFSInputStream dfsInputStream =
+          (DFSInputStream) fs.open(filePath).getWrappedStream();
+      LocatedBlock lb = mock(LocatedBlock.class);
+      when(lb.getCachedLocations()).thenReturn(new DatanodeInfo[0]);
+      DatanodeID nodeId = new DatanodeID("localhost", "localhost", "dn0", 1111,
+          1112, 1113, 1114);
+      DatanodeInfo dnInfo = new DatanodeDescriptor(nodeId);
+      when(lb.getLocations()).thenReturn(new DatanodeInfo[] {dnInfo});
+      DatanodeInfo retDNInfo =
+          dfsInputStream.getBestNodeDNAddrPair(lb, null).info;
+      assertEquals(dnInfo, retDNInfo);
+    } finally {
+      fs.delete(filePath, true);
+      cluster.shutdown();
+    }
+  }
 }