diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java
index 3715e3cdfb..5ded4b4d25 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java
@@ -39,6 +39,7 @@ import java.net.ConnectException;
 import java.nio.channels.SocketChannel;
 import java.nio.channels.UnresolvedAddressException;
 import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
@@ -46,6 +47,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import javax.net.SocketFactory;
 
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.thirdparty.com.google.common.cache.Cache;
+import org.apache.hadoop.thirdparty.com.google.common.cache.CacheBuilder;
 import org.apache.commons.net.util.SubnetUtils;
 import org.apache.commons.net.util.SubnetUtils.SubnetInfo;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -178,11 +181,33 @@
    *                    include a port number
    * @param configName the name of the configuration from which
    *                   target was loaded. This is used in the
-   *                   exception message in the case that parsing fails.
+   *                   exception message in the case that parsing fails.
    */
   public static InetSocketAddress createSocketAddr(String target,
                                                    int defaultPort,
                                                    String configName) {
+    return createSocketAddr(target, defaultPort, configName, false);
+  }
+
+  /**
+   * Create an InetSocketAddress from the given target string and
+   * default port. If the string cannot be parsed correctly, the
+   * configName parameter is used as part of the
+   * exception message, allowing the user to better diagnose
+   * the misconfiguration.
+   *
+   * @param target a string of either "host" or "host:port"
+   * @param defaultPort the default port if target does not
+   *                    include a port number
+   * @param configName the name of the configuration from which
+   *                   target was loaded. This is used in the
+   *                   exception message in the case that parsing fails.
+   * @param useCacheIfPresent whether to use the URI cache when creating
+   *                          the URI
+   */
+  public static InetSocketAddress createSocketAddr(String target,
+                                                   int defaultPort,
+                                                   String configName,
+                                                   boolean useCacheIfPresent) {
     String helpText = "";
     if (configName != null) {
       helpText = " (configuration property '" + configName + "')";
@@ -192,15 +217,8 @@
     }
     target = target.trim();
-    boolean hasScheme = target.contains("://");
-    URI uri = null;
-    try {
-      uri = hasScheme ? URI.create(target) : URI.create("dummyscheme://"+target);
-    } catch (IllegalArgumentException e) {
-      throw new IllegalArgumentException(
-          "Does not contain a valid host:port authority: " + target + helpText
-      );
-    }
+    boolean hasScheme = target.contains("://");
+    URI uri = createURI(target, hasScheme, helpText, useCacheIfPresent);
 
     String host = uri.getHost();
     int port = uri.getPort();
@@ -208,10 +226,9 @@
       port = defaultPort;
     }
     String path = uri.getPath();
-
+
     if ((host == null) || (port < 0) ||
-        (!hasScheme && path != null && !path.isEmpty()))
-    {
+        (!hasScheme && path != null && !path.isEmpty())) {
       throw new IllegalArgumentException(
           "Does not contain a valid host:port authority: " + target + helpText
       );
@@ -219,6 +236,40 @@
     return createSocketAddrForHost(host, port);
   }
 
+  private static final long URI_CACHE_SIZE_DEFAULT = 1000;
+  private static final long URI_CACHE_EXPIRE_TIME_DEFAULT = 12;
+  private static final Cache<String, URI> URI_CACHE = CacheBuilder.newBuilder()
+      .maximumSize(URI_CACHE_SIZE_DEFAULT)
+      .expireAfterWrite(URI_CACHE_EXPIRE_TIME_DEFAULT, TimeUnit.HOURS)
+      .build();
+
+  private static URI createURI(String target,
+                               boolean hasScheme,
+                               String helpText,
+                               boolean useCacheIfPresent) {
+    URI uri;
+    if (useCacheIfPresent) {
+      uri = URI_CACHE.getIfPresent(target);
+      if (uri != null) {
+        return uri;
+      }
+    }
+
+    try {
+      uri = hasScheme ? URI.create(target) :
+          URI.create("dummyscheme://" + target);
+    } catch (IllegalArgumentException e) {
+      throw new IllegalArgumentException(
+          "Does not contain a valid host:port authority: " + target + helpText
+      );
+    }
+
+    if (useCacheIfPresent) {
+      URI_CACHE.put(target, uri);
+    }
+    return uri;
+  }
+
   /**
    * Create a socket address with the given host and port. The hostname
    * might be replaced with another host that was set via
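A note on the new API before the tests: the cache only memoizes the parse of the literal host:port string into a java.net.URI; hostname resolution still happens afterwards in createSocketAddrForHost(), so a cached entry cannot pin a stale IP address. A minimal usage sketch follows (the class name and the configName value "fs.defaultFS" are illustrative only, not part of this patch):

    import java.net.InetSocketAddress;
    import org.apache.hadoop.net.NetUtils;

    public class UriCacheUsageSketch {
      public static void main(String[] args) {
        // First call parses the string and stores the resulting URI in
        // NetUtils' internal Guava cache (at most 1000 entries, each
        // expiring 12 hours after it was written).
        InetSocketAddress first = NetUtils.createSocketAddr(
            "127.0.0.1:8020", -1, "fs.defaultFS", true);
        // A second call with the identical string is served from the
        // cache, skipping URI.create() and its parsing cost.
        InetSocketAddress second = NetUtils.createSocketAddr(
            "127.0.0.1:8020", -1, "fs.defaultFS", true);
        System.out.println(first.equals(second)); // true
      }
    }
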
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetUtils.java
index 76284932c4..cfffd85186 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetUtils.java
@@ -352,7 +352,7 @@
     assertEquals(1000, addr.getPort());
 
     try {
-      addr = NetUtils.createSocketAddr(
+      NetUtils.createSocketAddr(
           "127.0.0.1:blahblah", 1000, "myconfig");
       fail("Should have failed to parse bad port");
     } catch (IllegalArgumentException iae) {
@@ -360,6 +360,49 @@
       assertInException(iae, "myconfig");
     }
   }
 
+  @Test
+  public void testCreateSocketAddressWithURICache() throws Throwable {
+    InetSocketAddress addr = NetUtils.createSocketAddr(
+        "127.0.0.1:12345", 1000, "myconfig", true);
+    assertEquals("127.0.0.1", addr.getAddress().getHostAddress());
+    assertEquals(12345, addr.getPort());
+
+    addr = NetUtils.createSocketAddr(
+        "127.0.0.1:12345", 1000, "myconfig", true);
+    assertEquals("127.0.0.1", addr.getAddress().getHostAddress());
+    assertEquals(12345, addr.getPort());
+
+    // ----------------------------------------------------
+
+    addr = NetUtils.createSocketAddr(
+        "127.0.0.1", 1000, "myconfig", true);
+    assertEquals("127.0.0.1", addr.getAddress().getHostAddress());
+    assertEquals(1000, addr.getPort());
+
+    addr = NetUtils.createSocketAddr(
+        "127.0.0.1", 1000, "myconfig", true);
+    assertEquals("127.0.0.1", addr.getAddress().getHostAddress());
+    assertEquals(1000, addr.getPort());
+
+    // ----------------------------------------------------
+
+    try {
+      NetUtils.createSocketAddr(
+          "127.0.0.1:blahblah", 1000, "myconfig", true);
+      fail("Should have failed to parse bad port");
+    } catch (IllegalArgumentException iae) {
+      assertInException(iae, "myconfig");
+    }
+
+    try {
+      NetUtils.createSocketAddr(
+          "127.0.0.1:blahblah", 1000, "myconfig", true);
+      fail("Should have failed to parse bad port");
+    } catch (IllegalArgumentException iae) {
+      assertInException(iae, "myconfig");
+    }
+  }
+
   private void assertRemoteDetailsIncluded(IOException wrapped)
       throws Throwable {
     assertInException(wrapped, "desthost");
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestSecurityUtil.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestSecurityUtil.java
index 9dcc4ca3fc..b6b9684445 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestSecurityUtil.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestSecurityUtil.java
@@ -370,6 +370,16 @@
     verifyServiceAddr(staticHost, "255.255.255.255");
   }
 
+  @Test
+  public void testSocketAddrWithChangeIP() {
+    String staticHost = "host4";
+    NetUtils.addStaticResolution(staticHost, "255.255.255.255");
+    verifyServiceAddr(staticHost, "255.255.255.255");
+
+    NetUtils.addStaticResolution(staticHost, "255.255.255.254");
+    verifyServiceAddr(staticHost, "255.255.255.254");
+  }
+
   // this is a bizarre case, but it's if a test tries to remap an ip address
   @Test
   public void testSocketAddrWithIPToStaticIP() {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index a918101f0f..4e21133d7e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -1089,7 +1089,9 @@ public class DFSInputStream extends FSInputStream
     final String dnAddr =
         chosenNode.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname());
     DFSClient.LOG.debug("Connecting to datanode {}", dnAddr);
-    InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
+    boolean uriCacheEnabled = dfsClient.getConf().isUriCacheEnabled();
+    InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr,
+        -1, null, uriCacheEnabled);
     return new DNAddrPair(chosenNode, targetAddr, storageType, block);
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index aef31023d5..2e60c17daf 100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -410,6 +410,9 @@
     String PREFETCH_SIZE_KEY = PREFIX + "prefetch.size";
 
+    String URI_CACHE_KEY = PREFIX + "uri.cache.enabled";
+    boolean URI_CACHE_DEFAULT = false;
+
     interface ShortCircuit {
       String PREFIX = Read.PREFIX + "shortcircuit.";
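The new constants sit in the HdfsClientConfigKeys.Read interface, so the key inherits the "dfs.client.read." prefix (assuming, as in trunk, that Read.PREFIX is HdfsClientConfigKeys.PREFIX + "read." with HdfsClientConfigKeys.PREFIX = "dfs.client."; the prefix definitions are outside this hunk). A quick check that the resolved name lines up with the hdfs-default.xml entry added later in this patch:

    import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;

    public class UriCacheKeyCheck {
      public static void main(String[] args) {
        // Expected to print "dfs.client.read.uri.cache.enabled".
        System.out.println(HdfsClientConfigKeys.Read.URI_CACHE_KEY);
      }
    }
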
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
index 7105322cd5..facbe70589 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
@@ -129,6 +129,7 @@
   private final int blockWriteLocateFollowingMaxDelayMs;
   private final long defaultBlockSize;
   private final long prefetchSize;
+  private final boolean uriCacheEnabled;
   private final short defaultReplication;
   private final String taskId;
   private final FsPermission uMask;
@@ -211,24 +212,7 @@
         Write.MAX_PACKETS_IN_FLIGHT_KEY,
         Write.MAX_PACKETS_IN_FLIGHT_DEFAULT);
 
-    final boolean byteArrayManagerEnabled = conf.getBoolean(
-        Write.ByteArrayManager.ENABLED_KEY,
-        Write.ByteArrayManager.ENABLED_DEFAULT);
-    if (!byteArrayManagerEnabled) {
-      writeByteArrayManagerConf = null;
-    } else {
-      final int countThreshold = conf.getInt(
-          Write.ByteArrayManager.COUNT_THRESHOLD_KEY,
-          Write.ByteArrayManager.COUNT_THRESHOLD_DEFAULT);
-      final int countLimit = conf.getInt(
-          Write.ByteArrayManager.COUNT_LIMIT_KEY,
-          Write.ByteArrayManager.COUNT_LIMIT_DEFAULT);
-      final long countResetTimePeriodMs = conf.getLong(
-          Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_KEY,
-          Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_DEFAULT);
-      writeByteArrayManagerConf = new ByteArrayManager.Conf(
-          countThreshold, countLimit, countResetTimePeriodMs);
-    }
+    writeByteArrayManagerConf = loadWriteByteArrayManagerConf(conf);
 
     defaultBlockSize = conf.getLongBytes(DFS_BLOCK_SIZE_KEY,
         DFS_BLOCK_SIZE_DEFAULT);
@@ -240,6 +224,10 @@
         Write.EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT);
     prefetchSize = conf.getLong(Read.PREFETCH_SIZE_KEY,
         10 * defaultBlockSize);
+
+    uriCacheEnabled = conf.getBoolean(Read.URI_CACHE_KEY,
+        Read.URI_CACHE_DEFAULT);
+
     numCachedConnRetry = conf.getInt(DFS_CLIENT_CACHED_CONN_RETRY_KEY,
         DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT);
     numBlockWriteRetry = conf.getInt(
@@ -308,6 +296,27 @@
         "can't be more then 5.");
   }
 
+  private ByteArrayManager.Conf loadWriteByteArrayManagerConf(
+      Configuration conf) {
+    final boolean byteArrayManagerEnabled = conf.getBoolean(
+        Write.ByteArrayManager.ENABLED_KEY,
+        Write.ByteArrayManager.ENABLED_DEFAULT);
+    if (!byteArrayManagerEnabled) {
+      return null;
+    }
+    final int countThreshold = conf.getInt(
+        Write.ByteArrayManager.COUNT_THRESHOLD_KEY,
+        Write.ByteArrayManager.COUNT_THRESHOLD_DEFAULT);
+    final int countLimit = conf.getInt(
+        Write.ByteArrayManager.COUNT_LIMIT_KEY,
+        Write.ByteArrayManager.COUNT_LIMIT_DEFAULT);
+    final long countResetTimePeriodMs = conf.getLong(
+        Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_KEY,
+        Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_DEFAULT);
+    return new ByteArrayManager.Conf(
+        countThreshold, countLimit, countResetTimePeriodMs);
+  }
+
   @SuppressWarnings("unchecked")
   private List<Class<? extends ReplicaAccessorBuilder>>
       loadReplicaAccessorBuilderClasses(Configuration conf) {
@@ -555,6 +564,13 @@
     return prefetchSize;
   }
 
+  /**
+   * @return the uriCacheEnabled
+   */
+  public boolean isUriCacheEnabled() {
+    return uriCacheEnabled;
+  }
+
   /**
    * @return the defaultReplication
    */
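DfsClientConf reads the flag once at construction, so it is fixed for the lifetime of a client. A small sketch of how the getter is wired (assuming DfsClientConf's public Configuration-based constructor, which is how DFSClient builds it; the class name is illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
    import org.apache.hadoop.hdfs.client.impl.DfsClientConf;

    public class UriCacheConfSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setBoolean(HdfsClientConfigKeys.Read.URI_CACHE_KEY, true);
        DfsClientConf clientConf = new DfsClientConf(conf);
        // DFSInputStream consults this getter before building the
        // datanode socket address (see the DFSInputStream hunk above).
        System.out.println(clientConf.isUriCacheEnabled()); // true
      }
    }
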
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index ad62dd4e7f..d2cb3c7aa6 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4173,6 +4173,15 @@
 </property>
 
+<property>
+  <name>dfs.client.read.uri.cache.enabled</name>
+  <value>false</value>
+  <description>
+    If true, the DFS client will use a cache when creating URIs from
+    host:port strings, to reduce the frequency of URI object creation.
+  </description>
+</property>
+
 <property>
   <name>dfs.client.read.short.circuit.replica.stale.threshold.ms</name>
   <value>1800000</value>
   <description>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
index 5cfd8f6c97..c4a1e4aa2c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
@@ -44,6 +44,7 @@ public class TestHdfsConfigFields extends TestConfigurationFieldsBase {
         HdfsClientConfigKeys.Failover.class,
         HdfsClientConfigKeys.StripedRead.class, DFSConfigKeys.class,
         HdfsClientConfigKeys.BlockWrite.class,
+        HdfsClientConfigKeys.Read.class,
         HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.class };
 
     // Set error modes
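End to end, enabling the feature is a single client-side setting of the property documented above. A sketch under the assumption of a reachable cluster (the "hdfs://namenode:8020" URI and the class name are placeholders):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class EnableUriCacheSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Same key as the hdfs-default.xml entry above; the default is
        // false, so existing deployments keep the old behavior.
        conf.setBoolean("dfs.client.read.uri.cache.enabled", true);
        try (FileSystem fs =
            FileSystem.get(URI.create("hdfs://namenode:8020"), conf)) {
          // Reads through this client now reuse cached URIs when
          // DFSInputStream builds datanode socket addresses.
          System.out.println(fs.getUri());
        }
      }
    }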