HADOOP-8366 Use ProtoBuf for RpcResponseHeader (sanjay radia)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1337283 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
cc9c6bdce2
commit
2116f28d9e
@ -67,6 +67,8 @@ Trunk (unreleased changes)
|
||||
|
||||
HADOOP-8308. Support cross-project Jenkins builds. (tomwhite)
|
||||
|
||||
HADOOP-8366 Use ProtoBuf for RpcResponseHeader (sanjay radia)
|
||||
|
||||
BUG FIXES
|
||||
|
||||
HADOOP-8177. MBeans shouldn't try to register when it fails to create MBeanName.
|
||||
|
@ -53,6 +53,8 @@
|
||||
import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
|
||||
import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcPayloadHeaderProto;
|
||||
import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcPayloadOperationProto;
|
||||
import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcResponseHeaderProto;
|
||||
import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcStatusProto;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
import org.apache.hadoop.io.WritableUtils;
|
||||
@ -845,24 +847,24 @@ private void receiveResponse() {
|
||||
touch();
|
||||
|
||||
try {
|
||||
int id = in.readInt(); // try to read an id
|
||||
|
||||
RpcResponseHeaderProto response =
|
||||
RpcResponseHeaderProto.parseDelimitedFrom(in);
|
||||
int callId = response.getCallId();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug(getName() + " got value #" + id);
|
||||
LOG.debug(getName() + " got value #" + callId);
|
||||
|
||||
Call call = calls.get(id);
|
||||
|
||||
int state = in.readInt(); // read call status
|
||||
if (state == Status.SUCCESS.state) {
|
||||
Call call = calls.get(callId);
|
||||
RpcStatusProto status = response.getStatus();
|
||||
if (status == RpcStatusProto.SUCCESS) {
|
||||
Writable value = ReflectionUtils.newInstance(valueClass, conf);
|
||||
value.readFields(in); // read value
|
||||
call.setRpcResponse(value);
|
||||
calls.remove(id);
|
||||
} else if (state == Status.ERROR.state) {
|
||||
calls.remove(callId);
|
||||
} else if (status == RpcStatusProto.ERROR) {
|
||||
call.setException(new RemoteException(WritableUtils.readString(in),
|
||||
WritableUtils.readString(in)));
|
||||
calls.remove(id);
|
||||
} else if (state == Status.FATAL.state) {
|
||||
calls.remove(callId);
|
||||
} else if (status == RpcStatusProto.FATAL) {
|
||||
// Close the connection
|
||||
markClosed(new RemoteException(WritableUtils.readString(in),
|
||||
WritableUtils.readString(in)));
|
||||
|
@ -1339,7 +1339,7 @@ public int readAndProcess() throws IOException, InterruptedException {
|
||||
+ CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION
|
||||
+ ") is configured as simple. Please configure another method "
|
||||
+ "like kerberos or digest.");
|
||||
setupResponse(authFailedResponse, authFailedCall, Status.FATAL,
|
||||
setupResponse(authFailedResponse, authFailedCall, RpcStatusProto.FATAL,
|
||||
null, ae.getClass().getName(), ae.getMessage());
|
||||
responder.doRespond(authFailedCall);
|
||||
throw ae;
|
||||
@ -1420,7 +1420,7 @@ private void setupBadVersionResponse(int clientVersion) throws IOException {
|
||||
Call fakeCall = new Call(-1, null, this);
|
||||
// Versions 3 and greater can interpret this exception
|
||||
// response in the same manner
|
||||
setupResponse(buffer, fakeCall, Status.FATAL,
|
||||
setupResponseOldVersionFatal(buffer, fakeCall,
|
||||
null, VersionMismatch.class.getName(), errMsg);
|
||||
|
||||
responder.doRespond(fakeCall);
|
||||
@ -1443,7 +1443,7 @@ private void respondUnsupportedSerialization(IpcSerializationType st) throws IOE
|
||||
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
|
||||
|
||||
Call fakeCall = new Call(-1, null, this);
|
||||
setupResponse(buffer, fakeCall, Status.FATAL, null,
|
||||
setupResponse(buffer, fakeCall, RpcStatusProto.FATAL, null,
|
||||
IpcException.class.getName(), errMsg);
|
||||
responder.doRespond(fakeCall);
|
||||
}
|
||||
@ -1579,7 +1579,7 @@ private void processData(byte[] buf) throws IOException, InterruptedException {
|
||||
new Call(header.getCallId(), null, this);
|
||||
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
|
||||
|
||||
setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, null,
|
||||
setupResponse(responseBuffer, readParamsFailedCall, RpcStatusProto.FATAL, null,
|
||||
IOException.class.getName(),
|
||||
"Unknown rpc kind " + header.getRpcKind());
|
||||
responder.doRespond(readParamsFailedCall);
|
||||
@ -1597,7 +1597,7 @@ private void processData(byte[] buf) throws IOException, InterruptedException {
|
||||
new Call(header.getCallId(), null, this);
|
||||
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
|
||||
|
||||
setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, null,
|
||||
setupResponse(responseBuffer, readParamsFailedCall, RpcStatusProto.FATAL, null,
|
||||
t.getClass().getName(),
|
||||
"IPC server unable to read call parameters: " + t.getMessage());
|
||||
responder.doRespond(readParamsFailedCall);
|
||||
@ -1627,7 +1627,7 @@ private boolean authorizeConnection() throws IOException {
|
||||
rpcMetrics.incrAuthorizationSuccesses();
|
||||
} catch (AuthorizationException ae) {
|
||||
rpcMetrics.incrAuthorizationFailures();
|
||||
setupResponse(authFailedResponse, authFailedCall, Status.FATAL, null,
|
||||
setupResponse(authFailedResponse, authFailedCall, RpcStatusProto.FATAL, null,
|
||||
ae.getClass().getName(), ae.getMessage());
|
||||
responder.doRespond(authFailedCall);
|
||||
return false;
|
||||
@ -1725,8 +1725,8 @@ public Writable run() throws Exception {
|
||||
// responder.doResponse() since setupResponse may use
|
||||
// SASL to encrypt response data and SASL enforces
|
||||
// its own message ordering.
|
||||
setupResponse(buf, call, (error == null) ? Status.SUCCESS
|
||||
: Status.ERROR, value, errorClass, error);
|
||||
setupResponse(buf, call, (error == null) ? RpcStatusProto.SUCCESS
|
||||
: RpcStatusProto.ERROR, value, errorClass, error);
|
||||
|
||||
// Discard the large buf and reset it back to smaller size
|
||||
// to free up heap
|
||||
@ -1859,40 +1859,79 @@ private void closeConnection(Connection connection) {
|
||||
/**
|
||||
* Setup response for the IPC Call.
|
||||
*
|
||||
* @param response buffer to serialize the response into
|
||||
* @param responseBuf buffer to serialize the response into
|
||||
* @param call {@link Call} to which we are setting up the response
|
||||
* @param status {@link Status} of the IPC call
|
||||
* @param status of the IPC call
|
||||
* @param rv return value for the IPC Call, if the call was successful
|
||||
* @param errorClass error class, if the the call failed
|
||||
* @param error error message, if the call failed
|
||||
* @throws IOException
|
||||
*/
|
||||
private void setupResponse(ByteArrayOutputStream response,
|
||||
Call call, Status status,
|
||||
private void setupResponse(ByteArrayOutputStream responseBuf,
|
||||
Call call, RpcStatusProto status,
|
||||
Writable rv, String errorClass, String error)
|
||||
throws IOException {
|
||||
response.reset();
|
||||
DataOutputStream out = new DataOutputStream(response);
|
||||
out.writeInt(call.callId); // write call id
|
||||
out.writeInt(status.state); // write status
|
||||
responseBuf.reset();
|
||||
DataOutputStream out = new DataOutputStream(responseBuf);
|
||||
RpcResponseHeaderProto.Builder response =
|
||||
RpcResponseHeaderProto.newBuilder();
|
||||
response.setCallId(call.callId);
|
||||
response.setStatus(status);
|
||||
|
||||
if (status == Status.SUCCESS) {
|
||||
|
||||
if (status == RpcStatusProto.SUCCESS) {
|
||||
try {
|
||||
response.build().writeDelimitedTo(out);
|
||||
rv.write(out);
|
||||
} catch (Throwable t) {
|
||||
LOG.warn("Error serializing call response for call " + call, t);
|
||||
// Call back to same function - this is OK since the
|
||||
// buffer is reset at the top, and since status is changed
|
||||
// to ERROR it won't infinite loop.
|
||||
setupResponse(response, call, Status.ERROR,
|
||||
setupResponse(responseBuf, call, RpcStatusProto.ERROR,
|
||||
null, t.getClass().getName(),
|
||||
StringUtils.stringifyException(t));
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
if (status == RpcStatusProto.FATAL) {
|
||||
response.setServerIpcVersionNum(Server.CURRENT_VERSION);
|
||||
}
|
||||
response.build().writeDelimitedTo(out);
|
||||
WritableUtils.writeString(out, errorClass);
|
||||
WritableUtils.writeString(out, error);
|
||||
}
|
||||
if (call.connection.useWrap) {
|
||||
wrapWithSasl(responseBuf, call);
|
||||
}
|
||||
call.setResponse(ByteBuffer.wrap(responseBuf.toByteArray()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Setup response for the IPC Call on Fatal Error from a
|
||||
* client that is using old version of Hadoop.
|
||||
* The response is serialized using the previous protocol's response
|
||||
* layout.
|
||||
*
|
||||
* @param response buffer to serialize the response into
|
||||
* @param call {@link Call} to which we are setting up the response
|
||||
* @param rv return value for the IPC Call, if the call was successful
|
||||
* @param errorClass error class, if the the call failed
|
||||
* @param error error message, if the call failed
|
||||
* @throws IOException
|
||||
*/
|
||||
private void setupResponseOldVersionFatal(ByteArrayOutputStream response,
|
||||
Call call,
|
||||
Writable rv, String errorClass, String error)
|
||||
throws IOException {
|
||||
final int OLD_VERSION_FATAL_STATUS = -1;
|
||||
response.reset();
|
||||
DataOutputStream out = new DataOutputStream(response);
|
||||
out.writeInt(call.callId); // write call id
|
||||
out.writeInt(OLD_VERSION_FATAL_STATUS); // write FATAL_STATUS
|
||||
WritableUtils.writeString(out, errorClass);
|
||||
WritableUtils.writeString(out, error);
|
||||
|
||||
if (call.connection.useWrap) {
|
||||
wrapWithSasl(response, call);
|
||||
}
|
||||
|
@ -1,32 +0,0 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.ipc;
|
||||
|
||||
/**
|
||||
* Status of a Hadoop IPC call.
|
||||
*/
|
||||
enum Status {
|
||||
SUCCESS (0),
|
||||
ERROR (1),
|
||||
FATAL (-1);
|
||||
|
||||
int state;
|
||||
private Status(int state) {
|
||||
this.state = state;
|
||||
}
|
||||
}
|
@ -19,7 +19,6 @@ option java_package = "org.apache.hadoop.ipc.protobuf";
|
||||
option java_outer_classname = "RpcPayloadHeaderProtos";
|
||||
option java_generate_equals_and_hash = true;
|
||||
|
||||
|
||||
/**
|
||||
* This is the rpc payload header. It is sent with every rpc call.
|
||||
*
|
||||
@ -34,8 +33,6 @@ option java_generate_equals_and_hash = true;
|
||||
*
|
||||
*/
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* RpcKind determine the rpcEngine and the serialization of the rpc payload
|
||||
*/
|
||||
@ -54,5 +51,27 @@ enum RpcPayloadOperationProto {
|
||||
message RpcPayloadHeaderProto { // the header for the RpcRequest
|
||||
optional RpcKindProto rpcKind = 1;
|
||||
optional RpcPayloadOperationProto rpcOp = 2;
|
||||
optional uint32 callId = 3; // each rpc has a callId that is also used in response
|
||||
required uint32 callId = 3; // each rpc has a callId that is also used in response
|
||||
}
|
||||
|
||||
enum RpcStatusProto {
|
||||
SUCCESS = 0; // RPC succeeded
|
||||
ERROR = 1; // RPC Failed
|
||||
FATAL = 2; // Fatal error - connection is closed
|
||||
}
|
||||
|
||||
/**
|
||||
* Rpc Response Header
|
||||
* - If successfull then the Respose follows after this header
|
||||
* - length (4 byte int), followed by the response
|
||||
* - If error or fatal - the exception info follow
|
||||
* - length (4 byte int) Class name of exception - UTF-8 string
|
||||
* - length (4 byte int) Stacktrace - UTF-8 string
|
||||
* - if the strings are null then the length is -1
|
||||
* In case of Fatal error then the respose contains the Serverside's IPC version
|
||||
*/
|
||||
message RpcResponseHeaderProto {
|
||||
required uint32 callId = 1; // callId used in Request
|
||||
required RpcStatusProto status = 2;
|
||||
optional uint32 serverIpcVersionNum = 3; // in case of an fatal IPC error
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user