HADOOP-17222. Create socket address leveraging URI cache (#2241)
Contributed by fanrui. Signed-off-by: Mingliang Liu <liuml07@apache.org> Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
This commit is contained in:
parent
12a316cdf9
commit
56ebabd426
@ -39,12 +39,16 @@ import java.net.ConnectException;
|
|||||||
import java.nio.channels.SocketChannel;
|
import java.nio.channels.SocketChannel;
|
||||||
import java.nio.channels.UnresolvedAddressException;
|
import java.nio.channels.UnresolvedAddressException;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
import javax.net.SocketFactory;
|
import javax.net.SocketFactory;
|
||||||
|
|
||||||
|
import com.google.common.cache.Cache;
|
||||||
|
import com.google.common.cache.CacheBuilder;
|
||||||
|
|
||||||
import org.apache.commons.net.util.SubnetUtils;
|
import org.apache.commons.net.util.SubnetUtils;
|
||||||
import org.apache.commons.net.util.SubnetUtils.SubnetInfo;
|
import org.apache.commons.net.util.SubnetUtils.SubnetInfo;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
@ -182,6 +186,28 @@ public class NetUtils {
|
|||||||
public static InetSocketAddress createSocketAddr(String target,
|
public static InetSocketAddress createSocketAddr(String target,
|
||||||
int defaultPort,
|
int defaultPort,
|
||||||
String configName) {
|
String configName) {
|
||||||
|
return createSocketAddr(target, defaultPort, configName, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create an InetSocketAddress from the given target string and
|
||||||
|
* default port. If the string cannot be parsed correctly, the
|
||||||
|
* <code>configName</code> parameter is used as part of the
|
||||||
|
* exception message, allowing the user to better diagnose
|
||||||
|
* the misconfiguration.
|
||||||
|
*
|
||||||
|
* @param target a string of either "host" or "host:port"
|
||||||
|
* @param defaultPort the default port if <code>target</code> does not
|
||||||
|
* include a port number
|
||||||
|
* @param configName the name of the configuration from which
|
||||||
|
* <code>target</code> was loaded. This is used in the
|
||||||
|
* exception message in the case that parsing fails.
|
||||||
|
* @param useCacheIfPresent Whether use cache when create URI
|
||||||
|
*/
|
||||||
|
public static InetSocketAddress createSocketAddr(String target,
|
||||||
|
int defaultPort,
|
||||||
|
String configName,
|
||||||
|
boolean useCacheIfPresent) {
|
||||||
String helpText = "";
|
String helpText = "";
|
||||||
if (configName != null) {
|
if (configName != null) {
|
||||||
helpText = " (configuration property '" + configName + "')";
|
helpText = " (configuration property '" + configName + "')";
|
||||||
@ -192,14 +218,7 @@ public class NetUtils {
|
|||||||
}
|
}
|
||||||
target = target.trim();
|
target = target.trim();
|
||||||
boolean hasScheme = target.contains("://");
|
boolean hasScheme = target.contains("://");
|
||||||
URI uri = null;
|
URI uri = createURI(target, hasScheme, helpText, useCacheIfPresent);
|
||||||
try {
|
|
||||||
uri = hasScheme ? URI.create(target) : URI.create("dummyscheme://"+target);
|
|
||||||
} catch (IllegalArgumentException e) {
|
|
||||||
throw new IllegalArgumentException(
|
|
||||||
"Does not contain a valid host:port authority: " + target + helpText
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
String host = uri.getHost();
|
String host = uri.getHost();
|
||||||
int port = uri.getPort();
|
int port = uri.getPort();
|
||||||
@ -209,8 +228,7 @@ public class NetUtils {
|
|||||||
String path = uri.getPath();
|
String path = uri.getPath();
|
||||||
|
|
||||||
if ((host == null) || (port < 0) ||
|
if ((host == null) || (port < 0) ||
|
||||||
(!hasScheme && path != null && !path.isEmpty()))
|
(!hasScheme && path != null && !path.isEmpty())) {
|
||||||
{
|
|
||||||
throw new IllegalArgumentException(
|
throw new IllegalArgumentException(
|
||||||
"Does not contain a valid host:port authority: " + target + helpText
|
"Does not contain a valid host:port authority: " + target + helpText
|
||||||
);
|
);
|
||||||
@ -218,6 +236,40 @@ public class NetUtils {
|
|||||||
return createSocketAddrForHost(host, port);
|
return createSocketAddrForHost(host, port);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static final long URI_CACHE_SIZE_DEFAULT = 1000;
|
||||||
|
private static final long URI_CACHE_EXPIRE_TIME_DEFAULT = 12;
|
||||||
|
private static final Cache<String, URI> URI_CACHE = CacheBuilder.newBuilder()
|
||||||
|
.maximumSize(URI_CACHE_SIZE_DEFAULT)
|
||||||
|
.expireAfterWrite(URI_CACHE_EXPIRE_TIME_DEFAULT, TimeUnit.HOURS)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
private static URI createURI(String target,
|
||||||
|
boolean hasScheme,
|
||||||
|
String helpText,
|
||||||
|
boolean useCacheIfPresent) {
|
||||||
|
URI uri;
|
||||||
|
if (useCacheIfPresent) {
|
||||||
|
uri = URI_CACHE.getIfPresent(target);
|
||||||
|
if (uri != null) {
|
||||||
|
return uri;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
uri = hasScheme ? URI.create(target) :
|
||||||
|
URI.create("dummyscheme://" + target);
|
||||||
|
} catch (IllegalArgumentException e) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"Does not contain a valid host:port authority: " + target + helpText
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (useCacheIfPresent) {
|
||||||
|
URI_CACHE.put(target, uri);
|
||||||
|
}
|
||||||
|
return uri;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a socket address with the given host and port. The hostname
|
* Create a socket address with the given host and port. The hostname
|
||||||
* might be replaced with another host that was set via
|
* might be replaced with another host that was set via
|
||||||
|
@ -352,7 +352,7 @@ public class TestNetUtils {
|
|||||||
assertEquals(1000, addr.getPort());
|
assertEquals(1000, addr.getPort());
|
||||||
|
|
||||||
try {
|
try {
|
||||||
addr = NetUtils.createSocketAddr(
|
NetUtils.createSocketAddr(
|
||||||
"127.0.0.1:blahblah", 1000, "myconfig");
|
"127.0.0.1:blahblah", 1000, "myconfig");
|
||||||
fail("Should have failed to parse bad port");
|
fail("Should have failed to parse bad port");
|
||||||
} catch (IllegalArgumentException iae) {
|
} catch (IllegalArgumentException iae) {
|
||||||
@ -360,6 +360,49 @@ public class TestNetUtils {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCreateSocketAddressWithURICache() throws Throwable {
|
||||||
|
InetSocketAddress addr = NetUtils.createSocketAddr(
|
||||||
|
"127.0.0.1:12345", 1000, "myconfig", true);
|
||||||
|
assertEquals("127.0.0.1", addr.getAddress().getHostAddress());
|
||||||
|
assertEquals(12345, addr.getPort());
|
||||||
|
|
||||||
|
addr = NetUtils.createSocketAddr(
|
||||||
|
"127.0.0.1:12345", 1000, "myconfig", true);
|
||||||
|
assertEquals("127.0.0.1", addr.getAddress().getHostAddress());
|
||||||
|
assertEquals(12345, addr.getPort());
|
||||||
|
|
||||||
|
// ----------------------------------------------------
|
||||||
|
|
||||||
|
addr = NetUtils.createSocketAddr(
|
||||||
|
"127.0.0.1", 1000, "myconfig", true);
|
||||||
|
assertEquals("127.0.0.1", addr.getAddress().getHostAddress());
|
||||||
|
assertEquals(1000, addr.getPort());
|
||||||
|
|
||||||
|
addr = NetUtils.createSocketAddr(
|
||||||
|
"127.0.0.1", 1000, "myconfig", true);
|
||||||
|
assertEquals("127.0.0.1", addr.getAddress().getHostAddress());
|
||||||
|
assertEquals(1000, addr.getPort());
|
||||||
|
|
||||||
|
// ----------------------------------------------------
|
||||||
|
|
||||||
|
try {
|
||||||
|
NetUtils.createSocketAddr(
|
||||||
|
"127.0.0.1:blahblah", 1000, "myconfig", true);
|
||||||
|
fail("Should have failed to parse bad port");
|
||||||
|
} catch (IllegalArgumentException iae) {
|
||||||
|
assertInException(iae, "myconfig");
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
NetUtils.createSocketAddr(
|
||||||
|
"127.0.0.1:blahblah", 1000, "myconfig", true);
|
||||||
|
fail("Should have failed to parse bad port");
|
||||||
|
} catch (IllegalArgumentException iae) {
|
||||||
|
assertInException(iae, "myconfig");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void assertRemoteDetailsIncluded(IOException wrapped)
|
private void assertRemoteDetailsIncluded(IOException wrapped)
|
||||||
throws Throwable {
|
throws Throwable {
|
||||||
assertInException(wrapped, "desthost");
|
assertInException(wrapped, "desthost");
|
||||||
|
@ -370,6 +370,16 @@ public class TestSecurityUtil {
|
|||||||
verifyServiceAddr(staticHost, "255.255.255.255");
|
verifyServiceAddr(staticHost, "255.255.255.255");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSocketAddrWithChangeIP() {
|
||||||
|
String staticHost = "host4";
|
||||||
|
NetUtils.addStaticResolution(staticHost, "255.255.255.255");
|
||||||
|
verifyServiceAddr(staticHost, "255.255.255.255");
|
||||||
|
|
||||||
|
NetUtils.addStaticResolution(staticHost, "255.255.255.254");
|
||||||
|
verifyServiceAddr(staticHost, "255.255.255.254");
|
||||||
|
}
|
||||||
|
|
||||||
// this is a bizarre case, but it's if a test tries to remap an ip address
|
// this is a bizarre case, but it's if a test tries to remap an ip address
|
||||||
@Test
|
@Test
|
||||||
public void testSocketAddrWithIPToStaticIP() {
|
public void testSocketAddrWithIPToStaticIP() {
|
||||||
|
@ -1086,7 +1086,9 @@ public class DFSInputStream extends FSInputStream
|
|||||||
final String dnAddr =
|
final String dnAddr =
|
||||||
chosenNode.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname());
|
chosenNode.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname());
|
||||||
DFSClient.LOG.debug("Connecting to datanode {}", dnAddr);
|
DFSClient.LOG.debug("Connecting to datanode {}", dnAddr);
|
||||||
InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
|
boolean uriCacheEnabled = dfsClient.getConf().isUriCacheEnabled();
|
||||||
|
InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr,
|
||||||
|
-1, null, uriCacheEnabled);
|
||||||
return new DNAddrPair(chosenNode, targetAddr, storageType, block);
|
return new DNAddrPair(chosenNode, targetAddr, storageType, block);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -417,6 +417,9 @@ public interface HdfsClientConfigKeys {
|
|||||||
|
|
||||||
String PREFETCH_SIZE_KEY = PREFIX + "prefetch.size";
|
String PREFETCH_SIZE_KEY = PREFIX + "prefetch.size";
|
||||||
|
|
||||||
|
String URI_CACHE_KEY = PREFIX + "uri.cache.enabled";
|
||||||
|
boolean URI_CACHE_DEFAULT = false;
|
||||||
|
|
||||||
interface ShortCircuit {
|
interface ShortCircuit {
|
||||||
String PREFIX = Read.PREFIX + "shortcircuit.";
|
String PREFIX = Read.PREFIX + "shortcircuit.";
|
||||||
|
|
||||||
|
@ -129,6 +129,7 @@ public class DfsClientConf {
|
|||||||
private final int blockWriteLocateFollowingMaxDelayMs;
|
private final int blockWriteLocateFollowingMaxDelayMs;
|
||||||
private final long defaultBlockSize;
|
private final long defaultBlockSize;
|
||||||
private final long prefetchSize;
|
private final long prefetchSize;
|
||||||
|
private final boolean uriCacheEnabled;
|
||||||
private final short defaultReplication;
|
private final short defaultReplication;
|
||||||
private final String taskId;
|
private final String taskId;
|
||||||
private final FsPermission uMask;
|
private final FsPermission uMask;
|
||||||
@ -211,24 +212,7 @@ public class DfsClientConf {
|
|||||||
Write.MAX_PACKETS_IN_FLIGHT_KEY,
|
Write.MAX_PACKETS_IN_FLIGHT_KEY,
|
||||||
Write.MAX_PACKETS_IN_FLIGHT_DEFAULT);
|
Write.MAX_PACKETS_IN_FLIGHT_DEFAULT);
|
||||||
|
|
||||||
final boolean byteArrayManagerEnabled = conf.getBoolean(
|
writeByteArrayManagerConf = loadWriteByteArrayManagerConf(conf);
|
||||||
Write.ByteArrayManager.ENABLED_KEY,
|
|
||||||
Write.ByteArrayManager.ENABLED_DEFAULT);
|
|
||||||
if (!byteArrayManagerEnabled) {
|
|
||||||
writeByteArrayManagerConf = null;
|
|
||||||
} else {
|
|
||||||
final int countThreshold = conf.getInt(
|
|
||||||
Write.ByteArrayManager.COUNT_THRESHOLD_KEY,
|
|
||||||
Write.ByteArrayManager.COUNT_THRESHOLD_DEFAULT);
|
|
||||||
final int countLimit = conf.getInt(
|
|
||||||
Write.ByteArrayManager.COUNT_LIMIT_KEY,
|
|
||||||
Write.ByteArrayManager.COUNT_LIMIT_DEFAULT);
|
|
||||||
final long countResetTimePeriodMs = conf.getLong(
|
|
||||||
Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_KEY,
|
|
||||||
Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_DEFAULT);
|
|
||||||
writeByteArrayManagerConf = new ByteArrayManager.Conf(
|
|
||||||
countThreshold, countLimit, countResetTimePeriodMs);
|
|
||||||
}
|
|
||||||
|
|
||||||
defaultBlockSize = conf.getLongBytes(DFS_BLOCK_SIZE_KEY,
|
defaultBlockSize = conf.getLongBytes(DFS_BLOCK_SIZE_KEY,
|
||||||
DFS_BLOCK_SIZE_DEFAULT);
|
DFS_BLOCK_SIZE_DEFAULT);
|
||||||
@ -240,6 +224,10 @@ public class DfsClientConf {
|
|||||||
Write.EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT);
|
Write.EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT);
|
||||||
prefetchSize = conf.getLong(Read.PREFETCH_SIZE_KEY,
|
prefetchSize = conf.getLong(Read.PREFETCH_SIZE_KEY,
|
||||||
10 * defaultBlockSize);
|
10 * defaultBlockSize);
|
||||||
|
|
||||||
|
uriCacheEnabled = conf.getBoolean(Read.URI_CACHE_KEY,
|
||||||
|
Read.URI_CACHE_DEFAULT);
|
||||||
|
|
||||||
numCachedConnRetry = conf.getInt(DFS_CLIENT_CACHED_CONN_RETRY_KEY,
|
numCachedConnRetry = conf.getInt(DFS_CLIENT_CACHED_CONN_RETRY_KEY,
|
||||||
DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT);
|
DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT);
|
||||||
numBlockWriteRetry = conf.getInt(
|
numBlockWriteRetry = conf.getInt(
|
||||||
@ -308,6 +296,27 @@ public class DfsClientConf {
|
|||||||
"can't be more then 5.");
|
"can't be more then 5.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private ByteArrayManager.Conf loadWriteByteArrayManagerConf(
|
||||||
|
Configuration conf) {
|
||||||
|
final boolean byteArrayManagerEnabled = conf.getBoolean(
|
||||||
|
Write.ByteArrayManager.ENABLED_KEY,
|
||||||
|
Write.ByteArrayManager.ENABLED_DEFAULT);
|
||||||
|
if (!byteArrayManagerEnabled) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
final int countThreshold = conf.getInt(
|
||||||
|
Write.ByteArrayManager.COUNT_THRESHOLD_KEY,
|
||||||
|
Write.ByteArrayManager.COUNT_THRESHOLD_DEFAULT);
|
||||||
|
final int countLimit = conf.getInt(
|
||||||
|
Write.ByteArrayManager.COUNT_LIMIT_KEY,
|
||||||
|
Write.ByteArrayManager.COUNT_LIMIT_DEFAULT);
|
||||||
|
final long countResetTimePeriodMs = conf.getLong(
|
||||||
|
Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_KEY,
|
||||||
|
Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_DEFAULT);
|
||||||
|
return new ByteArrayManager.Conf(
|
||||||
|
countThreshold, countLimit, countResetTimePeriodMs);
|
||||||
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
private List<Class<? extends ReplicaAccessorBuilder>>
|
private List<Class<? extends ReplicaAccessorBuilder>>
|
||||||
loadReplicaAccessorBuilderClasses(Configuration conf) {
|
loadReplicaAccessorBuilderClasses(Configuration conf) {
|
||||||
@ -555,6 +564,13 @@ public class DfsClientConf {
|
|||||||
return prefetchSize;
|
return prefetchSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the uriCacheEnable
|
||||||
|
*/
|
||||||
|
public boolean isUriCacheEnabled() {
|
||||||
|
return uriCacheEnabled;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return the defaultReplication
|
* @return the defaultReplication
|
||||||
*/
|
*/
|
||||||
|
@ -4185,6 +4185,15 @@
|
|||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dfs.client.read.uri.cache.enabled</name>
|
||||||
|
<value>false</value>
|
||||||
|
<description>
|
||||||
|
If true, dfs client will use cache when creating URI based on host:port
|
||||||
|
to reduce the frequency of URI object creation.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>dfs.client.read.short.circuit.replica.stale.threshold.ms</name>
|
<name>dfs.client.read.short.circuit.replica.stale.threshold.ms</name>
|
||||||
<value>1800000</value>
|
<value>1800000</value>
|
||||||
|
@ -44,6 +44,7 @@ public class TestHdfsConfigFields extends TestConfigurationFieldsBase {
|
|||||||
HdfsClientConfigKeys.Failover.class,
|
HdfsClientConfigKeys.Failover.class,
|
||||||
HdfsClientConfigKeys.StripedRead.class, DFSConfigKeys.class,
|
HdfsClientConfigKeys.StripedRead.class, DFSConfigKeys.class,
|
||||||
HdfsClientConfigKeys.BlockWrite.class,
|
HdfsClientConfigKeys.BlockWrite.class,
|
||||||
|
HdfsClientConfigKeys.Read.class,
|
||||||
HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.class };
|
HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.class };
|
||||||
|
|
||||||
// Set error modes
|
// Set error modes
|
||||||
|
Loading…
x
Reference in New Issue
Block a user