Revert "HADOOP-13465. Design Server.Call to be extensible for unified call queue. Contributed by Daryn Sharp."

This reverts commit d288a0ba83.
This commit is contained in:
Kihwal Lee 2016-08-25 16:04:54 -05:00
parent 1360bd2d54
commit 81485dbfc1

View File

@ -354,7 +354,8 @@ public static int getCallRetryCount() {
*/ */
public static InetAddress getRemoteIp() { public static InetAddress getRemoteIp() {
Call call = CurCall.get(); Call call = CurCall.get();
return (call != null ) ? call.getHostInetAddress() : null; return (call != null && call.connection != null) ? call.connection
.getHostInetAddress() : null;
} }
/** /**
@ -379,7 +380,8 @@ public static String getRemoteAddress() {
*/ */
public static UserGroupInformation getRemoteUser() { public static UserGroupInformation getRemoteUser() {
Call call = CurCall.get(); Call call = CurCall.get();
return (call != null) ? call.getRemoteUser() : null; return (call != null && call.connection != null) ? call.connection.user
: null;
} }
/** Return true if the invocation was through an RPC. /** Return true if the invocation was through an RPC.
@ -481,7 +483,7 @@ void logSlowRpcCalls(String methodName, int processingTime) {
if ((rpcMetrics.getProcessingSampleCount() > minSampleSize) && if ((rpcMetrics.getProcessingSampleCount() > minSampleSize) &&
(processingTime > threeSigma)) { (processingTime > threeSigma)) {
if(LOG.isWarnEnabled()) { if(LOG.isWarnEnabled()) {
String client = CurCall.get().toString(); String client = CurCall.get().connection.toString();
LOG.warn( LOG.warn(
"Slow RPC : " + methodName + " took " + processingTime + "Slow RPC : " + methodName + " took " + processingTime +
" milliseconds to process from client " + client); " milliseconds to process from client " + client);
@ -655,41 +657,48 @@ static boolean getClientBackoffEnable(
CommonConfigurationKeys.IPC_BACKOFF_ENABLE_DEFAULT); CommonConfigurationKeys.IPC_BACKOFF_ENABLE_DEFAULT);
} }
/** A generic call queued for handling. */ /** A call queued for handling. */
public static class Call implements Schedulable, public static class Call implements Schedulable {
PrivilegedExceptionAction<Void> { private final int callId; // the client's call id
final int callId; // the client's call id private final int retryCount; // the retry count of the call
final int retryCount; // the retry count of the call private final Writable rpcRequest; // Serialized Rpc request from client
long timestamp; // time received when response is null private final Connection connection; // connection to client
private long timestamp; // time received when response is null
// time served when response is not null // time served when response is not null
private ByteBuffer rpcResponse; // the response for this call
private AtomicInteger responseWaitCount = new AtomicInteger(1); private AtomicInteger responseWaitCount = new AtomicInteger(1);
final RPC.RpcKind rpcKind; private final RPC.RpcKind rpcKind;
final byte[] clientId; private final byte[] clientId;
private final TraceScope traceScope; // the HTrace scope on the server side private final TraceScope traceScope; // the HTrace scope on the server side
private final CallerContext callerContext; // the call context private final CallerContext callerContext; // the call context
private int priorityLevel; private int priorityLevel;
// the priority level assigned by scheduler, 0 by default // the priority level assigned by scheduler, 0 by default
Call(Call call) { private Call(Call call) {
this(call.callId, call.retryCount, call.rpcKind, call.clientId, this(call.callId, call.retryCount, call.rpcRequest, call.connection,
call.traceScope, call.callerContext); call.rpcKind, call.clientId, call.traceScope, call.callerContext);
} }
Call(int id, int retryCount, RPC.RpcKind kind, byte[] clientId) { public Call(int id, int retryCount, Writable param,
this(id, retryCount, kind, clientId, null, null); Connection connection) {
this(id, retryCount, param, connection, RPC.RpcKind.RPC_BUILTIN,
RpcConstants.DUMMY_CLIENT_ID);
} }
@VisibleForTesting // primarily TestNamenodeRetryCache public Call(int id, int retryCount, Writable param, Connection connection,
public Call(int id, int retryCount, Void ignore1, Void ignore2,
RPC.RpcKind kind, byte[] clientId) { RPC.RpcKind kind, byte[] clientId) {
this(id, retryCount, kind, clientId, null, null); this(id, retryCount, param, connection, kind, clientId, null, null);
} }
Call(int id, int retryCount, RPC.RpcKind kind, byte[] clientId, public Call(int id, int retryCount, Writable param, Connection connection,
TraceScope traceScope, CallerContext callerContext) { RPC.RpcKind kind, byte[] clientId, TraceScope traceScope,
CallerContext callerContext) {
this.callId = id; this.callId = id;
this.retryCount = retryCount; this.retryCount = retryCount;
this.rpcRequest = param;
this.connection = connection;
this.timestamp = Time.now(); this.timestamp = Time.now();
this.rpcResponse = null;
this.rpcKind = kind; this.rpcKind = kind;
this.clientId = clientId; this.clientId = clientId;
this.traceScope = traceScope; this.traceScope = traceScope;
@ -698,22 +707,12 @@ public Call(int id, int retryCount, Void ignore1, Void ignore2,
@Override @Override
public String toString() { public String toString() {
return "Call#" + callId + " Retry#" + retryCount; return rpcRequest + " from " + connection + " Call#" + callId + " Retry#"
+ retryCount;
} }
public Void run() throws Exception { public void setResponse(ByteBuffer response) {
return null; this.rpcResponse = response;
}
// should eventually be abstract but need to avoid breaking tests
public UserGroupInformation getRemoteUser() {
return null;
}
public InetAddress getHostInetAddress() {
return null;
}
public String getHostAddress() {
InetAddress addr = getHostInetAddress();
return (addr != null) ? addr.getHostAddress() : null;
} }
/** /**
@ -725,36 +724,34 @@ public String getHostAddress() {
*/ */
@InterfaceStability.Unstable @InterfaceStability.Unstable
@InterfaceAudience.LimitedPrivate({"HDFS"}) @InterfaceAudience.LimitedPrivate({"HDFS"})
public final void postponeResponse() { public void postponeResponse() {
int count = responseWaitCount.incrementAndGet(); int count = responseWaitCount.incrementAndGet();
assert count > 0 : "response has already been sent"; assert count > 0 : "response has already been sent";
} }
@InterfaceStability.Unstable @InterfaceStability.Unstable
@InterfaceAudience.LimitedPrivate({"HDFS"}) @InterfaceAudience.LimitedPrivate({"HDFS"})
public final void sendResponse() throws IOException { public void sendResponse() throws IOException {
int count = responseWaitCount.decrementAndGet(); int count = responseWaitCount.decrementAndGet();
assert count >= 0 : "response has already been sent"; assert count >= 0 : "response has already been sent";
if (count == 0) { if (count == 0) {
doResponse(null); connection.sendResponse(this);
} }
} }
@InterfaceStability.Unstable @InterfaceStability.Unstable
@InterfaceAudience.LimitedPrivate({"HDFS"}) @InterfaceAudience.LimitedPrivate({"HDFS"})
public final void abortResponse(Throwable t) throws IOException { public void abortResponse(Throwable t) throws IOException {
// don't send response if the call was already sent or aborted. // don't send response if the call was already sent or aborted.
if (responseWaitCount.getAndSet(-1) > 0) { if (responseWaitCount.getAndSet(-1) > 0) {
doResponse(t); connection.abortResponse(this, t);
} }
} }
void doResponse(Throwable t) throws IOException {}
// For Schedulable // For Schedulable
@Override @Override
public UserGroupInformation getUserGroupInformation() { public UserGroupInformation getUserGroupInformation() {
return getRemoteUser(); return connection.user;
} }
@Override @Override
@ -767,114 +764,6 @@ public void setPriorityLevel(int priorityLevel) {
} }
} }
/** A RPC extended call queued for handling. */
private class RpcCall extends Call {
final Connection connection; // connection to client
final Writable rpcRequest; // Serialized Rpc request from client
ByteBuffer rpcResponse; // the response for this call
RpcCall(RpcCall call) {
super(call);
this.connection = call.connection;
this.rpcRequest = call.rpcRequest;
}
RpcCall(Connection connection, int id) {
this(connection, id, RpcConstants.INVALID_RETRY_COUNT);
}
RpcCall(Connection connection, int id, int retryCount) {
this(connection, id, retryCount, null,
RPC.RpcKind.RPC_BUILTIN, RpcConstants.DUMMY_CLIENT_ID,
null, null);
}
RpcCall(Connection connection, int id, int retryCount,
Writable param, RPC.RpcKind kind, byte[] clientId,
TraceScope traceScope, CallerContext context) {
super(id, retryCount, kind, clientId, traceScope, context);
this.connection = connection;
this.rpcRequest = param;
}
@Override
public UserGroupInformation getRemoteUser() {
return connection.user;
}
@Override
public InetAddress getHostInetAddress() {
return connection.getHostInetAddress();
}
@Override
public Void run() throws Exception {
if (!connection.channel.isOpen()) {
Server.LOG.info(Thread.currentThread().getName() + ": skipped " + this);
return null;
}
String errorClass = null;
String error = null;
RpcStatusProto returnStatus = RpcStatusProto.SUCCESS;
RpcErrorCodeProto detailedErr = null;
Writable value = null;
try {
value = call(
rpcKind, connection.protocolName, rpcRequest, timestamp);
} catch (Throwable e) {
if (e instanceof UndeclaredThrowableException) {
e = e.getCause();
}
logException(Server.LOG, e, this);
if (e instanceof RpcServerException) {
RpcServerException rse = ((RpcServerException)e);
returnStatus = rse.getRpcStatusProto();
detailedErr = rse.getRpcErrorCodeProto();
} else {
returnStatus = RpcStatusProto.ERROR;
detailedErr = RpcErrorCodeProto.ERROR_APPLICATION;
}
errorClass = e.getClass().getName();
error = StringUtils.stringifyException(e);
// Remove redundant error class name from the beginning of the
// stack trace
String exceptionHdr = errorClass + ": ";
if (error.startsWith(exceptionHdr)) {
error = error.substring(exceptionHdr.length());
}
}
setupResponse(this, returnStatus, detailedErr,
value, errorClass, error);
sendResponse();
return null;
}
void setResponse(ByteBuffer response) throws IOException {
this.rpcResponse = response;
}
@Override
void doResponse(Throwable t) throws IOException {
RpcCall call = this;
if (t != null) {
// clone the call to prevent a race with another thread stomping
// on the response while being sent. the original call is
// effectively discarded since the wait count won't hit zero
call = new RpcCall(this);
setupResponse(call,
RpcStatusProto.FATAL, RpcErrorCodeProto.ERROR_RPC_SERVER,
null, t.getClass().getName(), StringUtils.stringifyException(t));
}
connection.sendResponse(this);
}
@Override
public String toString() {
return super.toString() + " " + rpcRequest + " from " + connection;
}
}
/** Listens on the socket. Creates jobs for the handler threads*/ /** Listens on the socket. Creates jobs for the handler threads*/
private class Listener extends Thread { private class Listener extends Thread {
@ -1205,22 +1094,22 @@ private void doRunLoop() {
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
LOG.debug("Checking for old call responses."); LOG.debug("Checking for old call responses.");
} }
ArrayList<RpcCall> calls; ArrayList<Call> calls;
// get the list of channels from list of keys. // get the list of channels from list of keys.
synchronized (writeSelector.keys()) { synchronized (writeSelector.keys()) {
calls = new ArrayList<RpcCall>(writeSelector.keys().size()); calls = new ArrayList<Call>(writeSelector.keys().size());
iter = writeSelector.keys().iterator(); iter = writeSelector.keys().iterator();
while (iter.hasNext()) { while (iter.hasNext()) {
SelectionKey key = iter.next(); SelectionKey key = iter.next();
RpcCall call = (RpcCall)key.attachment(); Call call = (Call)key.attachment();
if (call != null && key.channel() == call.connection.channel) { if (call != null && key.channel() == call.connection.channel) {
calls.add(call); calls.add(call);
} }
} }
} }
for (RpcCall call : calls) { for(Call call : calls) {
doPurge(call, now); doPurge(call, now);
} }
} catch (OutOfMemoryError e) { } catch (OutOfMemoryError e) {
@ -1238,7 +1127,7 @@ private void doRunLoop() {
} }
private void doAsyncWrite(SelectionKey key) throws IOException { private void doAsyncWrite(SelectionKey key) throws IOException {
RpcCall call = (RpcCall)key.attachment(); Call call = (Call)key.attachment();
if (call == null) { if (call == null) {
return; return;
} }
@ -1266,10 +1155,10 @@ private void doAsyncWrite(SelectionKey key) throws IOException {
// Remove calls that have been pending in the responseQueue // Remove calls that have been pending in the responseQueue
// for a long time. // for a long time.
// //
private void doPurge(RpcCall call, long now) { private void doPurge(Call call, long now) {
LinkedList<RpcCall> responseQueue = call.connection.responseQueue; LinkedList<Call> responseQueue = call.connection.responseQueue;
synchronized (responseQueue) { synchronized (responseQueue) {
Iterator<RpcCall> iter = responseQueue.listIterator(0); Iterator<Call> iter = responseQueue.listIterator(0);
while (iter.hasNext()) { while (iter.hasNext()) {
call = iter.next(); call = iter.next();
if (now > call.timestamp + PURGE_INTERVAL) { if (now > call.timestamp + PURGE_INTERVAL) {
@ -1283,12 +1172,12 @@ private void doPurge(RpcCall call, long now) {
// Processes one response. Returns true if there are no more pending // Processes one response. Returns true if there are no more pending
// data for this channel. // data for this channel.
// //
private boolean processResponse(LinkedList<RpcCall> responseQueue, private boolean processResponse(LinkedList<Call> responseQueue,
boolean inHandler) throws IOException { boolean inHandler) throws IOException {
boolean error = true; boolean error = true;
boolean done = false; // there is more data for this channel. boolean done = false; // there is more data for this channel.
int numElements = 0; int numElements = 0;
RpcCall call = null; Call call = null;
try { try {
synchronized (responseQueue) { synchronized (responseQueue) {
// //
@ -1371,7 +1260,7 @@ private boolean processResponse(LinkedList<RpcCall> responseQueue,
// //
// Enqueue a response from the application. // Enqueue a response from the application.
// //
void doRespond(RpcCall call) throws IOException { void doRespond(Call call) throws IOException {
synchronized (call.connection.responseQueue) { synchronized (call.connection.responseQueue) {
// must only wrap before adding to the responseQueue to prevent // must only wrap before adding to the responseQueue to prevent
// postponed responses from being encrypted and sent out of order. // postponed responses from being encrypted and sent out of order.
@ -1469,7 +1358,7 @@ public class Connection {
private SocketChannel channel; private SocketChannel channel;
private ByteBuffer data; private ByteBuffer data;
private ByteBuffer dataLengthBuffer; private ByteBuffer dataLengthBuffer;
private LinkedList<RpcCall> responseQueue; private LinkedList<Call> responseQueue;
// number of outstanding rpcs // number of outstanding rpcs
private AtomicInteger rpcCount = new AtomicInteger(); private AtomicInteger rpcCount = new AtomicInteger();
private long lastContact; private long lastContact;
@ -1496,8 +1385,8 @@ public class Connection {
public UserGroupInformation attemptingUser = null; // user name before auth public UserGroupInformation attemptingUser = null; // user name before auth
// Fake 'call' for failed authorization response // Fake 'call' for failed authorization response
private final RpcCall authFailedCall = private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALL_ID,
new RpcCall(this, AUTHORIZATION_FAILED_CALL_ID); RpcConstants.INVALID_RETRY_COUNT, null, this);
private boolean sentNegotiate = false; private boolean sentNegotiate = false;
private boolean useWrap = false; private boolean useWrap = false;
@ -1520,7 +1409,7 @@ public Connection(SocketChannel channel, long lastContact) {
this.hostAddress = addr.getHostAddress(); this.hostAddress = addr.getHostAddress();
} }
this.remotePort = socket.getPort(); this.remotePort = socket.getPort();
this.responseQueue = new LinkedList<RpcCall>(); this.responseQueue = new LinkedList<Call>();
if (socketSendBufferSize != 0) { if (socketSendBufferSize != 0) {
try { try {
socket.setSendBufferSize(socketSendBufferSize); socket.setSendBufferSize(socketSendBufferSize);
@ -1815,7 +1704,8 @@ private RpcSaslProto buildSaslResponse(SaslState state, byte[] replyToken) {
} }
private void doSaslReply(Message message) throws IOException { private void doSaslReply(Message message) throws IOException {
final RpcCall saslCall = new RpcCall(this, AuthProtocol.SASL.callId); final Call saslCall = new Call(AuthProtocol.SASL.callId,
RpcConstants.INVALID_RETRY_COUNT, null, this);
setupResponse(saslCall, setupResponse(saslCall,
RpcStatusProto.SUCCESS, null, RpcStatusProto.SUCCESS, null,
RpcWritable.wrap(message), null, null); RpcWritable.wrap(message), null, null);
@ -2032,20 +1922,23 @@ private void setupBadVersionResponse(int clientVersion) throws IOException {
if (clientVersion >= 9) { if (clientVersion >= 9) {
// Versions >>9 understand the normal response // Versions >>9 understand the normal response
RpcCall fakeCall = new RpcCall(this, -1); Call fakeCall = new Call(-1, RpcConstants.INVALID_RETRY_COUNT, null,
this);
setupResponse(fakeCall, setupResponse(fakeCall,
RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_VERSION_MISMATCH, RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_VERSION_MISMATCH,
null, VersionMismatch.class.getName(), errMsg); null, VersionMismatch.class.getName(), errMsg);
fakeCall.sendResponse(); fakeCall.sendResponse();
} else if (clientVersion >= 3) { } else if (clientVersion >= 3) {
RpcCall fakeCall = new RpcCall(this, -1); Call fakeCall = new Call(-1, RpcConstants.INVALID_RETRY_COUNT, null,
this);
// Versions 3 to 8 use older response // Versions 3 to 8 use older response
setupResponseOldVersionFatal(buffer, fakeCall, setupResponseOldVersionFatal(buffer, fakeCall,
null, VersionMismatch.class.getName(), errMsg); null, VersionMismatch.class.getName(), errMsg);
fakeCall.sendResponse(); fakeCall.sendResponse();
} else if (clientVersion == 2) { // Hadoop 0.18.3 } else if (clientVersion == 2) { // Hadoop 0.18.3
RpcCall fakeCall = new RpcCall(this, 0); Call fakeCall = new Call(0, RpcConstants.INVALID_RETRY_COUNT, null,
this);
DataOutputStream out = new DataOutputStream(buffer); DataOutputStream out = new DataOutputStream(buffer);
out.writeInt(0); // call ID out.writeInt(0); // call ID
out.writeBoolean(true); // error out.writeBoolean(true); // error
@ -2057,7 +1950,7 @@ private void setupBadVersionResponse(int clientVersion) throws IOException {
} }
private void setupHttpRequestOnIpcPortResponse() throws IOException { private void setupHttpRequestOnIpcPortResponse() throws IOException {
RpcCall fakeCall = new RpcCall(this, 0); Call fakeCall = new Call(0, RpcConstants.INVALID_RETRY_COUNT, null, this);
fakeCall.setResponse(ByteBuffer.wrap( fakeCall.setResponse(ByteBuffer.wrap(
RECEIVED_HTTP_REQ_RESPONSE.getBytes(StandardCharsets.UTF_8))); RECEIVED_HTTP_REQ_RESPONSE.getBytes(StandardCharsets.UTF_8)));
fakeCall.sendResponse(); fakeCall.sendResponse();
@ -2205,7 +2098,7 @@ private void processOneRpc(ByteBuffer bb)
} }
} catch (WrappedRpcServerException wrse) { // inform client of error } catch (WrappedRpcServerException wrse) { // inform client of error
Throwable ioe = wrse.getCause(); Throwable ioe = wrse.getCause();
final RpcCall call = new RpcCall(this, callId, retry); final Call call = new Call(callId, retry, null, this);
setupResponse(call, setupResponse(call,
RpcStatusProto.FATAL, wrse.getRpcErrorCodeProto(), null, RpcStatusProto.FATAL, wrse.getRpcErrorCodeProto(), null,
ioe.getClass().getName(), ioe.getMessage()); ioe.getClass().getName(), ioe.getMessage());
@ -2305,9 +2198,8 @@ private void processRpcRequest(RpcRequestHeaderProto header,
.build(); .build();
} }
RpcCall call = new RpcCall(this, header.getCallId(), Call call = new Call(header.getCallId(), header.getRetryCount(),
header.getRetryCount(), rpcRequest, rpcRequest, this, ProtoUtil.convert(header.getRpcKind()),
ProtoUtil.convert(header.getRpcKind()),
header.getClientId().toByteArray(), traceScope, callerContext); header.getClientId().toByteArray(), traceScope, callerContext);
// Save the priority level assignment by the scheduler // Save the priority level assignment by the scheduler
@ -2431,10 +2323,21 @@ <T extends Message> T getMessage(Message message,
} }
} }
private void sendResponse(RpcCall call) throws IOException { private void sendResponse(Call call) throws IOException {
responder.doRespond(call); responder.doRespond(call);
} }
private void abortResponse(Call call, Throwable t) throws IOException {
// clone the call to prevent a race with the other thread stomping
// on the response while being sent. the original call is
// effectively discarded since the wait count won't hit zero
call = new Call(call);
setupResponse(call,
RpcStatusProto.FATAL, RpcErrorCodeProto.ERROR_RPC_SERVER,
null, t.getClass().getName(), StringUtils.stringifyException(t));
call.sendResponse();
}
/** /**
* Get service class for connection * Get service class for connection
* @return the serviceClass * @return the serviceClass
@ -2485,6 +2388,16 @@ public void run() {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(Thread.currentThread().getName() + ": " + call + " for RpcKind " + call.rpcKind); LOG.debug(Thread.currentThread().getName() + ": " + call + " for RpcKind " + call.rpcKind);
} }
if (!call.connection.channel.isOpen()) {
LOG.info(Thread.currentThread().getName() + ": skipped " + call);
continue;
}
String errorClass = null;
String error = null;
RpcStatusProto returnStatus = RpcStatusProto.SUCCESS;
RpcErrorCodeProto detailedErr = null;
Writable value = null;
CurCall.set(call); CurCall.set(call);
if (call.traceScope != null) { if (call.traceScope != null) {
call.traceScope.reattach(); call.traceScope.reattach();
@ -2493,11 +2406,53 @@ public void run() {
} }
// always update the current call context // always update the current call context
CallerContext.setCurrent(call.callerContext); CallerContext.setCurrent(call.callerContext);
UserGroupInformation remoteUser = call.getRemoteUser();
if (remoteUser != null) { try {
remoteUser.doAs(call); // Make the call as the user via Subject.doAs, thus associating
// the call with the Subject
if (call.connection.user == null) {
value = call(call.rpcKind, call.connection.protocolName, call.rpcRequest,
call.timestamp);
} else { } else {
call.run(); value =
call.connection.user.doAs
(new PrivilegedExceptionAction<Writable>() {
@Override
public Writable run() throws Exception {
// make the call
return call(call.rpcKind, call.connection.protocolName,
call.rpcRequest, call.timestamp);
}
}
);
}
} catch (Throwable e) {
if (e instanceof UndeclaredThrowableException) {
e = e.getCause();
}
logException(LOG, e, call);
if (e instanceof RpcServerException) {
RpcServerException rse = ((RpcServerException)e);
returnStatus = rse.getRpcStatusProto();
detailedErr = rse.getRpcErrorCodeProto();
} else {
returnStatus = RpcStatusProto.ERROR;
detailedErr = RpcErrorCodeProto.ERROR_APPLICATION;
}
errorClass = e.getClass().getName();
error = StringUtils.stringifyException(e);
// Remove redundant error class name from the beginning of the stack trace
String exceptionHdr = errorClass + ": ";
if (error.startsWith(exceptionHdr)) {
error = error.substring(exceptionHdr.length());
}
}
CurCall.set(null);
synchronized (call.connection.responseQueue) {
setupResponse(call, returnStatus, detailedErr,
value, errorClass, error);
call.sendResponse();
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
if (running) { // unexpected -- log it if (running) { // unexpected -- log it
@ -2514,7 +2469,6 @@ public void run() {
StringUtils.stringifyException(e)); StringUtils.stringifyException(e));
} }
} finally { } finally {
CurCall.set(null);
IOUtils.cleanup(LOG, traceScope); IOUtils.cleanup(LOG, traceScope);
} }
} }
@ -2716,7 +2670,7 @@ private void closeConnection(Connection connection) {
* @throws IOException * @throws IOException
*/ */
private void setupResponse( private void setupResponse(
RpcCall call, RpcStatusProto status, RpcErrorCodeProto erCode, Call call, RpcStatusProto status, RpcErrorCodeProto erCode,
Writable rv, String errorClass, String error) Writable rv, String errorClass, String error)
throws IOException { throws IOException {
RpcResponseHeaderProto.Builder headerBuilder = RpcResponseHeaderProto.Builder headerBuilder =
@ -2750,7 +2704,7 @@ private void setupResponse(
} }
} }
private void setupResponse(RpcCall call, private void setupResponse(Call call,
RpcResponseHeaderProto header, Writable rv) throws IOException { RpcResponseHeaderProto header, Writable rv) throws IOException {
ResponseBuffer buf = responseBuffer.get().reset(); ResponseBuffer buf = responseBuffer.get().reset();
try { try {
@ -2784,7 +2738,7 @@ private void setupResponse(RpcCall call,
* @throws IOException * @throws IOException
*/ */
private void setupResponseOldVersionFatal(ByteArrayOutputStream response, private void setupResponseOldVersionFatal(ByteArrayOutputStream response,
RpcCall call, Call call,
Writable rv, String errorClass, String error) Writable rv, String errorClass, String error)
throws IOException { throws IOException {
final int OLD_VERSION_FATAL_STATUS = -1; final int OLD_VERSION_FATAL_STATUS = -1;
@ -2797,7 +2751,7 @@ private void setupResponseOldVersionFatal(ByteArrayOutputStream response,
call.setResponse(ByteBuffer.wrap(response.toByteArray())); call.setResponse(ByteBuffer.wrap(response.toByteArray()));
} }
private void wrapWithSasl(RpcCall call) throws IOException { private void wrapWithSasl(Call call) throws IOException {
if (call.connection.saslServer != null) { if (call.connection.saslServer != null) {
byte[] token = call.rpcResponse.array(); byte[] token = call.rpcResponse.array();
// synchronization may be needed since there can be multiple Handler // synchronization may be needed since there can be multiple Handler