From 4836f1ec37ad9204c5c39c7f9f29a234bed6cd9c Mon Sep 17 00:00:00 2001 From: Wei-Chiu Chuang Date: Mon, 30 Jan 2023 09:17:04 -0800 Subject: [PATCH] HADOOP-18584. [NFS GW] Fix regression after netty4 migration. (#5252) Reviewed-by: Tsz-Wo Nicholas Sze (cherry picked from commit 9d47108b50fb0cd79ca48e82077e57572d8873e6) --- .../org/apache/hadoop/oncrpc/RpcUtil.java | 19 ++++++++++++------- .../org/apache/hadoop/portmap/Portmap.java | 6 ++---- .../apache/hadoop/portmap/TestPortmap.java | 17 +++++++++++++++++ 3 files changed, 31 insertions(+), 11 deletions(-) diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java index 784e8c7961..92354f6b86 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.oncrpc; +import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.util.List; @@ -26,6 +27,7 @@ import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.socket.DatagramPacket; import io.netty.handler.codec.ByteToMessageDecoder; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; @@ -172,15 +174,18 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) */ @ChannelHandler.Sharable private static final class RpcUdpResponseStage extends - ChannelInboundHandlerAdapter { + SimpleChannelInboundHandler { + public RpcUdpResponseStage() { + // do not auto release the RpcResponse message. + super(false); + } @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) - throws Exception { - RpcResponse r = (RpcResponse) msg; - // TODO: check out https://github.com/netty/netty/issues/1282 for - // correct usage - ctx.channel().writeAndFlush(r.data()); + protected void channelRead0(ChannelHandlerContext ctx, + RpcResponse response) throws Exception { + ByteBuf buf = Unpooled.wrappedBuffer(response.data()); + ctx.writeAndFlush(new DatagramPacket( + buf, (InetSocketAddress) response.recipient())); } } } diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/Portmap.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/Portmap.java index 1a8a305436..23c7977e30 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/Portmap.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/Portmap.java @@ -117,15 +117,13 @@ void start(final int idleTimeMilliSeconds, final SocketAddress tcpAddress, .childOption(ChannelOption.SO_REUSEADDR, true) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { - private final IdleStateHandler idleStateHandler = new IdleStateHandler( - 0, 0, idleTimeMilliSeconds, TimeUnit.MILLISECONDS); - @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(RpcUtil.constructRpcFrameDecoder(), - RpcUtil.STAGE_RPC_MESSAGE_PARSER, idleStateHandler, handler, + RpcUtil.STAGE_RPC_MESSAGE_PARSER, new IdleStateHandler(0, 0, + idleTimeMilliSeconds, TimeUnit.MILLISECONDS), handler, RpcUtil.STAGE_RPC_TCP_RESPONSE); }}); diff --git a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/portmap/TestPortmap.java b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/portmap/TestPortmap.java index 8ebf9d03c6..e2f7c03676 100644 --- a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/portmap/TestPortmap.java +++ b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/portmap/TestPortmap.java @@ -23,8 +23,10 @@ import java.net.DatagramSocket; import java.net.InetSocketAddress; import java.net.Socket; +import java.util.Arrays; import java.util.Map; +import org.apache.hadoop.oncrpc.RpcReply; import org.junit.Assert; import org.apache.hadoop.oncrpc.RpcCall; @@ -36,6 +38,8 @@ import org.junit.BeforeClass; import org.junit.Test; +import static org.junit.Assert.assertEquals; + public class TestPortmap { private static Portmap pm = new Portmap(); private static final int SHORT_TIMEOUT_MILLISECONDS = 10; @@ -93,6 +97,19 @@ public void testRegistration() throws IOException, InterruptedException { pm.getUdpServerLoAddress()); try { s.send(p); + + // verify that portmap server responds a UDF packet back to the client + byte[] receiveData = new byte[65535]; + DatagramPacket receivePacket = new DatagramPacket(receiveData, + receiveData.length); + s.setSoTimeout(2000); + s.receive(receivePacket); + + // verify that the registration is accepted. + XDR xdr = new XDR(Arrays.copyOfRange(receiveData, 0, + receivePacket.getLength())); + RpcReply reply = RpcReply.read(xdr); + assertEquals(reply.getState(), RpcReply.ReplyState.MSG_ACCEPTED); } finally { s.close(); }