From 0e0271b5fdf55c55b825e85c56639a4ae7277a39 Mon Sep 17 00:00:00 2001 From: Luke Lu Date: Wed, 4 Sep 2013 10:34:28 +0000 Subject: [PATCH] HADOOP-9916. Fix race in ipc.Client retry. (Binglin Chang via llu) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1519973 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-common/CHANGES.txt | 2 + .../java/org/apache/hadoop/ipc/Client.java | 6 +- .../java/org/apache/hadoop/ipc/TestIPC.java | 60 ++++++++++--------- 3 files changed, 36 insertions(+), 32 deletions(-) diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 282bc4303f..4475f044f9 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -406,6 +406,8 @@ Release 2.1.1-beta - UNRELEASED BUG FIXES + HADOOP-9916. Fix race in ipc.Client retry. (Binglin Chang via llu) + HADOOP-9768. chown and chgrp reject users and groups with spaces on platforms where spaces are otherwise acceptable. 
(cnauroth) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index ae30dd3eb3..8caa7b288b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -1063,8 +1063,8 @@ private void receiveRpcResponse() { if (status == RpcStatusProto.SUCCESS) { Writable value = ReflectionUtils.newInstance(valueClass, conf); value.readFields(in); // read value - call.setRpcResponse(value); calls.remove(callId); + call.setRpcResponse(value); // verify that length was correct // only for ProtobufEngine where len can be verified easily @@ -1098,8 +1098,8 @@ private void receiveRpcResponse() { new RemoteException(exceptionClassName, errorMsg) : new RemoteException(exceptionClassName, errorMsg, erCode)); if (status == RpcStatusProto.ERROR) { - call.setException(re); calls.remove(callId); + call.setException(re); } else if (status == RpcStatusProto.FATAL) { // Close the connection markClosed(re); @@ -1166,8 +1166,8 @@ private void cleanupCalls() { Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator() ; while (itor.hasNext()) { Call c = itor.next().getValue(); + itor.remove(); c.setException(closeException); // local exception - itor.remove(); } } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java index 75f051aaed..33fb799c0c 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java @@ -216,13 +216,13 @@ public ConnectionId getConnectionId() { } } - @Test + @Test(timeout=60000) public void testSerial() throws IOException, InterruptedException { - testSerial(3, false, 2, 5, 100); 
- testSerial(3, true, 2, 5, 10); + internalTestSerial(3, false, 2, 5, 100); + internalTestSerial(3, true, 2, 5, 10); } - public void testSerial(int handlerCount, boolean handlerSleep, + public void internalTestSerial(int handlerCount, boolean handlerSleep, int clientCount, int callerCount, int callCount) throws IOException, InterruptedException { Server server = new TestServer(handlerCount, handlerSleep); @@ -249,7 +249,7 @@ public void testSerial(int handlerCount, boolean handlerSleep, server.stop(); } - @Test + @Test(timeout=60000) public void testStandAloneClient() throws IOException { Client client = new Client(LongWritable.class, conf); InetSocketAddress address = new InetSocketAddress("127.0.0.1", 10); @@ -383,7 +383,7 @@ private void doErrorTest( } } - @Test + @Test(timeout=60000) public void testIOEOnClientWriteParam() throws Exception { doErrorTest(IOEOnWriteWritable.class, LongWritable.class, @@ -391,7 +391,7 @@ public void testIOEOnClientWriteParam() throws Exception { LongWritable.class); } - @Test + @Test(timeout=60000) public void testRTEOnClientWriteParam() throws Exception { doErrorTest(RTEOnWriteWritable.class, LongWritable.class, @@ -399,7 +399,7 @@ public void testRTEOnClientWriteParam() throws Exception { LongWritable.class); } - @Test + @Test(timeout=60000) public void testIOEOnServerReadParam() throws Exception { doErrorTest(LongWritable.class, IOEOnReadWritable.class, @@ -407,7 +407,7 @@ public void testIOEOnServerReadParam() throws Exception { LongWritable.class); } - @Test + @Test(timeout=60000) public void testRTEOnServerReadParam() throws Exception { doErrorTest(LongWritable.class, RTEOnReadWritable.class, @@ -416,7 +416,7 @@ public void testRTEOnServerReadParam() throws Exception { } - @Test + @Test(timeout=60000) public void testIOEOnServerWriteResponse() throws Exception { doErrorTest(LongWritable.class, LongWritable.class, @@ -424,7 +424,7 @@ public void testIOEOnServerWriteResponse() throws Exception { LongWritable.class); } - @Test 
+ @Test(timeout=60000) public void testRTEOnServerWriteResponse() throws Exception { doErrorTest(LongWritable.class, LongWritable.class, @@ -432,7 +432,7 @@ public void testRTEOnServerWriteResponse() throws Exception { LongWritable.class); } - @Test + @Test(timeout=60000) public void testIOEOnClientReadResponse() throws Exception { doErrorTest(LongWritable.class, LongWritable.class, @@ -440,7 +440,7 @@ public void testIOEOnClientReadResponse() throws Exception { IOEOnReadWritable.class); } - @Test + @Test(timeout=60000) public void testRTEOnClientReadResponse() throws Exception { doErrorTest(LongWritable.class, LongWritable.class, @@ -453,7 +453,7 @@ public void testRTEOnClientReadResponse() throws Exception { * that a ping should have been sent. This is a reproducer for a * deadlock seen in one iteration of HADOOP-6762. */ - @Test + @Test(timeout=60000) public void testIOEOnWriteAfterPingClient() throws Exception { // start server Client.setPingInterval(conf, 100); @@ -481,7 +481,7 @@ private static void assertExceptionContains( * Test that, if the socket factory throws an IOE, it properly propagates * to the client. */ - @Test + @Test(timeout=60000) public void testSocketFactoryException() throws IOException { SocketFactory mockFactory = mock(SocketFactory.class); doThrow(new IOException("Injected fault")).when(mockFactory).createSocket(); @@ -503,7 +503,7 @@ public void testSocketFactoryException() throws IOException { * failure is handled properly. This is a regression test for * HADOOP-7428. */ - @Test + @Test(timeout=60000) public void testRTEDuringConnectionSetup() throws IOException { // Set up a socket factory which returns sockets which // throw an RTE when setSoTimeout is called. 
@@ -544,7 +544,7 @@ public Socket answer(InvocationOnMock invocation) throws Throwable { } } - @Test + @Test(timeout=60000) public void testIpcTimeout() throws IOException { // start server Server server = new TestServer(1, true); @@ -566,7 +566,7 @@ public void testIpcTimeout() throws IOException { addr, null, null, 3*PING_INTERVAL+MIN_SLEEP_TIME, conf); } - @Test + @Test(timeout=60000) public void testIpcConnectTimeout() throws IOException { // start server Server server = new TestServer(1, true); @@ -670,31 +670,31 @@ private long countOpenFileDescriptors() { return FD_DIR.list().length; } - @Test + @Test(timeout=60000) public void testIpcFromHadoop_0_18_13() throws IOException { doIpcVersionTest(NetworkTraces.HADOOP_0_18_3_RPC_DUMP, NetworkTraces.RESPONSE_TO_HADOOP_0_18_3_RPC); } - @Test + @Test(timeout=60000) public void testIpcFromHadoop0_20_3() throws IOException { doIpcVersionTest(NetworkTraces.HADOOP_0_20_3_RPC_DUMP, NetworkTraces.RESPONSE_TO_HADOOP_0_20_3_RPC); } - @Test + @Test(timeout=60000) public void testIpcFromHadoop0_21_0() throws IOException { doIpcVersionTest(NetworkTraces.HADOOP_0_21_0_RPC_DUMP, NetworkTraces.RESPONSE_TO_HADOOP_0_21_0_RPC); } - @Test + @Test(timeout=60000) public void testHttpGetResponse() throws IOException { doIpcVersionTest("GET / HTTP/1.0\r\n\r\n".getBytes(), Server.RECEIVED_HTTP_REQ_RESPONSE.getBytes()); } - @Test + @Test(timeout=60000) public void testConnectionRetriesOnSocketTimeoutExceptions() throws IOException { Configuration conf = new Configuration(); // set max retries to 0 @@ -720,7 +720,7 @@ private static class CallInfo { * (1) the rpc server uses the call id/retry provided by the rpc client, and * (2) the rpc client receives the same call id/retry from the rpc server. */ - @Test + @Test(timeout=60000) public void testCallIdAndRetry() throws IOException { final CallInfo info = new CallInfo(); @@ -772,7 +772,7 @@ private interface DummyProtocol { /** * Test the retry count while used in a retry proxy. 
*/ - @Test + @Test(timeout=60000) public void testRetryProxy() throws IOException { final Client client = new Client(LongWritable.class, conf); @@ -785,7 +785,9 @@ public void run() { } }; - final int totalRetry = 256; + // try more times, so it is easier to find race condition bug + // 10000 times runs about 6s on a core i7 machine + final int totalRetry = 10000; DummyProtocol proxy = (DummyProtocol) Proxy.newProxyInstance( DummyProtocol.class.getClassLoader(), new Class[] { DummyProtocol.class }, new TestInvocationHandler(client, @@ -807,7 +809,7 @@ public void run() { /** * Test if the rpc server gets the default retry count (0) from client. */ - @Test + @Test(timeout=60000) public void testInitialCallRetryCount() throws IOException { // Override client to store the call id final Client client = new Client(LongWritable.class, conf); @@ -838,7 +840,7 @@ public void run() { /** * Test if the rpc server gets the retry count from client. */ - @Test + @Test(timeout=60000) public void testCallRetryCount() throws IOException { final int retryCount = 255; // Override client to store the call id @@ -873,7 +875,7 @@ public void run() { * even if multiple threads are using the same client. * @throws InterruptedException */ - @Test + @Test(timeout=60000) public void testUniqueSequentialCallIds() throws IOException, InterruptedException { int serverThreads = 10, callerCount = 100, perCallerCallCount = 100;