HDFS-7608: hdfs dfsclient newConnectedPeer has no write timeout (Xiaoyu Yao via Colin P. McCabe)

This commit is contained in:
Colin Patrick Mccabe 2015-07-14 10:57:59 -07:00
parent 4084eaf943
commit 1d74ccecec
2 changed files with 38 additions and 6 deletions

View File

@ -3127,6 +3127,7 @@ public Peer newConnectedPeer(InetSocketAddress addr,
peer = TcpPeerServer.peerFromSocketAndKey(saslClient, sock, this, peer = TcpPeerServer.peerFromSocketAndKey(saslClient, sock, this,
blockToken, datanodeId); blockToken, datanodeId);
peer.setReadTimeout(socketTimeout); peer.setReadTimeout(socketTimeout);
peer.setWriteTimeout(socketTimeout);
success = true; success = true;
return peer; return peer;
} finally { } finally {

View File

@ -1132,10 +1132,9 @@ public void testListFiles() throws IOException {
cluster.shutdown(); cluster.shutdown();
} }
} }
@Test(timeout=10000) @Test(timeout=10000)
public void testDFSClientPeerTimeout() throws IOException { public void testDFSClientPeerReadTimeout() throws IOException {
final int timeout = 1000; final int timeout = 1000;
final Configuration conf = new HdfsConfiguration(); final Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, timeout); conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, timeout);
@ -1152,11 +1151,11 @@ public void testDFSClientPeerTimeout() throws IOException {
long start = Time.now(); long start = Time.now();
try { try {
peer.getInputStream().read(); peer.getInputStream().read();
Assert.fail("should timeout"); Assert.fail("read should timeout");
} catch (SocketTimeoutException ste) { } catch (SocketTimeoutException ste) {
long delta = Time.now() - start; long delta = Time.now() - start;
Assert.assertTrue("timedout too soon", delta >= timeout*0.9); Assert.assertTrue("read timedout too soon", delta >= timeout*0.9);
Assert.assertTrue("timedout too late", delta <= timeout*1.1); Assert.assertTrue("read timedout too late", delta <= timeout*1.1);
} catch (Throwable t) { } catch (Throwable t) {
Assert.fail("wrong exception:"+t); Assert.fail("wrong exception:"+t);
} }
@ -1178,4 +1177,36 @@ public void testGetServerDefaults() throws IOException {
cluster.shutdown(); cluster.shutdown();
} }
} }
@Test(timeout=10000)
public void testDFSClientPeerWriteTimeout() throws IOException {
final int timeout = 1000;
final Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, timeout);
// only need cluster to create a dfs client to get a peer
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
try {
cluster.waitActive();
DistributedFileSystem dfs = cluster.getFileSystem();
// Write 1 MB to a dummy socket to ensure the write times out
ServerSocket socket = new ServerSocket(0);
Peer peer = dfs.getClient().newConnectedPeer(
(InetSocketAddress) socket.getLocalSocketAddress(), null, null);
long start = Time.now();
try {
byte[] buf = new byte[1024 * 1024];
peer.getOutputStream().write(buf);
Assert.fail("write should timeout");
} catch (SocketTimeoutException ste) {
long delta = Time.now() - start;
Assert.assertTrue("write timedout too soon", delta >= timeout * 0.9);
Assert.assertTrue("write timedout too late", delta <= timeout * 1.1);
} catch (Throwable t) {
Assert.fail("wrong exception:" + t);
}
} finally {
cluster.shutdown();
}
}
} }