diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java index edf42444b8..d814052e43 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.oncrpc; +import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.util.List; @@ -26,6 +27,7 @@ import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.socket.DatagramPacket; import io.netty.handler.codec.ByteToMessageDecoder; import org.apache.hadoop.classification.VisibleForTesting; @@ -172,15 +174,18 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) */ @ChannelHandler.Sharable private static final class RpcUdpResponseStage extends - ChannelInboundHandlerAdapter { + SimpleChannelInboundHandler { + public RpcUdpResponseStage() { + // do not auto release the RpcResponse message. + super(false); + } @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) - throws Exception { - RpcResponse r = (RpcResponse) msg; - // TODO: check out https://github.com/netty/netty/issues/1282 for - // correct usage - ctx.channel().writeAndFlush(r.data()); + protected void channelRead0(ChannelHandlerContext ctx, + RpcResponse response) throws Exception { + ByteBuf buf = Unpooled.wrappedBuffer(response.data()); + ctx.writeAndFlush(new DatagramPacket( + buf, (InetSocketAddress) response.recipient())); } } } diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/Portmap.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/Portmap.java index 7d1130b40f..953d74648d 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/Portmap.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/Portmap.java @@ -117,15 +117,13 @@ void start(final int idleTimeMilliSeconds, final SocketAddress tcpAddress, .childOption(ChannelOption.SO_REUSEADDR, true) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { - private final IdleStateHandler idleStateHandler = new IdleStateHandler( - 0, 0, idleTimeMilliSeconds, TimeUnit.MILLISECONDS); - @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(RpcUtil.constructRpcFrameDecoder(), - RpcUtil.STAGE_RPC_MESSAGE_PARSER, idleStateHandler, handler, + RpcUtil.STAGE_RPC_MESSAGE_PARSER, new IdleStateHandler(0, 0, + idleTimeMilliSeconds, TimeUnit.MILLISECONDS), handler, RpcUtil.STAGE_RPC_TCP_RESPONSE); }}); diff --git a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/portmap/TestPortmap.java b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/portmap/TestPortmap.java index 84fa71a269..35ab5cdc3d 100644 --- a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/portmap/TestPortmap.java +++ b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/portmap/TestPortmap.java @@ -23,8 +23,10 @@ import java.net.DatagramSocket; import java.net.InetSocketAddress; import java.net.Socket; +import java.util.Arrays; import java.util.Map; +import org.apache.hadoop.oncrpc.RpcReply; import org.junit.Assert; import org.apache.hadoop.oncrpc.RpcCall; @@ -35,6 +37,8 @@ import org.junit.BeforeClass; import org.junit.Test; +import static org.junit.Assert.assertEquals; + public class TestPortmap { private static Portmap pm = new Portmap(); private static final int SHORT_TIMEOUT_MILLISECONDS = 10; @@ -92,6 +96,19 @@ public void testRegistration() throws IOException, InterruptedException, Illegal pm.getUdpServerLoAddress()); try { s.send(p); + + // verify that portmap server responds a UDF packet back to the client + byte[] receiveData = new byte[65535]; + DatagramPacket receivePacket = new DatagramPacket(receiveData, + receiveData.length); + s.setSoTimeout(2000); + s.receive(receivePacket); + + // verify that the registration is accepted. + XDR xdr = new XDR(Arrays.copyOfRange(receiveData, 0, + receivePacket.getLength())); + RpcReply reply = RpcReply.read(xdr); + assertEquals(reply.getState(), RpcReply.ReplyState.MSG_ACCEPTED); } finally { s.close(); }