HADOOP-18569. NFS Gateway may release buffer too early (#5212)
(cherry picked from commit df4812df65
)
This commit is contained in:
parent
1cecf8ab70
commit
4de8791deb
@ -26,6 +26,7 @@
|
|||||||
import io.netty.buffer.Unpooled;
|
import io.netty.buffer.Unpooled;
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||||
|
import io.netty.util.ReferenceCountUtil;
|
||||||
import org.apache.hadoop.classification.VisibleForTesting;
|
import org.apache.hadoop.classification.VisibleForTesting;
|
||||||
import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState;
|
import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState;
|
||||||
import org.apache.hadoop.oncrpc.security.VerifierNone;
|
import org.apache.hadoop.oncrpc.security.VerifierNone;
|
||||||
@ -163,8 +164,16 @@ public void stopDaemons() {}
|
|||||||
public void channelRead(ChannelHandlerContext ctx, Object msg)
|
public void channelRead(ChannelHandlerContext ctx, Object msg)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
RpcInfo info = (RpcInfo) msg;
|
RpcInfo info = (RpcInfo) msg;
|
||||||
RpcCall call = (RpcCall) info.header();
|
try {
|
||||||
|
channelRead(ctx, info);
|
||||||
|
} finally {
|
||||||
|
ReferenceCountUtil.release(info.data());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void channelRead(ChannelHandlerContext ctx, RpcInfo info)
|
||||||
|
throws Exception {
|
||||||
|
RpcCall call = (RpcCall) info.header();
|
||||||
SocketAddress remoteAddress = info.remoteAddress();
|
SocketAddress remoteAddress = info.remoteAddress();
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace(program + " procedure #" + call.getProcedure());
|
LOG.trace(program + " procedure #" + call.getProcedure());
|
||||||
|
@ -129,16 +129,18 @@ public void channelRead(ChannelHandlerContext ctx, Object msg)
|
|||||||
RpcInfo info = null;
|
RpcInfo info = null;
|
||||||
try {
|
try {
|
||||||
RpcCall callHeader = RpcCall.read(in);
|
RpcCall callHeader = RpcCall.read(in);
|
||||||
ByteBuf dataBuffer = Unpooled.wrappedBuffer(in.buffer()
|
ByteBuf dataBuffer = buf.slice(b.position(), b.remaining());
|
||||||
.slice());
|
|
||||||
|
|
||||||
info = new RpcInfo(callHeader, dataBuffer, ctx, ctx.channel(),
|
info = new RpcInfo(callHeader, dataBuffer, ctx, ctx.channel(),
|
||||||
remoteAddress);
|
remoteAddress);
|
||||||
} catch (Exception exc) {
|
} catch (Exception exc) {
|
||||||
LOG.info("Malformed RPC request from " + remoteAddress);
|
LOG.info("Malformed RPC request from " + remoteAddress);
|
||||||
} finally {
|
} finally {
|
||||||
|
// only release buffer if it is not passed to downstream handler
|
||||||
|
if (info == null) {
|
||||||
buf.release();
|
buf.release();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (info != null) {
|
if (info != null) {
|
||||||
ctx.fireChannelRead(info);
|
ctx.fireChannelRead(info);
|
||||||
|
Loading…
Reference in New Issue
Block a user