HDFS-3148. The client should be able to use multiple local interfaces for data transfer. Contributed by Eli Collins
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1308617 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
f44ca78179
commit
4f15b9dfed
@ -179,6 +179,9 @@ Release 2.0.0 - UNRELEASED
|
|||||||
|
|
||||||
HDFS-3167. CLI-based driver for MiniDFSCluster. (Henry Robinson via atm)
|
HDFS-3167. CLI-based driver for MiniDFSCluster. (Henry Robinson via atm)
|
||||||
|
|
||||||
|
HDFS-3148. The client should be able to use multiple local interfaces
|
||||||
|
for data transfer. (eli)
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
HDFS-2018. Move all journal stream management code into one place.
|
HDFS-2018. Move all journal stream management code into one place.
|
||||||
|
@ -57,12 +57,16 @@ import java.io.OutputStream;
|
|||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.Socket;
|
import java.net.Socket;
|
||||||
|
import java.net.SocketAddress;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
import java.net.UnknownHostException;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Random;
|
||||||
|
|
||||||
import javax.net.SocketFactory;
|
import javax.net.SocketFactory;
|
||||||
|
|
||||||
@ -123,6 +127,7 @@ import org.apache.hadoop.io.Text;
|
|||||||
import org.apache.hadoop.ipc.Client;
|
import org.apache.hadoop.ipc.Client;
|
||||||
import org.apache.hadoop.ipc.RPC;
|
import org.apache.hadoop.ipc.RPC;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
|
import org.apache.hadoop.net.DNS;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.security.AccessControlException;
|
import org.apache.hadoop.security.AccessControlException;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
@ -132,7 +137,9 @@ import org.apache.hadoop.security.token.TokenRenewer;
|
|||||||
import org.apache.hadoop.util.DataChecksum;
|
import org.apache.hadoop.util.DataChecksum;
|
||||||
import org.apache.hadoop.util.Progressable;
|
import org.apache.hadoop.util.Progressable;
|
||||||
|
|
||||||
|
import com.google.common.base.Joiner;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
|
import com.google.common.net.InetAddresses;
|
||||||
|
|
||||||
/********************************************************
|
/********************************************************
|
||||||
* DFSClient can connect to a Hadoop Filesystem and
|
* DFSClient can connect to a Hadoop Filesystem and
|
||||||
@ -168,6 +175,8 @@ public class DFSClient implements java.io.Closeable {
|
|||||||
final LeaseRenewer leaserenewer;
|
final LeaseRenewer leaserenewer;
|
||||||
final SocketCache socketCache;
|
final SocketCache socketCache;
|
||||||
final Conf dfsClientConf;
|
final Conf dfsClientConf;
|
||||||
|
private Random r = new Random();
|
||||||
|
private SocketAddress[] localInterfaceAddrs;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* DFSClient configuration
|
* DFSClient configuration
|
||||||
@ -361,6 +370,68 @@ public class DFSClient implements java.io.Closeable {
|
|||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Short circuit read is " + shortCircuitLocalReads);
|
LOG.debug("Short circuit read is " + shortCircuitLocalReads);
|
||||||
}
|
}
|
||||||
|
String localInterfaces[] =
|
||||||
|
conf.getTrimmedStrings(DFSConfigKeys.DFS_CLIENT_LOCAL_INTERFACES);
|
||||||
|
localInterfaceAddrs = getLocalInterfaceAddrs(localInterfaces);
|
||||||
|
if (LOG.isDebugEnabled() && 0 != localInterfaces.length) {
|
||||||
|
LOG.debug("Using local interfaces [" +
|
||||||
|
Joiner.on(',').join(localInterfaces)+ "] with addresses [" +
|
||||||
|
Joiner.on(',').join(localInterfaceAddrs) + "]");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the socket addresses to use with each configured
|
||||||
|
* local interface. Local interfaces may be specified by IP
|
||||||
|
* address, IP address range using CIDR notation, interface
|
||||||
|
* name (e.g. eth0) or sub-interface name (e.g. eth0:0).
|
||||||
|
* The socket addresses consist of the IPs for the interfaces
|
||||||
|
* and the ephemeral port (port 0). If an IP, IP range, or
|
||||||
|
* interface name matches an interface with sub-interfaces
|
||||||
|
* only the IP of the interface is used. Sub-interfaces can
|
||||||
|
* be used by specifying them explicitly (by IP or name).
|
||||||
|
*
|
||||||
|
* @return SocketAddresses for the configured local interfaces,
|
||||||
|
* or an empty array if none are configured
|
||||||
|
* @throws UnknownHostException if a given interface name is invalid
|
||||||
|
*/
|
||||||
|
private static SocketAddress[] getLocalInterfaceAddrs(
|
||||||
|
String interfaceNames[]) throws UnknownHostException {
|
||||||
|
List<SocketAddress> localAddrs = new ArrayList<SocketAddress>();
|
||||||
|
for (String interfaceName : interfaceNames) {
|
||||||
|
if (InetAddresses.isInetAddress(interfaceName)) {
|
||||||
|
localAddrs.add(new InetSocketAddress(interfaceName, 0));
|
||||||
|
} else if (NetUtils.isValidSubnet(interfaceName)) {
|
||||||
|
for (InetAddress addr : NetUtils.getIPs(interfaceName, false)) {
|
||||||
|
localAddrs.add(new InetSocketAddress(addr, 0));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
for (String ip : DNS.getIPs(interfaceName, false)) {
|
||||||
|
localAddrs.add(new InetSocketAddress(ip, 0));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return localAddrs.toArray(new SocketAddress[localAddrs.size()]);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Select one of the configured local interfaces at random. We use a random
|
||||||
|
* interface because other policies like round-robin are less effective
|
||||||
|
* given that we cache connections to datanodes.
|
||||||
|
*
|
||||||
|
* @return one of the local interface addresses at random, or null if no
|
||||||
|
* local interfaces are configured
|
||||||
|
*/
|
||||||
|
SocketAddress getRandomLocalInterfaceAddr() {
|
||||||
|
if (localInterfaceAddrs.length == 0) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
final int idx = r.nextInt(localInterfaceAddrs.length);
|
||||||
|
final SocketAddress addr = localInterfaceAddrs[idx];
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Using local interface " + addr);
|
||||||
|
}
|
||||||
|
return addr;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -197,6 +197,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||||||
public static final String DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY = "dfs.namenode.checkpoint.edits.dir";
|
public static final String DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY = "dfs.namenode.checkpoint.edits.dir";
|
||||||
public static final String DFS_HOSTS = "dfs.hosts";
|
public static final String DFS_HOSTS = "dfs.hosts";
|
||||||
public static final String DFS_HOSTS_EXCLUDE = "dfs.hosts.exclude";
|
public static final String DFS_HOSTS_EXCLUDE = "dfs.hosts.exclude";
|
||||||
|
public static final String DFS_CLIENT_LOCAL_INTERFACES = "dfs.client.local.interfaces";
|
||||||
|
|
||||||
// Much code in hdfs is not yet updated to use these keys.
|
// Much code in hdfs is not yet updated to use these keys.
|
||||||
public static final String DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY = "dfs.client.block.write.locateFollowingBlock.retries";
|
public static final String DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY = "dfs.client.block.write.locateFollowingBlock.retries";
|
||||||
|
@ -850,7 +850,9 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
|
|||||||
// disaster.
|
// disaster.
|
||||||
sock.setTcpNoDelay(true);
|
sock.setTcpNoDelay(true);
|
||||||
|
|
||||||
NetUtils.connect(sock, dnAddr, dfsClient.getConf().socketTimeout);
|
NetUtils.connect(sock, dnAddr,
|
||||||
|
dfsClient.getRandomLocalInterfaceAddr(),
|
||||||
|
dfsClient.getConf().socketTimeout);
|
||||||
sock.setSoTimeout(dfsClient.getConf().socketTimeout);
|
sock.setSoTimeout(dfsClient.getConf().socketTimeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1171,7 +1171,7 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
|
|||||||
NetUtils.createSocketAddr(first.getXferAddr());
|
NetUtils.createSocketAddr(first.getXferAddr());
|
||||||
final Socket sock = client.socketFactory.createSocket();
|
final Socket sock = client.socketFactory.createSocket();
|
||||||
final int timeout = client.getDatanodeReadTimeout(length);
|
final int timeout = client.getDatanodeReadTimeout(length);
|
||||||
NetUtils.connect(sock, isa, timeout);
|
NetUtils.connect(sock, isa, client.getRandomLocalInterfaceAddr(), timeout);
|
||||||
sock.setSoTimeout(timeout);
|
sock.setSoTimeout(timeout);
|
||||||
sock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
|
sock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
|
||||||
if(DFSClient.LOG.isDebugEnabled()) {
|
if(DFSClient.LOG.isDebugEnabled()) {
|
||||||
|
@ -844,4 +844,18 @@
|
|||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dfs.client.local.interfaces</name>
|
||||||
|
<value></value>
|
||||||
|
<description>A comma separated list of network interface names to use
|
||||||
|
for data transfer between the client and datanodes. When creating
|
||||||
|
a connection to read from or write to a datanode, the client
|
||||||
|
chooses one of the specified interfaces at random and binds its
|
||||||
|
socket to the IP of that interface. Individual names may be
|
||||||
|
specified as either an interface name (eg "eth0"), a subinterface
|
||||||
|
name (eg "eth0:0"), or an IP address (which may be specified using
|
||||||
|
CIDR notation to match a range of IPs).
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
</configuration>
|
</configuration>
|
||||||
|
@ -38,6 +38,7 @@ import java.io.FileNotFoundException;
|
|||||||
import java.io.FileReader;
|
import java.io.FileReader;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
import java.net.UnknownHostException;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
@ -67,6 +68,7 @@ import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
|
|||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.log4j.Level;
|
import org.apache.log4j.Level;
|
||||||
|
|
||||||
|
import static org.junit.Assume.assumeTrue;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class tests various cases during file creation.
|
* This class tests various cases during file creation.
|
||||||
@ -140,11 +142,34 @@ public class TestFileCreation extends junit.framework.TestCase {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testFileCreation() throws IOException {
|
||||||
|
checkFileCreation(null);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Same test but the client should bind to a local interface */
|
||||||
|
public void testFileCreationSetLocalInterface() throws IOException {
|
||||||
|
assumeTrue(System.getProperty("os.name").startsWith("Linux"));
|
||||||
|
|
||||||
|
// The mini cluster listens on the loopback so we can use it here
|
||||||
|
checkFileCreation("lo");
|
||||||
|
|
||||||
|
try {
|
||||||
|
checkFileCreation("bogus-interface");
|
||||||
|
fail("Able to specify a bogus interface");
|
||||||
|
} catch (UnknownHostException e) {
|
||||||
|
assertEquals("No such interface bogus-interface", e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test if file creation and disk space consumption works right
|
* Test if file creation and disk space consumption works right
|
||||||
|
* @param netIf the local interface, if any, clients should use to access DNs
|
||||||
*/
|
*/
|
||||||
public void testFileCreation() throws IOException {
|
public void checkFileCreation(String netIf) throws IOException {
|
||||||
Configuration conf = new HdfsConfiguration();
|
Configuration conf = new HdfsConfiguration();
|
||||||
|
if (netIf != null) {
|
||||||
|
conf.set(DFSConfigKeys.DFS_CLIENT_LOCAL_INTERFACES, netIf);
|
||||||
|
}
|
||||||
if (simulatedStorage) {
|
if (simulatedStorage) {
|
||||||
SimulatedFSDataset.setFactory(conf);
|
SimulatedFSDataset.setFactory(conf);
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user