HDFS-13665. [SBN read] Move RPC response serialization into Server.doResponse(). Contributed by Plamen Jeliazkov.

This commit is contained in:
Plamen Jeliazkov 2018-07-11 16:07:05 -07:00 committed by Konstantin V Shvachko
parent 64b7cf59bd
commit e27708c2da

View File

@ -856,15 +856,15 @@ private class RpcCall extends Call {
final Writable rpcRequest; // Serialized Rpc request from client final Writable rpcRequest; // Serialized Rpc request from client
ByteBuffer rpcResponse; // the response for this call ByteBuffer rpcResponse; // the response for this call
private RpcResponseHeaderProto bufferedHeader; // the response header private ResponseParams responseParams; // the response params
private Writable bufferedRv; // the byte response private Writable rv; // the byte response
RpcCall(RpcCall call) { RpcCall(RpcCall call) {
super(call); super(call);
this.connection = call.connection; this.connection = call.connection;
this.rpcRequest = call.rpcRequest; this.rpcRequest = call.rpcRequest;
this.bufferedRv = call.bufferedRv; this.rv = call.rv;
this.bufferedHeader = call.bufferedHeader; this.responseParams = call.responseParams;
} }
RpcCall(Connection connection, int id) { RpcCall(Connection connection, int id) {
@ -885,12 +885,10 @@ private class RpcCall extends Call {
this.rpcRequest = param; this.rpcRequest = param;
} }
public void setBufferedHeader(RpcResponseHeaderProto header) { void setResponseFields(Writable returnValue,
this.bufferedHeader = header; ResponseParams responseParams) {
} this.rv = returnValue;
this.responseParams = responseParams;
public void setBufferedRv(Writable rv) {
this.bufferedRv = rv;
} }
@Override @Override
@ -924,9 +922,7 @@ public Void run() throws Exception {
populateResponseParamsOnError(e, responseParams); populateResponseParamsOnError(e, responseParams);
} }
if (!isResponseDeferred()) { if (!isResponseDeferred()) {
setupResponse(this, responseParams.returnStatus, setResponseFields(value, responseParams);
responseParams.detailedErr,
value, responseParams.errorClass, responseParams.error);
sendResponse(); sendResponse();
} else { } else {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
@ -981,13 +977,11 @@ void doResponse(Throwable t) throws IOException {
setupResponse(call, setupResponse(call,
RpcStatusProto.FATAL, RpcErrorCodeProto.ERROR_RPC_SERVER, RpcStatusProto.FATAL, RpcErrorCodeProto.ERROR_RPC_SERVER,
null, t.getClass().getName(), StringUtils.stringifyException(t)); null, t.getClass().getName(), StringUtils.stringifyException(t));
} else if (alignmentContext != null) { } else {
// rebuild response with state context in header setupResponse(call, call.responseParams.returnStatus,
RpcResponseHeaderProto.Builder responseHeader = call.responseParams.detailedErr, call.rv,
call.bufferedHeader.toBuilder(); call.responseParams.errorClass,
alignmentContext.updateResponseState(responseHeader); call.responseParams.error);
RpcResponseHeaderProto builtHeader = responseHeader.build();
setupResponse(call, builtHeader, call.bufferedRv);
} }
connection.sendResponse(call); connection.sendResponse(call);
} }
@ -3012,6 +3006,9 @@ private void setupResponse(
headerBuilder.setRetryCount(call.retryCount); headerBuilder.setRetryCount(call.retryCount);
headerBuilder.setStatus(status); headerBuilder.setStatus(status);
headerBuilder.setServerIpcVersionNum(CURRENT_VERSION); headerBuilder.setServerIpcVersionNum(CURRENT_VERSION);
if (alignmentContext != null) {
alignmentContext.updateResponseState(headerBuilder);
}
if (status == RpcStatusProto.SUCCESS) { if (status == RpcStatusProto.SUCCESS) {
RpcResponseHeaderProto header = headerBuilder.build(); RpcResponseHeaderProto header = headerBuilder.build();
@ -3038,12 +3035,6 @@ private void setupResponse(
private void setupResponse(RpcCall call, private void setupResponse(RpcCall call,
RpcResponseHeaderProto header, Writable rv) throws IOException { RpcResponseHeaderProto header, Writable rv) throws IOException {
if (alignmentContext != null && call.bufferedHeader == null
&& call.bufferedRv == null) {
call.setBufferedHeader(header);
call.setBufferedRv(rv);
}
final byte[] response; final byte[] response;
if (rv == null || (rv instanceof RpcWritable.ProtobufWrapper)) { if (rv == null || (rv instanceof RpcWritable.ProtobufWrapper)) {
response = setupResponseForProtobuf(header, rv); response = setupResponseForProtobuf(header, rv);