HADOOP-11245. Update NFS gateway to use Netty4 (#2832) (#4997)

Reviewed-by: Tsz-Wo Nicholas Sze <szetszwo@apache.org>

Co-authored-by: Wei-Chiu Chuang <weichiu@apache.org>
This commit is contained in:
Ashutosh Gupta 2022-10-10 22:27:43 +01:00 committed by GitHub
parent 77cb778a44
commit 6847ec0647
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 467 additions and 353 deletions

View File

@ -90,7 +90,7 @@
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
<artifactId>netty-all</artifactId>
<scope>compile</scope>
</dependency>
<dependency>

View File

@ -41,6 +41,8 @@ abstract public class MountdBase {
private final RpcProgram rpcProgram;
private int udpBoundPort; // Will set after server starts
private int tcpBoundPort; // Will set after server starts
private SimpleUdpServer udpServer = null;
private SimpleTcpServer tcpServer = null;
public RpcProgram getRpcProgram() {
return rpcProgram;
@ -57,7 +59,7 @@ public MountdBase(RpcProgram program) throws IOException {
/* Start UDP server */
private void startUDPServer() {
SimpleUdpServer udpServer = new SimpleUdpServer(rpcProgram.getPort(),
udpServer = new SimpleUdpServer(rpcProgram.getPort(),
rpcProgram, 1);
rpcProgram.startDaemons();
try {
@ -76,7 +78,7 @@ private void startUDPServer() {
/* Start TCP server */
private void startTCPServer() {
SimpleTcpServer tcpServer = new SimpleTcpServer(rpcProgram.getPort(),
tcpServer = new SimpleTcpServer(rpcProgram.getPort(),
rpcProgram, 1);
rpcProgram.startDaemons();
try {
@ -118,6 +120,14 @@ public void stop() {
rpcProgram.unregister(PortmapMapping.TRANSPORT_TCP, tcpBoundPort);
tcpBoundPort = 0;
}
if (udpServer != null) {
udpServer.shutdown();
udpServer = null;
}
if (tcpServer != null) {
tcpServer.shutdown();
tcpServer = null;
}
}
/**

View File

@ -35,6 +35,7 @@ public abstract class Nfs3Base {
public static final Logger LOG = LoggerFactory.getLogger(Nfs3Base.class);
private final RpcProgram rpcProgram;
private int nfsBoundPort; // Will set after server starts
private SimpleTcpServer tcpServer = null;
public RpcProgram getRpcProgram() {
return rpcProgram;
@ -61,7 +62,7 @@ public void start(boolean register) {
}
private void startTCPServer() {
SimpleTcpServer tcpServer = new SimpleTcpServer(rpcProgram.getPort(),
tcpServer = new SimpleTcpServer(rpcProgram.getPort(),
rpcProgram, 0);
rpcProgram.startDaemons();
try {
@ -84,6 +85,10 @@ public void stop() {
nfsBoundPort = 0;
}
rpcProgram.stopDaemons();
if (tcpServer != null) {
tcpServer.shutdown();
tcpServer = null;
}
}
/**
* Priority of the nfsd shutdown hook.

View File

@ -19,10 +19,9 @@
import java.util.Arrays;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -58,10 +57,10 @@ private boolean validMessageLength(int len) {
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
ChannelBuffer buf = (ChannelBuffer) e.getMessage(); // Read reply
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf) msg; // Read reply
if (!validMessageLength(buf.readableBytes())) {
e.getChannel().close();
ctx.channel().close();
return;
}
@ -83,7 +82,7 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
RpcDeniedReply deniedReply = (RpcDeniedReply) reply;
handle(deniedReply);
}
e.getChannel().close(); // shutdown now that request is complete
ctx.channel().close(); // shutdown now that request is complete
}
private void handle(RpcDeniedReply deniedReply) {

View File

@ -19,9 +19,9 @@
import java.net.SocketAddress;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
/**
* RpcInfo records all contextual information of an RPC message. It contains
@ -29,11 +29,11 @@
*/
public final class RpcInfo {
private final RpcMessage header;
private final ChannelBuffer data;
private final ByteBuf data;
private final Channel channel;
private final SocketAddress remoteAddress;
public RpcInfo(RpcMessage header, ChannelBuffer data,
public RpcInfo(RpcMessage header, ByteBuf data,
ChannelHandlerContext channelContext, Channel channel,
SocketAddress remoteAddress) {
this.header = header;
@ -46,7 +46,7 @@ public RpcMessage header() {
return header;
}
public ChannelBuffer data() {
public ByteBuf data() {
return data;
}

View File

@ -22,16 +22,15 @@
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState;
import org.apache.hadoop.oncrpc.security.VerifierNone;
import org.apache.hadoop.portmap.PortmapMapping;
import org.apache.hadoop.portmap.PortmapRequest;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -39,7 +38,7 @@
* Class for writing RPC server programs based on RFC 1050. Extend this class
* and implement {@link #handleInternal} to handle the requests received.
*/
public abstract class RpcProgram extends SimpleChannelUpstreamHandler {
public abstract class RpcProgram extends ChannelInboundHandlerAdapter {
static final Logger LOG = LoggerFactory.getLogger(RpcProgram.class);
public static final int RPCB_PORT = 111;
private final String program;
@ -161,9 +160,9 @@ public void startDaemons() {}
public void stopDaemons() {}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
RpcInfo info = (RpcInfo) e.getMessage();
RpcInfo info = (RpcInfo) msg;
RpcCall call = (RpcCall) info.header();
SocketAddress remoteAddress = info.remoteAddress();
@ -221,7 +220,7 @@ private void sendAcceptedReply(RpcCall call, SocketAddress remoteAddress,
out.writeInt(lowProgVersion);
out.writeInt(highProgVersion);
}
ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
ByteBuf b = Unpooled.wrappedBuffer(out.asReadOnlyWrap()
.buffer());
RpcResponse rsp = new RpcResponse(b, remoteAddress);
RpcUtil.sendRpcResponse(ctx, rsp);
@ -234,7 +233,7 @@ protected static void sendRejectedReply(RpcCall call,
RpcReply.ReplyState.MSG_DENIED,
RpcDeniedReply.RejectState.AUTH_ERROR, new VerifierNone());
reply.write(out);
ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
ByteBuf buf = Unpooled.wrappedBuffer(out.asReadOnlyWrap()
.buffer());
RpcResponse rsp = new RpcResponse(buf, remoteAddress);
RpcUtil.sendRpcResponse(ctx, rsp);

View File

@ -19,27 +19,30 @@
import java.net.SocketAddress;
import org.jboss.netty.buffer.ChannelBuffer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.DefaultAddressedEnvelope;
/**
* RpcResponse encapsulates a response to a RPC request. It contains the data
* that is going to cross the wire, as well as the information of the remote
* peer.
*/
public class RpcResponse {
private final ChannelBuffer data;
private final SocketAddress remoteAddress;
public RpcResponse(ChannelBuffer data, SocketAddress remoteAddress) {
this.data = data;
this.remoteAddress = remoteAddress;
public class RpcResponse extends
DefaultAddressedEnvelope<ByteBuf, SocketAddress> {
public RpcResponse(ByteBuf message, SocketAddress recipient) {
super(message, recipient, null);
}
public ChannelBuffer data() {
return data;
public RpcResponse(ByteBuf message, SocketAddress recipient,
SocketAddress sender) {
super(message, recipient, sender);
}
public ByteBuf data() {
return this.content();
}
public SocketAddress remoteAddress() {
return remoteAddress;
return this.recipient();
}
}

View File

@ -17,16 +17,18 @@
*/
package org.apache.hadoop.oncrpc;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.List;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.handler.codec.frame.FrameDecoder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.socket.DatagramPacket;
import io.netty.handler.codec.ByteToMessageDecoder;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -43,16 +45,16 @@ public static int getNewXid(String caller) {
public static void sendRpcResponse(ChannelHandlerContext ctx,
RpcResponse response) {
Channels.fireMessageReceived(ctx, response);
ctx.fireChannelRead(response);
}
public static FrameDecoder constructRpcFrameDecoder() {
public static ByteToMessageDecoder constructRpcFrameDecoder() {
return new RpcFrameDecoder();
}
public static final SimpleChannelUpstreamHandler STAGE_RPC_MESSAGE_PARSER = new RpcMessageParserStage();
public static final SimpleChannelUpstreamHandler STAGE_RPC_TCP_RESPONSE = new RpcTcpResponseStage();
public static final SimpleChannelUpstreamHandler STAGE_RPC_UDP_RESPONSE = new RpcUdpResponseStage();
public static final ChannelInboundHandlerAdapter STAGE_RPC_MESSAGE_PARSER = new RpcMessageParserStage();
public static final ChannelInboundHandlerAdapter STAGE_RPC_TCP_RESPONSE = new RpcTcpResponseStage();
public static final ChannelInboundHandlerAdapter STAGE_RPC_UDP_RESPONSE = new RpcUdpResponseStage();
/**
* An RPC client can separate a RPC message into several frames (i.e.,
@ -62,44 +64,39 @@ public static FrameDecoder constructRpcFrameDecoder() {
* RpcFrameDecoder is a stateful pipeline stage. It has to be constructed for
* each RPC client.
*/
static class RpcFrameDecoder extends FrameDecoder {
static class RpcFrameDecoder extends ByteToMessageDecoder {
public static final Logger LOG =
LoggerFactory.getLogger(RpcFrameDecoder.class);
private ChannelBuffer currentFrame;
private volatile boolean isLast;
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel,
ChannelBuffer buf) {
protected void decode(ChannelHandlerContext ctx, ByteBuf buf,
List<Object> out) {
if (buf.readableBytes() < 4)
return null;
if (buf.readableBytes() < 4) {
return;
}
buf.markReaderIndex();
byte[] fragmentHeader = new byte[4];
buf.readBytes(fragmentHeader);
int length = XDR.fragmentSize(fragmentHeader);
boolean isLast = XDR.isLastFragment(fragmentHeader);
isLast = XDR.isLastFragment(fragmentHeader);
if (buf.readableBytes() < length) {
buf.resetReaderIndex();
return null;
return;
}
ChannelBuffer newFragment = buf.readSlice(length);
if (currentFrame == null) {
currentFrame = newFragment;
} else {
currentFrame = ChannelBuffers.wrappedBuffer(currentFrame, newFragment);
}
ByteBuf newFragment = buf.readSlice(length);
newFragment.retain();
out.add(newFragment);
}
if (isLast) {
ChannelBuffer completeFrame = currentFrame;
currentFrame = null;
return completeFrame;
} else {
return null;
}
@VisibleForTesting
public boolean isLast() {
return isLast;
}
}
@ -107,30 +104,44 @@ protected Object decode(ChannelHandlerContext ctx, Channel channel,
* RpcMessageParserStage parses the network bytes and encapsulates the RPC
* request into a RpcInfo instance.
*/
static final class RpcMessageParserStage extends SimpleChannelUpstreamHandler {
@ChannelHandler.Sharable
static final class RpcMessageParserStage extends ChannelInboundHandlerAdapter {
private static final Logger LOG = LoggerFactory
.getLogger(RpcMessageParserStage.class);
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
ChannelBuffer buf = (ChannelBuffer) e.getMessage();
ByteBuffer b = buf.toByteBuffer().asReadOnlyBuffer();
ByteBuf buf;
SocketAddress remoteAddress;
if (msg instanceof DatagramPacket) {
DatagramPacket packet = (DatagramPacket)msg;
buf = packet.content();
remoteAddress = packet.sender();
} else {
buf = (ByteBuf) msg;
remoteAddress = ctx.channel().remoteAddress();
}
ByteBuffer b = buf.nioBuffer().asReadOnlyBuffer();
XDR in = new XDR(b, XDR.State.READING);
RpcInfo info = null;
try {
RpcCall callHeader = RpcCall.read(in);
ChannelBuffer dataBuffer = ChannelBuffers.wrappedBuffer(in.buffer()
ByteBuf dataBuffer = Unpooled.wrappedBuffer(in.buffer()
.slice());
info = new RpcInfo(callHeader, dataBuffer, ctx, e.getChannel(),
e.getRemoteAddress());
info = new RpcInfo(callHeader, dataBuffer, ctx, ctx.channel(),
remoteAddress);
} catch (Exception exc) {
LOG.info("Malformed RPC request from " + e.getRemoteAddress());
LOG.info("Malformed RPC request from " + remoteAddress);
} finally {
buf.release();
}
if (info != null) {
Channels.fireMessageReceived(ctx, info);
ctx.fireChannelRead(info);
}
}
}
@ -139,16 +150,17 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
* RpcTcpResponseStage sends an RpcResponse across the wire with the
* appropriate fragment header.
*/
private static class RpcTcpResponseStage extends SimpleChannelUpstreamHandler {
@ChannelHandler.Sharable
private static class RpcTcpResponseStage extends ChannelInboundHandlerAdapter {
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
RpcResponse r = (RpcResponse) e.getMessage();
RpcResponse r = (RpcResponse) msg;
byte[] fragmentHeader = XDR.recordMark(r.data().readableBytes(), true);
ChannelBuffer header = ChannelBuffers.wrappedBuffer(fragmentHeader);
ChannelBuffer d = ChannelBuffers.wrappedBuffer(header, r.data());
e.getChannel().write(d);
ByteBuf header = Unpooled.wrappedBuffer(fragmentHeader);
ByteBuf d = Unpooled.wrappedBuffer(header, r.data());
ctx.channel().writeAndFlush(d);
}
}
@ -156,14 +168,17 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
* RpcUdpResponseStage sends an RpcResponse as a UDP packet, which does not
* require a fragment header.
*/
@ChannelHandler.Sharable
private static final class RpcUdpResponseStage extends
SimpleChannelUpstreamHandler {
ChannelInboundHandlerAdapter {
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
RpcResponse r = (RpcResponse) e.getMessage();
e.getChannel().write(r.data(), r.remoteAddress());
RpcResponse r = (RpcResponse) msg;
// TODO: check out https://github.com/netty/netty/issues/1282 for
// correct usage
ctx.channel().writeAndFlush(r.data());
}
}
}

View File

@ -18,15 +18,16 @@
package org.apache.hadoop.oncrpc;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
/**
* A simple TCP based RPC client which just sends a request to a server.
@ -35,8 +36,9 @@ public class SimpleTcpClient {
protected final String host;
protected final int port;
protected final XDR request;
protected ChannelPipelineFactory pipelineFactory;
protected final boolean oneShot;
private NioEventLoopGroup workerGroup;
private ChannelFuture future;
public SimpleTcpClient(String host, int port, XDR request) {
this(host,port, request, true);
@ -48,40 +50,54 @@ public SimpleTcpClient(String host, int port, XDR request, Boolean oneShot) {
this.request = request;
this.oneShot = oneShot;
}
protected ChannelPipelineFactory setPipelineFactory() {
this.pipelineFactory = new ChannelPipelineFactory() {
protected ChannelInitializer<SocketChannel> setChannelHandler() {
return new ChannelInitializer<SocketChannel>() {
@Override
public ChannelPipeline getPipeline() {
return Channels.pipeline(
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(
RpcUtil.constructRpcFrameDecoder(),
new SimpleTcpClientHandler(request));
new SimpleTcpClientHandler(request)
);
}
};
return this.pipelineFactory;
}
@VisibleForTesting
public void run() {
// Configure the client.
ChannelFactory factory = new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), 1, 1);
ClientBootstrap bootstrap = new ClientBootstrap(factory);
workerGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap()
.group(workerGroup)
.channel(NioSocketChannel.class);
// Set up the pipeline factory.
bootstrap.setPipelineFactory(setPipelineFactory());
try {
future = bootstrap.handler(setChannelHandler())
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.connect(new InetSocketAddress(host, port)).sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (oneShot) {
stop();
}
}
}
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("keepAlive", true);
// Start the connection attempt.
ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
if (oneShot) {
// Wait until the connection is closed or the connection attempt fails.
future.getChannel().getCloseFuture().awaitUninterruptibly();
public void stop() {
try {
if (future != null) {
// Wait until the connection is closed or the connection attempt fails.
future.channel().closeFuture().sync();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// Shut down thread pools to exit.
bootstrap.releaseExternalResources();
workerGroup.shutdownGracefully();
}
}
}

View File

@ -17,19 +17,19 @@
*/
package org.apache.hadoop.oncrpc;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A simple TCP based RPC client handler used by {@link SimpleTcpServer}.
*/
public class SimpleTcpClientHandler extends SimpleChannelHandler {
public class SimpleTcpClientHandler extends ChannelInboundHandlerAdapter {
public static final Logger LOG =
LoggerFactory.getLogger(SimpleTcpClient.class);
protected final XDR request;
@ -39,13 +39,13 @@ public SimpleTcpClientHandler(XDR request) {
}
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// Send the request
if (LOG.isDebugEnabled()) {
LOG.debug("sending PRC request");
}
ChannelBuffer outBuf = XDR.writeMessageTcp(request, true);
e.getChannel().write(outBuf);
ByteBuf outBuf = XDR.writeMessageTcp(request, true);
ctx.channel().writeAndFlush(outBuf);
}
/**
@ -53,13 +53,13 @@ public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
* more interaction with the server.
*/
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
e.getChannel().close();
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.channel().close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
LOG.warn("Unexpected exception from downstream: ", e.getCause());
e.getChannel().close();
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
LOG.warn("Unexpected exception from downstream: ", cause.getCause());
ctx.channel().close();
}
}

View File

@ -20,14 +20,17 @@
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -39,9 +42,11 @@ public class SimpleTcpServer {
LoggerFactory.getLogger(SimpleTcpServer.class);
protected final int port;
protected int boundPort = -1; // Will be set after server starts
protected final SimpleChannelUpstreamHandler rpcProgram;
protected final ChannelInboundHandlerAdapter rpcProgram;
private ServerBootstrap server;
private Channel ch;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
/** The maximum number of I/O worker threads */
protected final int workerCount;
@ -57,37 +62,32 @@ public SimpleTcpServer(int port, RpcProgram program, int workercount) {
this.workerCount = workercount;
}
public void run() {
public void run() throws InterruptedException {
// Configure the Server.
ChannelFactory factory;
if (workerCount == 0) {
// Use default workers: 2 * the number of available processors
factory = new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
} else {
factory = new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(), Executors.newCachedThreadPool(),
workerCount);
}
bossGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup(workerCount, Executors.newCachedThreadPool());
server = new ServerBootstrap(factory);
server.setPipelineFactory(new ChannelPipelineFactory() {
server = new ServerBootstrap();
server.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(RpcUtil.constructRpcFrameDecoder(),
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(RpcUtil.constructRpcFrameDecoder(),
RpcUtil.STAGE_RPC_MESSAGE_PARSER, rpcProgram,
RpcUtil.STAGE_RPC_TCP_RESPONSE);
}
});
server.setOption("child.tcpNoDelay", true);
server.setOption("child.keepAlive", true);
server.setOption("child.reuseAddress", true);
server.setOption("reuseAddress", true);
}})
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_REUSEADDR, true);
// Listen to TCP port
ch = server.bind(new InetSocketAddress(port));
InetSocketAddress socketAddr = (InetSocketAddress) ch.getLocalAddress();
ChannelFuture f = server.bind(new InetSocketAddress(port)).sync();
ch = f.channel();
InetSocketAddress socketAddr = (InetSocketAddress) ch.localAddress();
boundPort = socketAddr.getPort();
LOG.info("Started listening to TCP requests at port " + boundPort + " for "
@ -102,9 +102,17 @@ public int getBoundPort() {
public void shutdown() {
if (ch != null) {
ch.close().awaitUninterruptibly();
ch = null;
}
if (server != null) {
server.releaseExternalResources();
if (workerGroup != null) {
workerGroup.shutdownGracefully();
workerGroup = null;
}
if (bossGroup != null) {
bossGroup.shutdownGracefully();
bossGroup = null;
}
}
}

View File

@ -20,12 +20,16 @@
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.DatagramChannelFactory;
import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -39,36 +43,45 @@ public class SimpleUdpServer {
private final int RECEIVE_BUFFER_SIZE = 65536;
protected final int port;
protected final SimpleChannelUpstreamHandler rpcProgram;
protected final ChannelInboundHandlerAdapter rpcProgram;
protected final int workerCount;
protected int boundPort = -1; // Will be set after server starts
private ConnectionlessBootstrap server;
private Bootstrap server;
private Channel ch;
private EventLoopGroup workerGroup;
public SimpleUdpServer(int port, SimpleChannelUpstreamHandler program,
public SimpleUdpServer(int port, ChannelInboundHandlerAdapter program,
int workerCount) {
this.port = port;
this.rpcProgram = program;
this.workerCount = workerCount;
}
public void run() {
// Configure the client.
DatagramChannelFactory f = new NioDatagramChannelFactory(
Executors.newCachedThreadPool(), workerCount);
public void run() throws InterruptedException {
workerGroup = new NioEventLoopGroup(workerCount, Executors.newCachedThreadPool());
server = new ConnectionlessBootstrap(f);
server.setPipeline(Channels.pipeline(RpcUtil.STAGE_RPC_MESSAGE_PARSER,
rpcProgram, RpcUtil.STAGE_RPC_UDP_RESPONSE));
server.setOption("broadcast", "false");
server.setOption("sendBufferSize", SEND_BUFFER_SIZE);
server.setOption("receiveBufferSize", RECEIVE_BUFFER_SIZE);
server.setOption("reuseAddress", true);
server = new Bootstrap();
server.group(workerGroup)
.channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, true)
.option(ChannelOption.SO_SNDBUF, SEND_BUFFER_SIZE)
.option(ChannelOption.SO_RCVBUF, RECEIVE_BUFFER_SIZE)
.option(ChannelOption.SO_REUSEADDR, true)
.handler(new ChannelInitializer<NioDatagramChannel>() {
@Override protected void initChannel(NioDatagramChannel ch)
throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(
RpcUtil.STAGE_RPC_MESSAGE_PARSER,
rpcProgram,
RpcUtil.STAGE_RPC_UDP_RESPONSE);
}
});
// Listen to the UDP port
ch = server.bind(new InetSocketAddress(port));
InetSocketAddress socketAddr = (InetSocketAddress) ch.getLocalAddress();
ChannelFuture f = server.bind(new InetSocketAddress(port)).sync();
ch = f.channel();
InetSocketAddress socketAddr = (InetSocketAddress) ch.localAddress();
boundPort = socketAddr.getPort();
LOG.info("Started listening to UDP requests at port " + boundPort + " for "
@ -83,9 +96,11 @@ public int getBoundPort() {
public void shutdown() {
if (ch != null) {
ch.close().awaitUninterruptibly();
ch = null;
}
if (server != null) {
server.releaseExternalResources();
if (workerGroup != null) {
workerGroup.shutdownGracefully();
workerGroup = null;
}
}
}

View File

@ -20,8 +20,8 @@
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
@ -242,7 +242,7 @@ static byte[] recordMark(int size, boolean last) {
* @param last specifies last request or not
* @return TCP buffer
*/
public static ChannelBuffer writeMessageTcp(XDR request, boolean last) {
public static ByteBuf writeMessageTcp(XDR request, boolean last) {
Preconditions.checkState(request.state == XDR.State.WRITING);
ByteBuffer b = request.buf.duplicate();
b.flip();
@ -250,7 +250,7 @@ public static ChannelBuffer writeMessageTcp(XDR request, boolean last) {
ByteBuffer headerBuf = ByteBuffer.wrap(fragmentHeader);
// TODO: Investigate whether making a copy of the buffer is necessary.
return ChannelBuffers.copiedBuffer(headerBuf, b);
return Unpooled.wrappedBuffer(headerBuf, b);
}
/**
@ -258,10 +258,10 @@ public static ChannelBuffer writeMessageTcp(XDR request, boolean last) {
* @param response XDR response
* @return UDP buffer
*/
public static ChannelBuffer writeMessageUdp(XDR response) {
public static ByteBuf writeMessageUdp(XDR response) {
Preconditions.checkState(response.state == XDR.State.READING);
// TODO: Investigate whether making a copy of the buffer is necessary.
return ChannelBuffers.copiedBuffer(response.buf);
return Unpooled.copiedBuffer(response.buf);
}
public static int fragmentSize(byte[] mark) {

View File

@ -22,21 +22,27 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.apache.hadoop.oncrpc.RpcProgram;
import org.apache.hadoop.oncrpc.RpcUtil;
import org.apache.hadoop.util.StringUtils;
import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.timeout.IdleStateHandler;
import org.jboss.netty.util.HashedWheelTimer;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
@ -49,11 +55,17 @@ final class Portmap {
private static final Logger LOG = LoggerFactory.getLogger(Portmap.class);
private static final int DEFAULT_IDLE_TIME_MILLISECONDS = 5000;
private ConnectionlessBootstrap udpServer;
private Bootstrap udpServer;
private ServerBootstrap tcpServer;
private ChannelGroup allChannels = new DefaultChannelGroup();
private ChannelGroup allChannels = new DefaultChannelGroup(
GlobalEventExecutor.INSTANCE);
private Channel udpChannel;
private Channel tcpChannel;
EventLoopGroup bossGroup;
EventLoopGroup workerGroup;
EventLoopGroup udpGroup;
private final RpcProgramPortmap handler = new RpcProgramPortmap(allChannels);
public static void main(String[] args) {
@ -73,18 +85,19 @@ public static void main(String[] args) {
void shutdown() {
allChannels.close().awaitUninterruptibly();
tcpServer.releaseExternalResources();
udpServer.releaseExternalResources();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
udpGroup.shutdownGracefully();
}
@VisibleForTesting
SocketAddress getTcpServerLocalAddress() {
return tcpChannel.getLocalAddress();
return tcpChannel.localAddress();
}
@VisibleForTesting
SocketAddress getUdpServerLoAddress() {
return udpChannel.getLocalAddress();
return udpChannel.localAddress();
}
@VisibleForTesting
@ -93,38 +106,55 @@ RpcProgramPortmap getHandler() {
}
void start(final int idleTimeMilliSeconds, final SocketAddress tcpAddress,
final SocketAddress udpAddress) {
final SocketAddress udpAddress) throws InterruptedException {
tcpServer = new ServerBootstrap(new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
tcpServer.setPipelineFactory(new ChannelPipelineFactory() {
private final HashedWheelTimer timer = new HashedWheelTimer();
private final IdleStateHandler idleStateHandler = new IdleStateHandler(
timer, 0, 0, idleTimeMilliSeconds, TimeUnit.MILLISECONDS);
bossGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup(0, Executors.newCachedThreadPool());
@Override
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(RpcUtil.constructRpcFrameDecoder(),
RpcUtil.STAGE_RPC_MESSAGE_PARSER, idleStateHandler, handler,
RpcUtil.STAGE_RPC_TCP_RESPONSE);
}
});
tcpServer.setOption("reuseAddress", true);
tcpServer.setOption("child.reuseAddress", true);
tcpServer = new ServerBootstrap();
tcpServer.group(bossGroup, workerGroup)
.option(ChannelOption.SO_REUSEADDR, true)
.childOption(ChannelOption.SO_REUSEADDR, true)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
private final IdleStateHandler idleStateHandler = new IdleStateHandler(
0, 0, idleTimeMilliSeconds, TimeUnit.MILLISECONDS);
udpServer = new ConnectionlessBootstrap(new NioDatagramChannelFactory(
Executors.newCachedThreadPool()));
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
udpServer.setPipeline(Channels.pipeline(RpcUtil.STAGE_RPC_MESSAGE_PARSER,
handler, RpcUtil.STAGE_RPC_UDP_RESPONSE));
udpServer.setOption("reuseAddress", true);
p.addLast(RpcUtil.constructRpcFrameDecoder(),
RpcUtil.STAGE_RPC_MESSAGE_PARSER, idleStateHandler, handler,
RpcUtil.STAGE_RPC_TCP_RESPONSE);
}});
udpGroup = new NioEventLoopGroup(0, Executors.newCachedThreadPool());
udpServer = new Bootstrap();
udpServer.group(udpGroup)
.channel(NioDatagramChannel.class)
.handler(new ChannelInitializer<NioDatagramChannel>() {
@Override protected void initChannel(NioDatagramChannel ch)
throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(
new LoggingHandler(LogLevel.DEBUG),
RpcUtil.STAGE_RPC_MESSAGE_PARSER, handler, RpcUtil.STAGE_RPC_UDP_RESPONSE);
}
})
.option(ChannelOption.SO_REUSEADDR, true);
ChannelFuture tcpChannelFuture = null;
tcpChannelFuture = tcpServer.bind(tcpAddress);
ChannelFuture udpChannelFuture = udpServer.bind(udpAddress);
tcpChannel = tcpChannelFuture.sync().channel();
udpChannel = udpChannelFuture.sync().channel();
tcpChannel = tcpServer.bind(tcpAddress);
udpChannel = udpServer.bind(udpAddress);
allChannels.add(tcpChannel);
allChannels.add(udpChannel);
LOG.info("Portmap server started at tcp://" + tcpChannel.getLocalAddress()
+ ", udp://" + udpChannel.getLocalAddress());
LOG.info("Portmap server started at tcp://" + tcpChannel.localAddress()
+ ", udp://" + udpChannel.localAddress());
}
}

View File

@ -19,6 +19,14 @@
import java.util.concurrent.ConcurrentHashMap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import org.apache.hadoop.oncrpc.RpcAcceptedReply;
import org.apache.hadoop.oncrpc.RpcCall;
import org.apache.hadoop.oncrpc.RpcInfo;
@ -27,20 +35,12 @@
import org.apache.hadoop.oncrpc.RpcUtil;
import org.apache.hadoop.oncrpc.XDR;
import org.apache.hadoop.oncrpc.security.VerifierNone;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.handler.timeout.IdleState;
import org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler;
import org.jboss.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
final class RpcProgramPortmap extends IdleStateAwareChannelUpstreamHandler {
@ChannelHandler.Sharable
final class RpcProgramPortmap extends IdleStateHandler {
static final int PROGRAM = 100000;
static final int VERSION = 2;
@ -60,6 +60,8 @@ final class RpcProgramPortmap extends IdleStateAwareChannelUpstreamHandler {
private final ChannelGroup allChannels;
RpcProgramPortmap(ChannelGroup allChannels) {
super(1, 1, 1);
// FIXME: set default idle timeout 1 second.
this.allChannels = allChannels;
PortmapMapping m = new PortmapMapping(PROGRAM, VERSION,
PortmapMapping.TRANSPORT_TCP, RpcProgram.RPCB_PORT);
@ -151,14 +153,14 @@ private XDR dump(int xid, XDR in, XDR out) {
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
RpcInfo info = (RpcInfo) e.getMessage();
RpcInfo info = (RpcInfo) msg;
RpcCall rpcCall = (RpcCall) info.header();
final int portmapProc = rpcCall.getProcedure();
int xid = rpcCall.getXid();
XDR in = new XDR(info.data().toByteBuffer().asReadOnlyBuffer(),
XDR in = new XDR(info.data().nioBuffer().asReadOnlyBuffer(),
XDR.State.READING);
XDR out = new XDR();
@ -181,29 +183,29 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
reply.write(out);
}
ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
ByteBuf buf = Unpooled.wrappedBuffer(out.asReadOnlyWrap()
.buffer());
RpcResponse rsp = new RpcResponse(buf, info.remoteAddress());
RpcUtil.sendRpcResponse(ctx, rsp);
}
@Override
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
public void channelActive(ChannelHandlerContext ctx)
throws Exception {
allChannels.add(e.getChannel());
allChannels.add(ctx.channel());
}
@Override
public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e)
throws Exception {
if (e.getState() == IdleState.ALL_IDLE) {
e.getChannel().close();
if (e.state() == IdleState.ALL_IDLE) {
ctx.channel().close();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
LOG.warn("Encountered ", e.getCause());
e.getChannel().close();
public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) {
LOG.warn("Encountered ", t);
ctx.channel().close();
}
}

View File

@ -22,19 +22,19 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import org.apache.hadoop.oncrpc.RpcUtil.RpcFrameDecoder;
import org.apache.hadoop.oncrpc.security.CredentialsNone;
import org.apache.hadoop.oncrpc.security.VerifierNone;
import org.apache.hadoop.test.GenericTestUtils;
import org.jboss.netty.buffer.ByteBufferBackedChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.event.Level;
@ -55,6 +55,7 @@ static void testRequest(XDR request, int serverPort) {
tcpClient.run();
}
@ChannelHandler.Sharable
static class TestRpcProgram extends RpcProgram {
protected TestRpcProgram(String program, String host, int port,
@ -83,7 +84,7 @@ protected void handleInternal(ChannelHandlerContext ctx, RpcInfo info) {
new VerifierNone());
XDR out = new XDR();
reply.write(out);
ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap().buffer());
ByteBuf b = Unpooled.wrappedBuffer(out.asReadOnlyWrap().buffer());
RpcResponse rsp = new RpcResponse(b, info.remoteAddress());
RpcUtil.sendRpcResponse(ctx, rsp);
}
@ -99,13 +100,14 @@ public void testSingleFrame() {
RpcFrameDecoder decoder = new RpcFrameDecoder();
// Test "Length field is not received yet"
ByteBuffer buffer = ByteBuffer.allocate(1);
ChannelBuffer buf = new ByteBufferBackedChannelBuffer(buffer);
ChannelBuffer channelBuffer = (ChannelBuffer) decoder.decode(
Mockito.mock(ChannelHandlerContext.class), Mockito.mock(Channel.class),
buf);
assertTrue(channelBuffer == null);
ByteBuf buf = Unpooled.directBuffer(1);
List<Object> outputBufs = new ArrayList<>();
decoder.decode(
Mockito.mock(ChannelHandlerContext.class), buf,
outputBufs);
assertTrue(outputBufs.isEmpty());
decoder = new RpcFrameDecoder();
// Test all bytes are not received yet
byte[] fragment = new byte[4 + 9];
fragment[0] = (byte) (1 << 7); // final fragment
@ -114,15 +116,16 @@ public void testSingleFrame() {
fragment[3] = (byte) 10; // fragment size = 10 bytes
assertTrue(XDR.isLastFragment(fragment));
assertTrue(XDR.fragmentSize(fragment)==10);
buf.release();
buffer = ByteBuffer.allocate(4 + 9);
buffer.put(fragment);
buffer.flip();
buf = new ByteBufferBackedChannelBuffer(buffer);
channelBuffer = (ChannelBuffer) decoder.decode(
Mockito.mock(ChannelHandlerContext.class), Mockito.mock(Channel.class),
buf);
assertTrue(channelBuffer == null);
buf = Unpooled.directBuffer(4 + 9);
buf.writeBytes(fragment);
outputBufs = new ArrayList<>();
decoder.decode(
Mockito.mock(ChannelHandlerContext.class), buf,
outputBufs);
assertTrue(decoder.isLast());
buf.release();
}
@Test
@ -137,16 +140,15 @@ public void testMultipleFrames() {
fragment1[3] = (byte) 10; // fragment size = 10 bytes
assertFalse(XDR.isLastFragment(fragment1));
assertTrue(XDR.fragmentSize(fragment1)==10);
List<Object> outputBufs = new ArrayList<>();
// decoder should wait for the final fragment
ByteBuffer buffer = ByteBuffer.allocate(4 + 10);
buffer.put(fragment1);
buffer.flip();
ChannelBuffer buf = new ByteBufferBackedChannelBuffer(buffer);
ChannelBuffer channelBuffer = (ChannelBuffer) decoder.decode(
Mockito.mock(ChannelHandlerContext.class), Mockito.mock(Channel.class),
buf);
assertTrue(channelBuffer == null);
ByteBuf buf = Unpooled.directBuffer(4 + 10, 4 + 10);
buf.writeBytes(fragment1);
decoder.decode(
Mockito.mock(ChannelHandlerContext.class), buf,
outputBufs);
byte[] fragment2 = new byte[4 + 10];
fragment2[0] = (byte) (1 << 7); // final fragment
@ -155,21 +157,22 @@ public void testMultipleFrames() {
fragment2[3] = (byte) 10; // fragment size = 10 bytes
assertTrue(XDR.isLastFragment(fragment2));
assertTrue(XDR.fragmentSize(fragment2)==10);
buf.release();
buffer = ByteBuffer.allocate(4 + 10);
buffer.put(fragment2);
buffer.flip();
buf = new ByteBufferBackedChannelBuffer(buffer);
channelBuffer = (ChannelBuffer) decoder.decode(
Mockito.mock(ChannelHandlerContext.class), Mockito.mock(Channel.class),
buf);
assertTrue(channelBuffer != null);
// Complete frame should have to total size 10+10=20
assertEquals(20, channelBuffer.readableBytes());
buf = Unpooled.directBuffer(4 + 10, 4 + 10);
buf.writeBytes(fragment2);
decoder.decode(
Mockito.mock(ChannelHandlerContext.class), buf,
outputBufs);
// Expect two completed frames each 10 bytes
decoder.isLast();
assertEquals(2, outputBufs.size());
outputBufs.forEach(b -> assertEquals(((ByteBuf)b).readableBytes(), 10));
buf.release();
}
@Test
public void testFrames() {
public void testFrames() throws InterruptedException {
int serverPort = startRpcServer(true);
XDR xdrOut = createGetportMount();
@ -187,7 +190,7 @@ public void testFrames() {
}
@Test
public void testUnprivilegedPort() {
public void testUnprivilegedPort() throws InterruptedException {
// Don't allow connections from unprivileged ports. Given that this test is
// presumably not being run by root, this will be the case.
int serverPort = startRpcServer(false);
@ -218,23 +221,28 @@ public void testUnprivilegedPort() {
assertEquals(requestSize, resultSize);
}
private static int startRpcServer(boolean allowInsecurePorts) {
private static int startRpcServer(boolean allowInsecurePorts)
throws InterruptedException {
Random rand = new Random();
int serverPort = 30000 + rand.nextInt(10000);
int retries = 10; // A few retries in case initial choice is in use.
while (true) {
SimpleTcpServer tcpServer = null;
try {
RpcProgram program = new TestFrameDecoder.TestRpcProgram("TestRpcProgram",
"localhost", serverPort, 100000, 1, 2, allowInsecurePorts);
SimpleTcpServer tcpServer = new SimpleTcpServer(serverPort, program, 1);
tcpServer = new SimpleTcpServer(serverPort, program, 1);
tcpServer.run();
break; // Successfully bound a port, break out.
} catch (ChannelException ce) {
} catch (InterruptedException | ChannelException e) {
if (tcpServer != null) {
tcpServer.shutdown();
}
if (retries-- > 0) {
serverPort += rand.nextInt(20); // Port in use? Try another.
} else {
throw ce; // Out of retries.
throw e; // Out of retries.
}
}
}

View File

@ -43,7 +43,7 @@ public class TestPortmap {
private int xid;
@BeforeClass
public static void setup() {
public static void setup() throws InterruptedException {
pm.start(SHORT_TIMEOUT_MILLISECONDS, new InetSocketAddress("localhost", 0),
new InetSocketAddress("localhost", 0));
}

View File

@ -47,7 +47,7 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
<artifactId>netty-all</artifactId>
<scope>compile</scope>
</dependency>
<dependency>

View File

@ -26,6 +26,10 @@
import java.util.List;
import java.util.HashMap;
import io.netty.channel.ChannelHandler;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.FileSystem;
@ -51,15 +55,13 @@
import org.apache.hadoop.oncrpc.security.VerifierNone;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
/**
* RPC program corresponding to mountd daemon. See {@link Mountd}.
*/
@ChannelHandler.Sharable
public class RpcProgramMountd extends RpcProgram implements MountInterface {
private static final Logger LOG =
LoggerFactory.getLogger(RpcProgramMountd.class);
@ -262,8 +264,8 @@ RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone())
RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone()).write(
out);
}
ChannelBuffer buf =
ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap().buffer());
ByteBuf buf =
Unpooled.wrappedBuffer(out.asReadOnlyWrap().buffer());
RpcResponse rsp = new RpcResponse(buf, info.remoteAddress());
RpcUtil.sendRpcResponse(ctx, rsp);
}

View File

@ -22,6 +22,8 @@
import java.net.URI;
import java.nio.file.FileSystemException;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsConstants;
@ -39,8 +41,6 @@
import org.apache.hadoop.nfs.nfs3.response.WccData;
import org.apache.hadoop.oncrpc.XDR;
import org.apache.hadoop.security.IdMappingServiceProvider;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
/**
* Utility/helper methods related to NFS
@ -147,16 +147,16 @@ public static void writeChannel(Channel channel, XDR out, int xid) {
if (RpcProgramNfs3.LOG.isDebugEnabled()) {
RpcProgramNfs3.LOG.debug(WRITE_RPC_END + xid);
}
ChannelBuffer outBuf = XDR.writeMessageTcp(out, true);
channel.write(outBuf);
ByteBuf outBuf = XDR.writeMessageTcp(out, true);
channel.writeAndFlush(outBuf);
}
public static void writeChannelCommit(Channel channel, XDR out, int xid) {
if (RpcProgramNfs3.LOG.isDebugEnabled()) {
RpcProgramNfs3.LOG.debug("Commit done:" + xid);
}
ChannelBuffer outBuf = XDR.writeMessageTcp(out, true);
channel.write(outBuf);
ByteBuf outBuf = XDR.writeMessageTcp(out, true);
channel.writeAndFlush(outBuf);
}
private static boolean isSet(int access, int bits) {

View File

@ -31,6 +31,7 @@
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;
import io.netty.channel.Channel;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
@ -55,7 +56,6 @@
import org.apache.hadoop.security.IdMappingServiceProvider;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.Time;
import org.jboss.netty.channel.Channel;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;

View File

@ -28,6 +28,11 @@
import java.nio.charset.Charset;
import java.util.EnumSet;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.DirectoryListingStartAfterNotFoundException;
@ -129,10 +134,6 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.util.JvmPauseMonitor;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
@ -141,6 +142,7 @@
/**
* RPC program corresponding to nfs daemon. See {@link Nfs3}.
*/
@ChannelHandler.Sharable
public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
public static final int DEFAULT_UMASK = 0022;
public static final FsPermission umask = new FsPermission(
@ -2180,7 +2182,7 @@ public void handleInternal(ChannelHandlerContext ctx, RpcInfo info) {
RpcDeniedReply.RejectState.AUTH_ERROR, new VerifierNone());
rdr.write(reply);
ChannelBuffer buf = ChannelBuffers.wrappedBuffer(reply.asReadOnlyWrap()
ByteBuf buf = Unpooled.wrappedBuffer(reply.asReadOnlyWrap()
.buffer());
RpcResponse rsp = new RpcResponse(buf, info.remoteAddress());
RpcUtil.sendRpcResponse(ctx, rsp);
@ -2291,7 +2293,7 @@ RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone()).write(
}
// TODO: currently we just return VerifierNone
out = response.serialize(out, xid, new VerifierNone());
ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
ByteBuf buf = Unpooled.wrappedBuffer(out.asReadOnlyWrap()
.buffer());
RpcResponse rsp = new RpcResponse(buf, info.remoteAddress());

View File

@ -22,12 +22,12 @@
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import io.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.nfs.nfs3.FileHandle;
import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
import org.jboss.netty.channel.Channel;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;

View File

@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.EnumSet;
import io.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@ -43,7 +44,6 @@
import org.apache.hadoop.oncrpc.XDR;
import org.apache.hadoop.oncrpc.security.VerifierNone;
import org.apache.hadoop.security.IdMappingServiceProvider;
import org.jboss.netty.channel.Channel;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;

View File

@ -21,6 +21,12 @@
import java.nio.ByteBuffer;
import java.util.Arrays;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys;
@ -42,13 +48,6 @@
import org.apache.hadoop.oncrpc.XDR;
import org.apache.hadoop.oncrpc.security.CredentialsNone;
import org.apache.hadoop.oncrpc.security.VerifierNone;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;
public class TestOutOfOrderWrite {
public final static Logger LOG =
@ -100,9 +99,9 @@ public WriteHandler(XDR request) {
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// Get handle from create response
ChannelBuffer buf = (ChannelBuffer) e.getMessage();
ByteBuf buf = (ByteBuf) msg;
XDR rsp = new XDR(buf.array());
if (rsp.getBytes().length == 0) {
LOG.info("rsp length is zero, why?");
@ -125,7 +124,7 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
rsp.readBoolean(); // value follow
handle = new FileHandle();
handle.deserialize(rsp);
channel = e.getChannel();
channel = ctx.channel();
}
}
@ -136,16 +135,17 @@ public WriteClient(String host, int port, XDR request, Boolean oneShot) {
}
@Override
protected ChannelPipelineFactory setPipelineFactory() {
this.pipelineFactory = new ChannelPipelineFactory() {
protected ChannelInitializer<SocketChannel> setChannelHandler() {
return new ChannelInitializer<SocketChannel>() {
@Override
public ChannelPipeline getPipeline() {
return Channels.pipeline(
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(
RpcUtil.constructRpcFrameDecoder(),
new WriteHandler(request));
new WriteHandler(request)
);
}
};
return this.pipelineFactory;
}
}

View File

@ -28,6 +28,7 @@
import java.nio.ByteBuffer;
import java.util.EnumSet;
import io.netty.channel.Channel;
import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@ -92,7 +93,6 @@
import org.apache.hadoop.security.IdMappingConstant;
import org.apache.hadoop.security.authorize.DefaultImpersonationProvider;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.jboss.netty.channel.Channel;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;

View File

@ -27,6 +27,7 @@
import java.util.Arrays;
import java.util.concurrent.ConcurrentNavigableMap;
import io.netty.channel.Channel;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@ -52,7 +53,6 @@
import org.apache.hadoop.security.ShellBasedIdMapping;
import org.apache.hadoop.security.authorize.DefaultImpersonationProvider;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.jboss.netty.channel.Channel;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;