HADOOP-12605. Fix intermittent failure of TestIPC.testIpcWithReaderQueuing (iwasakims)
This commit is contained in:
parent
5ff5f67332
commit
9eec6cbedc
@ -676,6 +676,9 @@ Release 2.9.0 - UNRELEASED
|
|||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
||||||
|
HADOOP-12605. Fix intermittent failure of TestIPC.testIpcWithReaderQueuing
|
||||||
|
(iwasakims)
|
||||||
|
|
||||||
HADOOP-12655. TestHttpServer.testBindAddress bind port range is wider
|
HADOOP-12655. TestHttpServer.testBindAddress bind port range is wider
|
||||||
than expected. (Wei-Chiu Chuang via stevel)
|
than expected. (Wei-Chiu Chuang via stevel)
|
||||||
|
|
||||||
|
@ -27,6 +27,8 @@
|
|||||||
import static org.mockito.Mockito.doThrow;
|
import static org.mockito.Mockito.doThrow;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.spy;
|
import static org.mockito.Mockito.spy;
|
||||||
|
import static org.mockito.Mockito.timeout;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
import java.io.DataInput;
|
import java.io.DataInput;
|
||||||
@ -71,6 +73,7 @@
|
|||||||
import org.apache.hadoop.io.retry.RetryProxy;
|
import org.apache.hadoop.io.retry.RetryProxy;
|
||||||
import org.apache.hadoop.ipc.Client.ConnectionId;
|
import org.apache.hadoop.ipc.Client.ConnectionId;
|
||||||
import org.apache.hadoop.ipc.RPC.RpcKind;
|
import org.apache.hadoop.ipc.RPC.RpcKind;
|
||||||
|
import org.apache.hadoop.ipc.Server.Call;
|
||||||
import org.apache.hadoop.ipc.Server.Connection;
|
import org.apache.hadoop.ipc.Server.Connection;
|
||||||
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
|
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
|
||||||
import org.apache.hadoop.net.ConnectTimeoutException;
|
import org.apache.hadoop.net.ConnectTimeoutException;
|
||||||
@ -84,6 +87,7 @@
|
|||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
|
import org.mockito.internal.util.reflection.Whitebox;
|
||||||
import org.mockito.invocation.InvocationOnMock;
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
import org.mockito.stubbing.Answer;
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
@ -703,6 +707,7 @@ public void testIpcWithReaderQueuing() throws Exception {
|
|||||||
// goal is to jam a handler with a connection, fill the callq with
|
// goal is to jam a handler with a connection, fill the callq with
|
||||||
// connections, in turn jamming the readers - then flood the server and
|
// connections, in turn jamming the readers - then flood the server and
|
||||||
// ensure that the listener blocks when the reader connection queues fill
|
// ensure that the listener blocks when the reader connection queues fill
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
private void checkBlocking(int readers, int readerQ, int callQ) throws Exception {
|
private void checkBlocking(int readers, int readerQ, int callQ) throws Exception {
|
||||||
int handlers = 1; // makes it easier
|
int handlers = 1; // makes it easier
|
||||||
|
|
||||||
@ -722,6 +727,9 @@ private void checkBlocking(int readers, int readerQ, int callQ) throws Exception
|
|||||||
// start server
|
// start server
|
||||||
final TestServerQueue server =
|
final TestServerQueue server =
|
||||||
new TestServerQueue(clients, readers, callQ, handlers, conf);
|
new TestServerQueue(clients, readers, callQ, handlers, conf);
|
||||||
|
CallQueueManager<Call> spy = spy(
|
||||||
|
(CallQueueManager<Call>)Whitebox.getInternalState(server, "callQueue"));
|
||||||
|
Whitebox.setInternalState(server, "callQueue", spy);
|
||||||
final InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
final InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
||||||
server.start();
|
server.start();
|
||||||
|
|
||||||
@ -757,12 +765,10 @@ public void run() {
|
|||||||
if (i==0) {
|
if (i==0) {
|
||||||
// let first reader block in a call
|
// let first reader block in a call
|
||||||
server.firstCallLatch.await();
|
server.firstCallLatch.await();
|
||||||
} else if (i <= callQ) {
|
}
|
||||||
// let subsequent readers jam the callq, will happen immediately
|
// wait until reader put a call to callQueue, to make sure all readers
|
||||||
while (server.getCallQueueLen() != i) {
|
// are blocking on the queue after initialClients threads are started.
|
||||||
Thread.sleep(1);
|
verify(spy, timeout(100).times(i + 1)).put(Mockito.<Call>anyObject());
|
||||||
}
|
|
||||||
} // additional threads block the readers trying to add to the callq
|
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
Loading…
Reference in New Issue
Block a user