HADOOP-10300. Allowed deferred sending of call responses. (Daryn Sharp via yliu)

This commit is contained in:
yliu 2015-10-12 16:09:14 +08:00
parent 049c6e8dc0
commit e617cf6dd1
3 changed files with 212 additions and 28 deletions

View File

@ -621,6 +621,9 @@ Release 2.8.0 - UNRELEASED
HADOOP-12360. Create StatsD metrics2 sink. (Dave Marion via stevel)
HADOOP-10300. Allowed deferred sending of call responses. (Daryn Sharp via
yliu)
IMPROVEMENTS
HADOOP-12458. Retries is typoed to spell Retires in parts of

View File

@ -579,6 +579,7 @@ public static class Call implements Schedulable {
private long timestamp; // time received when response is null
// time served when response is not null
private ByteBuffer rpcResponse; // the response for this call
private AtomicInteger responseWaitCount = new AtomicInteger(1);
private final RPC.RpcKind rpcKind;
private final byte[] clientId;
private final TraceScope traceScope; // the HTrace scope on the server side
@ -613,10 +614,47 @@ public String toString() {
+ retryCount;
}
public void setResponse(Throwable t) throws IOException {
setupResponse(new ByteArrayOutputStream(), this,
RpcStatusProto.FATAL, RpcErrorCodeProto.ERROR_RPC_SERVER,
null, t.getClass().getName(), StringUtils.stringifyException(t));
}
public void setResponse(ByteBuffer response) {
this.rpcResponse = response;
}
/**
* Allow a IPC response to be postponed instead of sent immediately
* after the handler returns from the proxy method. The intended use
* case is freeing up the handler thread when the response is known,
* but an expensive pre-condition must be satisfied before it's sent
* to the client.
*/
@InterfaceStability.Unstable
@InterfaceAudience.LimitedPrivate({"HDFS"})
public void postponeResponse() {
int count = responseWaitCount.incrementAndGet();
assert count > 0 : "response has already been sent";
}
@InterfaceStability.Unstable
@InterfaceAudience.LimitedPrivate({"HDFS"})
public void sendResponse() throws IOException {
int count = responseWaitCount.decrementAndGet();
assert count >= 0 : "response has already been sent";
if (count == 0) {
if (rpcResponse == null) {
// needed by postponed operations to indicate an exception has
// occurred. it's too late to re-encode the response so just
// drop the connection.
connection.close();
} else {
connection.sendResponse(this);
}
}
}
// For Schedulable
@Override
public UserGroupInformation getUserGroupInformation() {
@ -1227,10 +1265,6 @@ public class Connection {
RpcConstants.INVALID_RETRY_COUNT, null, this);
private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream();
private final Call saslCall = new Call(AuthProtocol.SASL.callId,
RpcConstants.INVALID_RETRY_COUNT, null, this);
private final ByteArrayOutputStream saslResponse = new ByteArrayOutputStream();
private boolean sentNegotiate = false;
private boolean useWrap = false;
@ -1544,17 +1578,20 @@ private RpcSaslProto buildSaslResponse(SaslState state, byte[] replyToken) {
}
private void doSaslReply(Message message) throws IOException {
final Call saslCall = new Call(AuthProtocol.SASL.callId,
RpcConstants.INVALID_RETRY_COUNT, null, this);
final ByteArrayOutputStream saslResponse = new ByteArrayOutputStream();
setupResponse(saslResponse, saslCall,
RpcStatusProto.SUCCESS, null,
new RpcResponseWrapper(message), null, null);
responder.doRespond(saslCall);
saslCall.sendResponse();
}
private void doSaslReply(Exception ioe) throws IOException {
setupResponse(authFailedResponse, authFailedCall,
RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_UNAUTHORIZED,
null, ioe.getClass().getName(), ioe.getLocalizedMessage());
responder.doRespond(authFailedCall);
authFailedCall.sendResponse();
}
private void disposeSasl() {
@ -1763,7 +1800,7 @@ private void setupBadVersionResponse(int clientVersion) throws IOException {
setupResponse(buffer, fakeCall,
RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_VERSION_MISMATCH,
null, VersionMismatch.class.getName(), errMsg);
responder.doRespond(fakeCall);
fakeCall.sendResponse();
} else if (clientVersion >= 3) {
Call fakeCall = new Call(-1, RpcConstants.INVALID_RETRY_COUNT, null,
this);
@ -1771,7 +1808,7 @@ private void setupBadVersionResponse(int clientVersion) throws IOException {
setupResponseOldVersionFatal(buffer, fakeCall,
null, VersionMismatch.class.getName(), errMsg);
responder.doRespond(fakeCall);
fakeCall.sendResponse();
} else if (clientVersion == 2) { // Hadoop 0.18.3
Call fakeCall = new Call(0, RpcConstants.INVALID_RETRY_COUNT, null,
this);
@ -1781,8 +1818,7 @@ private void setupBadVersionResponse(int clientVersion) throws IOException {
WritableUtils.writeString(out, VersionMismatch.class.getName());
WritableUtils.writeString(out, errMsg);
fakeCall.setResponse(ByteBuffer.wrap(buffer.toByteArray()));
responder.doRespond(fakeCall);
fakeCall.sendResponse();
}
}
@ -1790,7 +1826,7 @@ private void setupHttpRequestOnIpcPortResponse() throws IOException {
Call fakeCall = new Call(0, RpcConstants.INVALID_RETRY_COUNT, null, this);
fakeCall.setResponse(ByteBuffer.wrap(
RECEIVED_HTTP_REQ_RESPONSE.getBytes(Charsets.UTF_8)));
responder.doRespond(fakeCall);
fakeCall.sendResponse();
}
/** Reads the connection context following the connection header
@ -1941,7 +1977,7 @@ private void processOneRpc(byte[] buf)
setupResponse(authFailedResponse, call,
RpcStatusProto.FATAL, wrse.getRpcErrorCodeProto(), null,
ioe.getClass().getName(), ioe.getMessage());
responder.doRespond(call);
call.sendResponse();
throw wrse;
}
}
@ -2151,6 +2187,10 @@ private <T extends Message> T decodeProtobufFromStream(Builder builder,
}
}
private void sendResponse(Call call) throws IOException {
responder.doRespond(call);
}
/**
* Get service class for connection
* @return the serviceClass
@ -2288,7 +2328,7 @@ public Writable run() throws Exception {
+ call.toString());
buf = new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
}
responder.doRespond(call);
call.sendResponse();
}
} catch (InterruptedException e) {
if (running) { // unexpected -- log it
@ -2484,7 +2524,7 @@ private void closeConnection(Connection connection) {
* @param error error message, if the call failed
* @throws IOException
*/
private void setupResponse(ByteArrayOutputStream responseBuf,
private static void setupResponse(ByteArrayOutputStream responseBuf,
Call call, RpcStatusProto status, RpcErrorCodeProto erCode,
Writable rv, String errorClass, String error)
throws IOException {
@ -2580,7 +2620,7 @@ private void setupResponseOldVersionFatal(ByteArrayOutputStream response,
}
private void wrapWithSasl(ByteArrayOutputStream response, Call call)
private static void wrapWithSasl(ByteArrayOutputStream response, Call call)
throws IOException {
if (call.connection.saslServer != null) {
byte[] token = response.toByteArray();

View File

@ -18,35 +18,43 @@
package org.apache.hadoop.ipc;
import static org.junit.Assert.*;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Random;
import junit.framework.TestCase;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.Server.Call;
import org.apache.hadoop.net.NetUtils;
import org.junit.Assert;
import org.junit.Test;
/**
* This test provokes partial writes in the server, which is
* serving multiple clients.
*/
public class TestIPCServerResponder extends TestCase {
public class TestIPCServerResponder {
public static final Log LOG =
LogFactory.getLog(TestIPCServerResponder.class);
private static Configuration conf = new Configuration();
public TestIPCServerResponder(final String name) {
super(name);
}
private static final Random RANDOM = new Random();
private static final String ADDRESS = "0.0.0.0";
@ -115,21 +123,23 @@ public void run() {
}
}
@Test
public void testResponseBuffer()
throws IOException, InterruptedException {
Server.INITIAL_RESP_BUF_SIZE = 1;
conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY,
1);
testServerResponder(1, true, 1, 1, 5);
checkServerResponder(1, true, 1, 1, 5);
conf = new Configuration(); // reset configuration
}
@Test
public void testServerResponder()
throws IOException, InterruptedException {
testServerResponder(10, true, 1, 10, 200);
checkServerResponder(10, true, 1, 10, 200);
}
public void testServerResponder(final int handlerCount,
public void checkServerResponder(final int handlerCount,
final boolean handlerSleep,
final int clientCount,
final int callerCount,
@ -159,4 +169,135 @@ public void testServerResponder(final int handlerCount,
server.stop();
}
// Test that IPC calls can be marked for a deferred response.
// call 0: immediate
// call 1: immediate
// call 2: delayed with wait for 1 sendResponse, check if blocked
// call 3: immediate, proves handler is freed
// call 4: delayed with wait for 2 sendResponses, check if blocked
// call 2: sendResponse, should return
// call 4: sendResponse, should remain blocked
// call 5: immediate, prove handler is still free
// call 4: sendResponse, expect it to return
@Test(timeout=10000)
public void testDeferResponse() throws IOException, InterruptedException {
final AtomicReference<Call> deferredCall = new AtomicReference<Call>();
final AtomicInteger count = new AtomicInteger();
final Writable wait0 = new IntWritable(0);
final Writable wait1 = new IntWritable(1);
final Writable wait2 = new IntWritable(2);
// use only 1 handler to prove it's freed after every call
Server server = new Server(ADDRESS, 0, IntWritable.class, 1, conf){
@Override
public Writable call(RPC.RpcKind rpcKind, String protocol,
Writable waitCount, long receiveTime) throws IOException {
Call call = Server.getCurCall().get();
int wait = ((IntWritable)waitCount).get();
while (wait-- > 0) {
call.postponeResponse();
deferredCall.set(call);
}
return new IntWritable(count.getAndIncrement());
}
};
server.start();
final InetSocketAddress address = NetUtils.getConnectAddress(server);
final Client client = new Client(IntWritable.class, conf);
Call[] waitingCalls = new Call[2];
// calls should return immediately, check the sequence number is
// increasing
assertEquals(0,
((IntWritable)client.call(wait0, address)).get());
assertEquals(1,
((IntWritable)client.call(wait0, address)).get());
// do a call in the background that will have a deferred response
final ExecutorService exec = Executors.newCachedThreadPool();
Future<Integer> future1 = exec.submit(new Callable<Integer>() {
@Override
public Integer call() throws IOException {
return ((IntWritable)client.call(wait1, address)).get();
}
});
// make sure it blocked
try {
future1.get(1, TimeUnit.SECONDS);
Assert.fail("ipc shouldn't have responded");
} catch (TimeoutException te) {
// ignore, expected
} catch (Exception ex) {
Assert.fail("unexpected exception:"+ex);
}
assertFalse(future1.isDone());
waitingCalls[0] = deferredCall.get();
assertNotNull(waitingCalls[0]);
// proves the handler isn't tied up, and that the prior sequence number
// was consumed
assertEquals(3,
((IntWritable)client.call(wait0, address)).get());
// another call with wait count of 2
Future<Integer> future2 = exec.submit(new Callable<Integer>() {
@Override
public Integer call() throws IOException {
return ((IntWritable)client.call(wait2, address)).get();
}
});
// make sure it blocked
try {
future2.get(1, TimeUnit.SECONDS);
Assert.fail("ipc shouldn't have responded");
} catch (TimeoutException te) {
// ignore, expected
} catch (Exception ex) {
Assert.fail("unexpected exception:"+ex);
}
assertFalse(future2.isDone());
waitingCalls[1] = deferredCall.get();
assertNotNull(waitingCalls[1]);
// the background calls should still be blocked
assertFalse(future1.isDone());
assertFalse(future2.isDone());
// trigger responses
waitingCalls[0].sendResponse();
waitingCalls[1].sendResponse();
try {
int val = future1.get(1, TimeUnit.SECONDS);
assertEquals(2, val);
} catch (Exception ex) {
Assert.fail("unexpected exception:"+ex);
}
// make sure it's still blocked
try {
future2.get(1, TimeUnit.SECONDS);
Assert.fail("ipc shouldn't have responded");
} catch (TimeoutException te) {
// ignore, expected
} catch (Exception ex) {
Assert.fail("unexpected exception:"+ex);
}
assertFalse(future2.isDone());
// call should return immediately
assertEquals(5,
((IntWritable)client.call(wait0, address)).get());
// trigger last waiting call
waitingCalls[1].sendResponse();
try {
int val = future2.get(1, TimeUnit.SECONDS);
assertEquals(4, val);
} catch (Exception ex) {
Assert.fail("unexpected exception:"+ex);
}
server.stop();
}
}