HDFS-9259. Make SO_SNDBUF size configurable at DFSClient side for hdfs write scenario. (Mingliang Liu via mingma)

This commit is contained in:
Ming Ma 2015-10-27 09:28:40 -07:00
parent c28e16b40c
commit aa09880ab8
6 changed files with 131 additions and 1 deletions

View File

@ -131,7 +131,9 @@ static Socket createSocketForPipeline(final DatanodeInfo first,
NetUtils.connect(sock, isa, client.getRandomLocalInterfaceAddr(),
conf.getSocketTimeout());
sock.setSoTimeout(timeout);
sock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
if (conf.getSocketSendBufferSize() > 0) {
sock.setSendBufferSize(conf.getSocketSendBufferSize());
}
LOG.debug("Send buf size {}", sock.getSendBufferSize());
return sock;
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.client;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import java.util.concurrent.TimeUnit;
@ -58,6 +59,10 @@ public interface HdfsClientConfigKeys {
String DFS_CLIENT_WRITE_PACKET_SIZE_KEY = "dfs.client-write-packet-size";
int DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
String DFS_CLIENT_SOCKET_TIMEOUT_KEY = "dfs.client.socket-timeout";
String DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY =
"dfs.client.socket.send.buffer.size";
int DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_DEFAULT =
HdfsConstants.DEFAULT_DATA_SOCKET_SIZE;
String DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY =
"dfs.client.socketcache.capacity";
int DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT = 16;

View File

@ -56,6 +56,8 @@
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
@ -105,6 +107,7 @@ public class DfsClientConf {
private final int writeMaxPackets;
private final ByteArrayManager.Conf writeByteArrayManagerConf;
private final int socketTimeout;
private final int socketSendBufferSize;
private final long excludedNodesCacheExpiry;
/** Wait time window (in msec) if BlockMissingException is caught. */
private final int timeWindow;
@ -172,6 +175,8 @@ public DfsClientConf(Configuration conf) {
defaultChecksumOpt = getChecksumOptFromConf(conf);
socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
HdfsConstants.READ_TIMEOUT);
socketSendBufferSize = conf.getInt(DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY,
DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_DEFAULT);
/** dfs.write.packet.size is an internal config variable */
writePacketSize = conf.getInt(
DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
@ -407,6 +412,13 @@ public int getSocketTimeout() {
return socketTimeout;
}
/**
 * @return the configured send buffer size (in bytes) for sockets created
 * for the write pipeline (dfs.client.socket.send.buffer.size); a value
 * of zero or less means no explicit buffer size is set on the socket,
 * leaving TCP auto-tuning in effect
 */
public int getSocketSendBufferSize() {
return socketSendBufferSize;
}
/**
* @return the excludedNodesCacheExpiry
*/

View File

@ -1584,6 +1584,9 @@ Release 2.8.0 - UNRELEASED
HDFS-9292. Make TestFileConcorruption independent to underlying FsDataset
Implementation. (lei)
HDFS-9259. Make SO_SNDBUF size configurable at DFSClient side for hdfs
write scenario. (Mingliang Liu via mingma)
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -2141,6 +2141,18 @@
</description>
</property>
<property>
<name>dfs.client.socket.send.buffer.size</name>
<value>131072</value>
<description>
Socket send buffer size for a write pipeline on the DFSClient side.
This may affect TCP connection throughput.
If it is set to zero or a negative value,
no buffer size will be set explicitly,
thus enabling TCP auto-tuning on some systems.
</description>
</property>
<property>
<name>dfs.domain.socket.path</name>
<value></value>

View File

@ -0,0 +1,96 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.Socket;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
 * Tests that the DFSClient honors dfs.client.socket.send.buffer.size when
 * creating the write-pipeline socket: the default value, an explicitly
 * configured value, and the auto-tuning mode (size &lt;= 0 means the client
 * does not call setSendBufferSize at all, so the kernel picks the size).
 */
public class TestDFSClientSocketSize {
  private static final Logger LOG = LoggerFactory.getLogger(
      TestDFSClientSocketSize.class);
  static {
    GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.ALL);
  }

  private final Configuration conf = new Configuration();
  private MiniDFSCluster cluster;
  // Pipeline socket created by the test; closed in tearDown().
  private Socket socket;

  /**
   * With no explicit configuration, the socket send buffer should equal the
   * default value of dfs.client.socket.send.buffer.size.
   */
  @Test
  public void testDefaultSendBufferSize() throws IOException {
    socket = createSocket();
    assertEquals("Send buffer size should be the default value.",
        DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_DEFAULT,
        socket.getSendBufferSize());
  }

  /**
   * A positive configured value should be applied to the socket verbatim.
   */
  @Test
  public void testSpecifiedSendBufferSize() throws IOException {
    final int mySendBufferSize = 64 * 1024;  // 64 KB
    conf.setInt(DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY, mySendBufferSize);
    socket = createSocket();
    assertEquals("Send buffer size should be the customized value.",
        mySendBufferSize, socket.getSendBufferSize());
  }

  /**
   * A non-positive configured value means the client skips
   * setSendBufferSize, so the OS-chosen (auto-tuned) size — always
   * positive — is in effect.
   */
  @Test
  public void testAutoTuningSendBufferSize() throws IOException {
    conf.setInt(DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY, 0);
    socket = createSocket();
    LOG.info("The auto tuned send buffer size is: {}",
        socket.getSendBufferSize());
    assertTrue("Send buffer size should be non-negative value which is " +
        "determined by system (kernel).", socket.getSendBufferSize() > 0);
  }

  @After
  public void tearDown() throws Exception {
    if (socket != null) {
      LOG.info("Closing the DFSClient socket.");
      // Actually close the socket; previously it was only logged, which
      // leaked the pipeline socket after every test.
      socket.close();
    }
    if (cluster != null) {
      LOG.info("Shutting down MiniDFSCluster.");
      cluster.shutdown();
    }
  }

  /**
   * Starts a single-datanode cluster and creates a write-pipeline socket
   * to its datanode using the current {@link #conf}.
   *
   * @return the pipeline socket (caller-owned; closed in tearDown)
   * @throws IOException if the cluster or socket cannot be created
   */
  private Socket createSocket() throws IOException {
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    cluster.waitActive();
    LOG.info("MiniDFSCluster started.");
    return DataStreamer.createSocketForPipeline(
        new DatanodeInfo(cluster.dataNodes.get(0).datanode.getDatanodeId()),
        1, cluster.getFileSystem().getClient());
  }
}