HADOOP-9916. Fix race in ipc.Client retry. (Binglin Chang via llu)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1519973 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b87bcbb82d
commit
0e0271b5fd
@ -406,6 +406,8 @@ Release 2.1.1-beta - UNRELEASED
|
|||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
||||||
|
HADOOP-9916. Fix race in ipc.Client retry. (Binglin Chang via llu)
|
||||||
|
|
||||||
HADOOP-9768. chown and chgrp reject users and groups with spaces on platforms
|
HADOOP-9768. chown and chgrp reject users and groups with spaces on platforms
|
||||||
where spaces are otherwise acceptable. (cnauroth)
|
where spaces are otherwise acceptable. (cnauroth)
|
||||||
|
|
||||||
|
@ -1063,8 +1063,8 @@ private void receiveRpcResponse() {
|
|||||||
if (status == RpcStatusProto.SUCCESS) {
|
if (status == RpcStatusProto.SUCCESS) {
|
||||||
Writable value = ReflectionUtils.newInstance(valueClass, conf);
|
Writable value = ReflectionUtils.newInstance(valueClass, conf);
|
||||||
value.readFields(in); // read value
|
value.readFields(in); // read value
|
||||||
call.setRpcResponse(value);
|
|
||||||
calls.remove(callId);
|
calls.remove(callId);
|
||||||
|
call.setRpcResponse(value);
|
||||||
|
|
||||||
// verify that length was correct
|
// verify that length was correct
|
||||||
// only for ProtobufEngine where len can be verified easily
|
// only for ProtobufEngine where len can be verified easily
|
||||||
@ -1098,8 +1098,8 @@ private void receiveRpcResponse() {
|
|||||||
new RemoteException(exceptionClassName, errorMsg) :
|
new RemoteException(exceptionClassName, errorMsg) :
|
||||||
new RemoteException(exceptionClassName, errorMsg, erCode));
|
new RemoteException(exceptionClassName, errorMsg, erCode));
|
||||||
if (status == RpcStatusProto.ERROR) {
|
if (status == RpcStatusProto.ERROR) {
|
||||||
call.setException(re);
|
|
||||||
calls.remove(callId);
|
calls.remove(callId);
|
||||||
|
call.setException(re);
|
||||||
} else if (status == RpcStatusProto.FATAL) {
|
} else if (status == RpcStatusProto.FATAL) {
|
||||||
// Close the connection
|
// Close the connection
|
||||||
markClosed(re);
|
markClosed(re);
|
||||||
@ -1166,8 +1166,8 @@ private void cleanupCalls() {
|
|||||||
Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator() ;
|
Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator() ;
|
||||||
while (itor.hasNext()) {
|
while (itor.hasNext()) {
|
||||||
Call c = itor.next().getValue();
|
Call c = itor.next().getValue();
|
||||||
|
itor.remove();
|
||||||
c.setException(closeException); // local exception
|
c.setException(closeException); // local exception
|
||||||
itor.remove();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -216,13 +216,13 @@ public ConnectionId getConnectionId() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test(timeout=60000)
|
||||||
public void testSerial() throws IOException, InterruptedException {
|
public void testSerial() throws IOException, InterruptedException {
|
||||||
testSerial(3, false, 2, 5, 100);
|
internalTestSerial(3, false, 2, 5, 100);
|
||||||
testSerial(3, true, 2, 5, 10);
|
internalTestSerial(3, true, 2, 5, 10);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testSerial(int handlerCount, boolean handlerSleep,
|
public void internalTestSerial(int handlerCount, boolean handlerSleep,
|
||||||
int clientCount, int callerCount, int callCount)
|
int clientCount, int callerCount, int callCount)
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException {
|
||||||
Server server = new TestServer(handlerCount, handlerSleep);
|
Server server = new TestServer(handlerCount, handlerSleep);
|
||||||
@ -249,7 +249,7 @@ public void testSerial(int handlerCount, boolean handlerSleep,
|
|||||||
server.stop();
|
server.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test(timeout=60000)
|
||||||
public void testStandAloneClient() throws IOException {
|
public void testStandAloneClient() throws IOException {
|
||||||
Client client = new Client(LongWritable.class, conf);
|
Client client = new Client(LongWritable.class, conf);
|
||||||
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 10);
|
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 10);
|
||||||
@ -383,7 +383,7 @@ private void doErrorTest(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test(timeout=60000)
|
||||||
public void testIOEOnClientWriteParam() throws Exception {
|
public void testIOEOnClientWriteParam() throws Exception {
|
||||||
doErrorTest(IOEOnWriteWritable.class,
|
doErrorTest(IOEOnWriteWritable.class,
|
||||||
LongWritable.class,
|
LongWritable.class,
|
||||||
@ -391,7 +391,7 @@ public void testIOEOnClientWriteParam() throws Exception {
|
|||||||
LongWritable.class);
|
LongWritable.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test(timeout=60000)
|
||||||
public void testRTEOnClientWriteParam() throws Exception {
|
public void testRTEOnClientWriteParam() throws Exception {
|
||||||
doErrorTest(RTEOnWriteWritable.class,
|
doErrorTest(RTEOnWriteWritable.class,
|
||||||
LongWritable.class,
|
LongWritable.class,
|
||||||
@ -399,7 +399,7 @@ public void testRTEOnClientWriteParam() throws Exception {
|
|||||||
LongWritable.class);
|
LongWritable.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test(timeout=60000)
|
||||||
public void testIOEOnServerReadParam() throws Exception {
|
public void testIOEOnServerReadParam() throws Exception {
|
||||||
doErrorTest(LongWritable.class,
|
doErrorTest(LongWritable.class,
|
||||||
IOEOnReadWritable.class,
|
IOEOnReadWritable.class,
|
||||||
@ -407,7 +407,7 @@ public void testIOEOnServerReadParam() throws Exception {
|
|||||||
LongWritable.class);
|
LongWritable.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test(timeout=60000)
|
||||||
public void testRTEOnServerReadParam() throws Exception {
|
public void testRTEOnServerReadParam() throws Exception {
|
||||||
doErrorTest(LongWritable.class,
|
doErrorTest(LongWritable.class,
|
||||||
RTEOnReadWritable.class,
|
RTEOnReadWritable.class,
|
||||||
@ -416,7 +416,7 @@ public void testRTEOnServerReadParam() throws Exception {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test(timeout=60000)
|
||||||
public void testIOEOnServerWriteResponse() throws Exception {
|
public void testIOEOnServerWriteResponse() throws Exception {
|
||||||
doErrorTest(LongWritable.class,
|
doErrorTest(LongWritable.class,
|
||||||
LongWritable.class,
|
LongWritable.class,
|
||||||
@ -424,7 +424,7 @@ public void testIOEOnServerWriteResponse() throws Exception {
|
|||||||
LongWritable.class);
|
LongWritable.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test(timeout=60000)
|
||||||
public void testRTEOnServerWriteResponse() throws Exception {
|
public void testRTEOnServerWriteResponse() throws Exception {
|
||||||
doErrorTest(LongWritable.class,
|
doErrorTest(LongWritable.class,
|
||||||
LongWritable.class,
|
LongWritable.class,
|
||||||
@ -432,7 +432,7 @@ public void testRTEOnServerWriteResponse() throws Exception {
|
|||||||
LongWritable.class);
|
LongWritable.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test(timeout=60000)
|
||||||
public void testIOEOnClientReadResponse() throws Exception {
|
public void testIOEOnClientReadResponse() throws Exception {
|
||||||
doErrorTest(LongWritable.class,
|
doErrorTest(LongWritable.class,
|
||||||
LongWritable.class,
|
LongWritable.class,
|
||||||
@ -440,7 +440,7 @@ public void testIOEOnClientReadResponse() throws Exception {
|
|||||||
IOEOnReadWritable.class);
|
IOEOnReadWritable.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test(timeout=60000)
|
||||||
public void testRTEOnClientReadResponse() throws Exception {
|
public void testRTEOnClientReadResponse() throws Exception {
|
||||||
doErrorTest(LongWritable.class,
|
doErrorTest(LongWritable.class,
|
||||||
LongWritable.class,
|
LongWritable.class,
|
||||||
@ -453,7 +453,7 @@ public void testRTEOnClientReadResponse() throws Exception {
|
|||||||
* that a ping should have been sent. This is a reproducer for a
|
* that a ping should have been sent. This is a reproducer for a
|
||||||
* deadlock seen in one iteration of HADOOP-6762.
|
* deadlock seen in one iteration of HADOOP-6762.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test(timeout=60000)
|
||||||
public void testIOEOnWriteAfterPingClient() throws Exception {
|
public void testIOEOnWriteAfterPingClient() throws Exception {
|
||||||
// start server
|
// start server
|
||||||
Client.setPingInterval(conf, 100);
|
Client.setPingInterval(conf, 100);
|
||||||
@ -481,7 +481,7 @@ private static void assertExceptionContains(
|
|||||||
* Test that, if the socket factory throws an IOE, it properly propagates
|
* Test that, if the socket factory throws an IOE, it properly propagates
|
||||||
* to the client.
|
* to the client.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test(timeout=60000)
|
||||||
public void testSocketFactoryException() throws IOException {
|
public void testSocketFactoryException() throws IOException {
|
||||||
SocketFactory mockFactory = mock(SocketFactory.class);
|
SocketFactory mockFactory = mock(SocketFactory.class);
|
||||||
doThrow(new IOException("Injected fault")).when(mockFactory).createSocket();
|
doThrow(new IOException("Injected fault")).when(mockFactory).createSocket();
|
||||||
@ -503,7 +503,7 @@ public void testSocketFactoryException() throws IOException {
|
|||||||
* failure is handled properly. This is a regression test for
|
* failure is handled properly. This is a regression test for
|
||||||
* HADOOP-7428.
|
* HADOOP-7428.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test(timeout=60000)
|
||||||
public void testRTEDuringConnectionSetup() throws IOException {
|
public void testRTEDuringConnectionSetup() throws IOException {
|
||||||
// Set up a socket factory which returns sockets which
|
// Set up a socket factory which returns sockets which
|
||||||
// throw an RTE when setSoTimeout is called.
|
// throw an RTE when setSoTimeout is called.
|
||||||
@ -544,7 +544,7 @@ public Socket answer(InvocationOnMock invocation) throws Throwable {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test(timeout=60000)
|
||||||
public void testIpcTimeout() throws IOException {
|
public void testIpcTimeout() throws IOException {
|
||||||
// start server
|
// start server
|
||||||
Server server = new TestServer(1, true);
|
Server server = new TestServer(1, true);
|
||||||
@ -566,7 +566,7 @@ public void testIpcTimeout() throws IOException {
|
|||||||
addr, null, null, 3*PING_INTERVAL+MIN_SLEEP_TIME, conf);
|
addr, null, null, 3*PING_INTERVAL+MIN_SLEEP_TIME, conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test(timeout=60000)
|
||||||
public void testIpcConnectTimeout() throws IOException {
|
public void testIpcConnectTimeout() throws IOException {
|
||||||
// start server
|
// start server
|
||||||
Server server = new TestServer(1, true);
|
Server server = new TestServer(1, true);
|
||||||
@ -670,31 +670,31 @@ private long countOpenFileDescriptors() {
|
|||||||
return FD_DIR.list().length;
|
return FD_DIR.list().length;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test(timeout=60000)
|
||||||
public void testIpcFromHadoop_0_18_13() throws IOException {
|
public void testIpcFromHadoop_0_18_13() throws IOException {
|
||||||
doIpcVersionTest(NetworkTraces.HADOOP_0_18_3_RPC_DUMP,
|
doIpcVersionTest(NetworkTraces.HADOOP_0_18_3_RPC_DUMP,
|
||||||
NetworkTraces.RESPONSE_TO_HADOOP_0_18_3_RPC);
|
NetworkTraces.RESPONSE_TO_HADOOP_0_18_3_RPC);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test(timeout=60000)
|
||||||
public void testIpcFromHadoop0_20_3() throws IOException {
|
public void testIpcFromHadoop0_20_3() throws IOException {
|
||||||
doIpcVersionTest(NetworkTraces.HADOOP_0_20_3_RPC_DUMP,
|
doIpcVersionTest(NetworkTraces.HADOOP_0_20_3_RPC_DUMP,
|
||||||
NetworkTraces.RESPONSE_TO_HADOOP_0_20_3_RPC);
|
NetworkTraces.RESPONSE_TO_HADOOP_0_20_3_RPC);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test(timeout=60000)
|
||||||
public void testIpcFromHadoop0_21_0() throws IOException {
|
public void testIpcFromHadoop0_21_0() throws IOException {
|
||||||
doIpcVersionTest(NetworkTraces.HADOOP_0_21_0_RPC_DUMP,
|
doIpcVersionTest(NetworkTraces.HADOOP_0_21_0_RPC_DUMP,
|
||||||
NetworkTraces.RESPONSE_TO_HADOOP_0_21_0_RPC);
|
NetworkTraces.RESPONSE_TO_HADOOP_0_21_0_RPC);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test(timeout=60000)
|
||||||
public void testHttpGetResponse() throws IOException {
|
public void testHttpGetResponse() throws IOException {
|
||||||
doIpcVersionTest("GET / HTTP/1.0\r\n\r\n".getBytes(),
|
doIpcVersionTest("GET / HTTP/1.0\r\n\r\n".getBytes(),
|
||||||
Server.RECEIVED_HTTP_REQ_RESPONSE.getBytes());
|
Server.RECEIVED_HTTP_REQ_RESPONSE.getBytes());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test(timeout=60000)
|
||||||
public void testConnectionRetriesOnSocketTimeoutExceptions() throws IOException {
|
public void testConnectionRetriesOnSocketTimeoutExceptions() throws IOException {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
// set max retries to 0
|
// set max retries to 0
|
||||||
@ -720,7 +720,7 @@ private static class CallInfo {
|
|||||||
* (1) the rpc server uses the call id/retry provided by the rpc client, and
|
* (1) the rpc server uses the call id/retry provided by the rpc client, and
|
||||||
* (2) the rpc client receives the same call id/retry from the rpc server.
|
* (2) the rpc client receives the same call id/retry from the rpc server.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test(timeout=60000)
|
||||||
public void testCallIdAndRetry() throws IOException {
|
public void testCallIdAndRetry() throws IOException {
|
||||||
final CallInfo info = new CallInfo();
|
final CallInfo info = new CallInfo();
|
||||||
|
|
||||||
@ -772,7 +772,7 @@ private interface DummyProtocol {
|
|||||||
/**
|
/**
|
||||||
* Test the retry count while used in a retry proxy.
|
* Test the retry count while used in a retry proxy.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test(timeout=60000)
|
||||||
public void testRetryProxy() throws IOException {
|
public void testRetryProxy() throws IOException {
|
||||||
final Client client = new Client(LongWritable.class, conf);
|
final Client client = new Client(LongWritable.class, conf);
|
||||||
|
|
||||||
@ -785,7 +785,9 @@ public void run() {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
final int totalRetry = 256;
|
// try more times, so it is easier to find race condition bug
|
||||||
|
// 10000 times runs about 6s on a core i7 machine
|
||||||
|
final int totalRetry = 10000;
|
||||||
DummyProtocol proxy = (DummyProtocol) Proxy.newProxyInstance(
|
DummyProtocol proxy = (DummyProtocol) Proxy.newProxyInstance(
|
||||||
DummyProtocol.class.getClassLoader(),
|
DummyProtocol.class.getClassLoader(),
|
||||||
new Class[] { DummyProtocol.class }, new TestInvocationHandler(client,
|
new Class[] { DummyProtocol.class }, new TestInvocationHandler(client,
|
||||||
@ -807,7 +809,7 @@ public void run() {
|
|||||||
/**
|
/**
|
||||||
* Test if the rpc server gets the default retry count (0) from client.
|
* Test if the rpc server gets the default retry count (0) from client.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test(timeout=60000)
|
||||||
public void testInitialCallRetryCount() throws IOException {
|
public void testInitialCallRetryCount() throws IOException {
|
||||||
// Override client to store the call id
|
// Override client to store the call id
|
||||||
final Client client = new Client(LongWritable.class, conf);
|
final Client client = new Client(LongWritable.class, conf);
|
||||||
@ -838,7 +840,7 @@ public void run() {
|
|||||||
/**
|
/**
|
||||||
* Test if the rpc server gets the retry count from client.
|
* Test if the rpc server gets the retry count from client.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test(timeout=60000)
|
||||||
public void testCallRetryCount() throws IOException {
|
public void testCallRetryCount() throws IOException {
|
||||||
final int retryCount = 255;
|
final int retryCount = 255;
|
||||||
// Override client to store the call id
|
// Override client to store the call id
|
||||||
@ -873,7 +875,7 @@ public void run() {
|
|||||||
* even if multiple threads are using the same client.
|
* even if multiple threads are using the same client.
|
||||||
* @throws InterruptedException
|
* @throws InterruptedException
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test(timeout=60000)
|
||||||
public void testUniqueSequentialCallIds()
|
public void testUniqueSequentialCallIds()
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException {
|
||||||
int serverThreads = 10, callerCount = 100, perCallerCallCount = 100;
|
int serverThreads = 10, callerCount = 100, perCallerCallCount = 100;
|
||||||
|
Loading…
Reference in New Issue
Block a user