HADOOP-13351. TestDFSClientSocketSize buffer size tests are flaky. Contributed by Aaron Fabbri and Mingliang Liu.
This commit is contained in:
parent
6cf017558a
commit
5537c6b234
@ -22,7 +22,6 @@
|
|||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.log4j.Level;
|
import org.apache.log4j.Level;
|
||||||
|
|
||||||
import org.junit.After;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
@ -33,7 +32,6 @@
|
|||||||
|
|
||||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_DEFAULT;
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY;
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY;
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
public class TestDFSClientSocketSize {
|
public class TestDFSClientSocketSize {
|
||||||
@ -43,55 +41,65 @@ public class TestDFSClientSocketSize {
|
|||||||
GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.ALL);
|
GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.ALL);
|
||||||
}
|
}
|
||||||
|
|
||||||
private final Configuration conf = new Configuration();
|
/**
|
||||||
private MiniDFSCluster cluster;
|
* The setting of socket send buffer size in
|
||||||
private Socket socket;
|
* {@link java.net.Socket#setSendBufferSize(int)} is only a hint. Actual
|
||||||
|
* value may differ. We just sanity check that it is somewhere close.
|
||||||
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testDefaultSendBufferSize() throws IOException {
|
public void testDefaultSendBufferSize() throws IOException {
|
||||||
socket = createSocket();
|
assertTrue("Send buffer size should be somewhere near default.",
|
||||||
assertEquals("Send buffer size should be the default value.",
|
getSendBufferSize(new Configuration()) >=
|
||||||
DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_DEFAULT,
|
DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_DEFAULT / 2);
|
||||||
socket.getSendBufferSize());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Note that {@link java.net.Socket#setSendBufferSize(int)} is only a hint.
|
||||||
|
* If this test is flaky it should be ignored. See HADOOP-13351.
|
||||||
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testSpecifiedSendBufferSize() throws IOException {
|
public void testSpecifiedSendBufferSize() throws IOException {
|
||||||
final int mySendBufferSize = 64 * 1024; // 64 KB
|
final Configuration conf1 = new Configuration();
|
||||||
conf.setInt(DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY, mySendBufferSize);
|
conf1.setInt(DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY, 256 * 1024); // 256 KB
|
||||||
socket = createSocket();
|
final int sendBufferSize1 = getSendBufferSize(conf1);
|
||||||
assertEquals("Send buffer size should be the customized value.",
|
|
||||||
mySendBufferSize, socket.getSendBufferSize());
|
final Configuration conf2 = new Configuration();
|
||||||
|
conf2.setInt(DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY, 1024); // 1 KB
|
||||||
|
final int sendBufferSize2 = getSendBufferSize(conf2);
|
||||||
|
|
||||||
|
LOG.info("Large buf size is {}, small is {}",
|
||||||
|
sendBufferSize1, sendBufferSize2);
|
||||||
|
assertTrue("Larger specified send buffer should have effect",
|
||||||
|
sendBufferSize1 > sendBufferSize2);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAutoTuningSendBufferSize() throws IOException {
|
public void testAutoTuningSendBufferSize() throws IOException {
|
||||||
|
final Configuration conf = new Configuration();
|
||||||
conf.setInt(DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY, 0);
|
conf.setInt(DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY, 0);
|
||||||
socket = createSocket();
|
final int sendBufferSize = getSendBufferSize(conf);
|
||||||
LOG.info("The auto tuned send buffer size is: {}",
|
LOG.info("The auto tuned send buffer size is: {}", sendBufferSize);
|
||||||
socket.getSendBufferSize());
|
|
||||||
assertTrue("Send buffer size should be non-negative value which is " +
|
assertTrue("Send buffer size should be non-negative value which is " +
|
||||||
"determined by system (kernel).", socket.getSendBufferSize() > 0);
|
"determined by system (kernel).", sendBufferSize > 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
private int getSendBufferSize(Configuration conf) throws IOException {
|
||||||
public void tearDown() throws Exception {
|
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||||
if (socket != null) {
|
.numDataNodes(1)
|
||||||
LOG.info("Closing the DFSClient socket.");
|
.build();
|
||||||
}
|
try {
|
||||||
if (cluster != null) {
|
cluster.waitActive();
|
||||||
LOG.info("Shutting down MiniDFSCluster.");
|
LOG.info("MiniDFSCluster started.");
|
||||||
cluster.shutdown();
|
try (Socket socket = DataStreamer.createSocketForPipeline(
|
||||||
cluster = null;
|
new DatanodeInfo(cluster.dataNodes.get(0).datanode.getDatanodeId()),
|
||||||
|
1, cluster.getFileSystem().getClient())) {
|
||||||
|
return socket.getSendBufferSize();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private Socket createSocket() throws IOException {
|
|
||||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
|
||||||
cluster.waitActive();
|
|
||||||
LOG.info("MiniDFSCluster started.");
|
|
||||||
return DataStreamer.createSocketForPipeline(
|
|
||||||
new DatanodeInfo(cluster.dataNodes.get(0).datanode.getDatanodeId()),
|
|
||||||
1, cluster.getFileSystem().getClient());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user