diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 06ee2bcaf5..61a8b2616f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -179,6 +179,9 @@ Release 2.0.0 - UNRELEASED HDFS-3167. CLI-based driver for MiniDFSCluster. (Henry Robinson via atm) + HDFS-3148. The client should be able to use multiple local interfaces + for data transfer. (eli) + IMPROVEMENTS HDFS-2018. Move all journal stream management code into one place. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 37575e55fa..70285262bf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -57,12 +57,16 @@ import java.io.OutputStream; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; +import java.net.SocketAddress; import java.net.URI; +import java.net.UnknownHostException; +import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Random; import javax.net.SocketFactory; @@ -123,6 +127,7 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.Client; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.net.DNS; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; @@ -132,7 +137,9 @@ import org.apache.hadoop.security.token.TokenRenewer; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.Progressable; +import com.google.common.base.Joiner; import com.google.common.base.Preconditions; +import com.google.common.net.InetAddresses; /******************************************************** * DFSClient can connect to a Hadoop Filesystem and @@ -168,6 +175,8 @@ public class DFSClient implements java.io.Closeable { final LeaseRenewer leaserenewer; final SocketCache socketCache; final Conf dfsClientConf; + private Random r = new Random(); + private SocketAddress[] localInterfaceAddrs; /** * DFSClient configuration @@ -361,6 +370,68 @@ public class DFSClient implements java.io.Closeable { if (LOG.isDebugEnabled()) { LOG.debug("Short circuit read is " + shortCircuitLocalReads); } + String localInterfaces[] = + conf.getTrimmedStrings(DFSConfigKeys.DFS_CLIENT_LOCAL_INTERFACES); + localInterfaceAddrs = getLocalInterfaceAddrs(localInterfaces); + if (LOG.isDebugEnabled() && 0 != localInterfaces.length) { + LOG.debug("Using local interfaces [" + + Joiner.on(',').join(localInterfaces)+ "] with addresses [" + + Joiner.on(',').join(localInterfaceAddrs) + "]"); + } + } + + /** + * Return the socket addresses to use with each configured + * local interface. Local interfaces may be specified by IP + * address, IP address range using CIDR notation, interface + * name (e.g. eth0) or sub-interface name (e.g. eth0:0). + * The socket addresses consist of the IPs for the interfaces + * and the ephemeral port (port 0). If an IP, IP range, or + * interface name matches an interface with sub-interfaces + * only the IP of the interface is used. Sub-interfaces can + * be used by specifying them explicitly (by IP or name). + * + * @return SocketAddresses for the configured local interfaces, + * or an empty array if none are configured + * @throws UnknownHostException if a given interface name is invalid + */ + private static SocketAddress[] getLocalInterfaceAddrs( + String interfaceNames[]) throws UnknownHostException { + List localAddrs = new ArrayList(); + for (String interfaceName : interfaceNames) { + if (InetAddresses.isInetAddress(interfaceName)) { + localAddrs.add(new InetSocketAddress(interfaceName, 0)); + } else if (NetUtils.isValidSubnet(interfaceName)) { + for (InetAddress addr : NetUtils.getIPs(interfaceName, false)) { + localAddrs.add(new InetSocketAddress(addr, 0)); + } + } else { + for (String ip : DNS.getIPs(interfaceName, false)) { + localAddrs.add(new InetSocketAddress(ip, 0)); + } + } + } + return localAddrs.toArray(new SocketAddress[localAddrs.size()]); + } + + /** + * Select one of the configured local interfaces at random. We use a random + * interface because other policies like round-robin are less effective + * given that we cache connections to datanodes. + * + * @return one of the local interface addresses at random, or null if no + * local interfaces are configured + */ + SocketAddress getRandomLocalInterfaceAddr() { + if (localInterfaceAddrs.length == 0) { + return null; + } + final int idx = r.nextInt(localInterfaceAddrs.length); + final SocketAddress addr = localInterfaceAddrs[idx]; + if (LOG.isDebugEnabled()) { + LOG.debug("Using local interface " + addr); + } + return addr; } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 4a945b5d5a..3d8c628750 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -197,6 +197,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY = "dfs.namenode.checkpoint.edits.dir"; public static final String DFS_HOSTS = "dfs.hosts"; public static final String DFS_HOSTS_EXCLUDE = "dfs.hosts.exclude"; + public static final String DFS_CLIENT_LOCAL_INTERFACES = "dfs.client.local.interfaces"; // Much code in hdfs is not yet updated to use these keys. public static final String DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY = "dfs.client.block.write.locateFollowingBlock.retries"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index 58f807f320..0d00acd57e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -850,7 +850,9 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable // disaster. sock.setTcpNoDelay(true); - NetUtils.connect(sock, dnAddr, dfsClient.getConf().socketTimeout); + NetUtils.connect(sock, dnAddr, + dfsClient.getRandomLocalInterfaceAddr(), + dfsClient.getConf().socketTimeout); sock.setSoTimeout(dfsClient.getConf().socketTimeout); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index 1d4a45c848..9322143a9e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -1171,7 +1171,7 @@ class DFSOutputStream extends FSOutputSummer implements Syncable { NetUtils.createSocketAddr(first.getXferAddr()); final Socket sock = client.socketFactory.createSocket(); final int timeout = client.getDatanodeReadTimeout(length); - NetUtils.connect(sock, isa, timeout); + NetUtils.connect(sock, isa, client.getRandomLocalInterfaceAddr(), timeout); sock.setSoTimeout(timeout); sock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); if(DFSClient.LOG.isDebugEnabled()) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 2ab7c1dec8..b084d4b4c8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -844,4 +844,18 @@ + + dfs.client.local.interfaces + + A comma separated list of network interface names to use + for data transfer between the client and datanodes. When creating + a connection to read from or write to a datanode, the client + chooses one of the specified interfaces at random and binds its + socket to the IP of that interface. Individual names may be + specified as either an interface name (eg "eth0"), a subinterface + name (eg "eth0:0"), or an IP address (which may be specified using + CIDR notation to match a range of IPs). + + + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java index 87848f33a1..357b79e7bb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java @@ -38,6 +38,7 @@ import java.io.FileNotFoundException; import java.io.FileReader; import java.io.IOException; import java.net.InetSocketAddress; +import java.net.UnknownHostException; import java.util.EnumSet; import org.apache.commons.logging.LogFactory; @@ -67,6 +68,7 @@ import org.apache.hadoop.hdfs.server.namenode.LeaseManager; import org.apache.hadoop.io.IOUtils; import org.apache.log4j.Level; +import static org.junit.Assume.assumeTrue; /** * This class tests various cases during file creation. @@ -140,11 +142,34 @@ public class TestFileCreation extends junit.framework.TestCase { } } + public void testFileCreation() throws IOException { + checkFileCreation(null); + } + + /** Same test but the client should bind to a local interface */ + public void testFileCreationSetLocalInterface() throws IOException { + assumeTrue(System.getProperty("os.name").startsWith("Linux")); + + // The mini cluster listens on the loopback so we can use it here + checkFileCreation("lo"); + + try { + checkFileCreation("bogus-interface"); + fail("Able to specify a bogus interface"); + } catch (UnknownHostException e) { + assertEquals("No such interface bogus-interface", e.getMessage()); + } + } + /** * Test if file creation and disk space consumption works right + * @param netIf the local interface, if any, clients should use to access DNs */ - public void testFileCreation() throws IOException { + public void checkFileCreation(String netIf) throws IOException { Configuration conf = new HdfsConfiguration(); + if (netIf != null) { + conf.set(DFSConfigKeys.DFS_CLIENT_LOCAL_INTERFACES, netIf); + } if (simulatedStorage) { SimulatedFSDataset.setFactory(conf); }