HDFS-5234 Move RpcFrameDecoder out of the public API. Contributed by Haohui Mai

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1525104 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Brandon Li 2013-09-20 19:03:51 +00:00
parent 13420d01f2
commit d8d3d3eaed
11 changed files with 78 additions and 20 deletions

View File

@ -20,8 +20,8 @@
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mount.MountdBase; import org.apache.hadoop.mount.MountdBase;
import org.apache.hadoop.oncrpc.RpcFrameDecoder;
import org.apache.hadoop.oncrpc.RpcProgram; import org.apache.hadoop.oncrpc.RpcProgram;
import org.apache.hadoop.oncrpc.RpcUtil;
import org.apache.hadoop.oncrpc.SimpleTcpServer; import org.apache.hadoop.oncrpc.SimpleTcpServer;
import org.apache.hadoop.oncrpc.SimpleTcpServerHandler; import org.apache.hadoop.oncrpc.SimpleTcpServerHandler;
import org.apache.hadoop.portmap.PortmapMapping; import org.apache.hadoop.portmap.PortmapMapping;
@ -68,7 +68,8 @@ public ChannelPipelineFactory getPipelineFactory() {
return new ChannelPipelineFactory() { return new ChannelPipelineFactory() {
@Override @Override
public ChannelPipeline getPipeline() { public ChannelPipeline getPipeline() {
return Channels.pipeline(new RpcFrameDecoder(), return Channels.pipeline(
RpcUtil.constructRpcFrameDecoder(),
new SimpleTcpServerHandler(rpcProgram)); new SimpleTcpServerHandler(rpcProgram));
} }
}; };

View File

@ -17,13 +17,65 @@
*/ */
package org.apache.hadoop.oncrpc; package org.apache.hadoop.oncrpc;
/** import org.apache.commons.logging.Log;
* The XID in RPC call. It is used for starting with new seed after each reboot. import org.apache.commons.logging.LogFactory;
*/ import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.FrameDecoder;
public class RpcUtil { public class RpcUtil {
/**
* The XID in RPC call. It is used for starting with new seed after each reboot.
*/
private static int xid = (int) (System.currentTimeMillis() / 1000) << 12; private static int xid = (int) (System.currentTimeMillis() / 1000) << 12;
public static int getNewXid(String caller) { public static int getNewXid(String caller) {
return xid = ++xid + caller.hashCode(); return xid = ++xid + caller.hashCode();
} }
public static FrameDecoder constructRpcFrameDecoder() {
return new RpcFrameDecoder();
}
static class RpcFrameDecoder extends FrameDecoder {
public static final Log LOG = LogFactory.getLog(RpcFrameDecoder.class);
private ChannelBuffer currentFrame;
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel,
ChannelBuffer buf) {
if (buf.readableBytes() < 4)
return null;
buf.markReaderIndex();
byte[] fragmentHeader = new byte[4];
buf.readBytes(fragmentHeader);
int length = XDR.fragmentSize(fragmentHeader);
boolean isLast = XDR.isLastFragment(fragmentHeader);
if (buf.readableBytes() < length) {
buf.resetReaderIndex();
return null;
}
ChannelBuffer newFragment = buf.readSlice(length);
if (currentFrame == null) {
currentFrame = newFragment;
} else {
currentFrame = ChannelBuffers.wrappedBuffer(currentFrame, newFragment);
}
if (isLast) {
ChannelBuffer completeFrame = currentFrame;
currentFrame = null;
return completeFrame;
} else {
return null;
}
}
}
} }

View File

@ -20,8 +20,6 @@
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import org.apache.hadoop.oncrpc.RpcFrameDecoder;
import org.apache.hadoop.oncrpc.XDR;
import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.ChannelFactory; import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelFuture;
@ -55,7 +53,8 @@ protected ChannelPipelineFactory setPipelineFactory() {
this.pipelineFactory = new ChannelPipelineFactory() { this.pipelineFactory = new ChannelPipelineFactory() {
@Override @Override
public ChannelPipeline getPipeline() { public ChannelPipeline getPipeline() {
return Channels.pipeline(new RpcFrameDecoder(), return Channels.pipeline(
RpcUtil.constructRpcFrameDecoder(),
new SimpleTcpClientHandler(request)); new SimpleTcpClientHandler(request));
} }
}; };

View File

@ -57,7 +57,8 @@ public ChannelPipelineFactory getPipelineFactory() {
return new ChannelPipelineFactory() { return new ChannelPipelineFactory() {
@Override @Override
public ChannelPipeline getPipeline() { public ChannelPipeline getPipeline() {
return Channels.pipeline(new RpcFrameDecoder(), return Channels.pipeline(
RpcUtil.constructRpcFrameDecoder(),
new SimpleTcpServerHandler(rpcProgram)); new SimpleTcpServerHandler(rpcProgram));
} }
}; };

View File

@ -44,7 +44,7 @@ public SimpleTcpServerHandler(RpcProgram rpcProgram) {
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
ChannelBuffer buf = (ChannelBuffer) e.getMessage(); ChannelBuffer buf = (ChannelBuffer) e.getMessage();
XDR request = new XDR(buf.array()); XDR request = new XDR(buf.toByteBuffer().asReadOnlyBuffer(), XDR.State.READING);
InetAddress remoteInetAddr = ((InetSocketAddress) ctx.getChannel() InetAddress remoteInetAddr = ((InetSocketAddress) ctx.getChannel()
.getRemoteAddress()).getAddress(); .getRemoteAddress()).getAddress();

View File

@ -43,7 +43,7 @@ public SimpleUdpServerHandler(RpcProgram rpcProgram) {
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
ChannelBuffer buf = (ChannelBuffer) e.getMessage(); ChannelBuffer buf = (ChannelBuffer) e.getMessage();
XDR request = new XDR(buf.array()); XDR request = new XDR(buf.toByteBuffer().asReadOnlyBuffer(), XDR.State.READING);
InetAddress remoteInetAddr = ((InetSocketAddress) e.getRemoteAddress()) InetAddress remoteInetAddr = ((InetSocketAddress) e.getRemoteAddress())
.getAddress(); .getAddress();

View File

@ -46,7 +46,7 @@ public final class XDR {
private ByteBuffer buf; private ByteBuffer buf;
private enum State { public enum State {
READING, WRITING, READING, WRITING,
} }
@ -66,7 +66,7 @@ public XDR() {
this(DEFAULT_INITIAL_CAPACITY); this(DEFAULT_INITIAL_CAPACITY);
} }
private XDR(ByteBuffer buf, State state) { public XDR(ByteBuffer buf, State state) {
this.buf = buf; this.buf = buf;
this.state = state; this.state = state;
} }

View File

@ -18,12 +18,14 @@
package org.apache.hadoop.oncrpc; package org.apache.hadoop.oncrpc;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.net.InetAddress; import java.net.InetAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import org.apache.hadoop.oncrpc.RpcUtil.RpcFrameDecoder;
import org.apache.hadoop.oncrpc.security.CredentialsNone; import org.apache.hadoop.oncrpc.security.CredentialsNone;
import org.apache.hadoop.oncrpc.security.VerifierNone; import org.apache.hadoop.oncrpc.security.VerifierNone;
import org.jboss.netty.buffer.ByteBufferBackedChannelBuffer; import org.jboss.netty.buffer.ByteBufferBackedChannelBuffer;
@ -138,7 +140,7 @@ public void testMultipleFrames() {
buf); buf);
assertTrue(channelBuffer != null); assertTrue(channelBuffer != null);
// Complete frame should have to total size 10+10=20 // Complete frame should have to total size 10+10=20
assertTrue(channelBuffer.array().length == 20); assertEquals(20, channelBuffer.readableBytes());
} }
@Test @Test

View File

@ -17,10 +17,9 @@
*/ */
package org.apache.hadoop.oncrpc; package org.apache.hadoop.oncrpc;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import junit.framework.Assert;
public class TestXDR { public class TestXDR {
private void serializeInt(int times) { private void serializeInt(int times) {
XDR w = new XDR(); XDR w = new XDR();

View File

@ -33,8 +33,8 @@
import org.apache.hadoop.nfs.nfs3.request.WRITE3Request; import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
import org.apache.hadoop.oncrpc.RegistrationClient; import org.apache.hadoop.oncrpc.RegistrationClient;
import org.apache.hadoop.oncrpc.RpcCall; import org.apache.hadoop.oncrpc.RpcCall;
import org.apache.hadoop.oncrpc.RpcFrameDecoder;
import org.apache.hadoop.oncrpc.RpcReply; import org.apache.hadoop.oncrpc.RpcReply;
import org.apache.hadoop.oncrpc.RpcUtil;
import org.apache.hadoop.oncrpc.SimpleTcpClient; import org.apache.hadoop.oncrpc.SimpleTcpClient;
import org.apache.hadoop.oncrpc.SimpleTcpClientHandler; import org.apache.hadoop.oncrpc.SimpleTcpClientHandler;
import org.apache.hadoop.oncrpc.XDR; import org.apache.hadoop.oncrpc.XDR;
@ -136,8 +136,9 @@ public WriteClient(String host, int port, XDR request, Boolean oneShot) {
protected ChannelPipelineFactory setPipelineFactory() { protected ChannelPipelineFactory setPipelineFactory() {
this.pipelineFactory = new ChannelPipelineFactory() { this.pipelineFactory = new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() { public ChannelPipeline getPipeline() {
return Channels.pipeline(new RpcFrameDecoder(), new WriteHandler( return Channels.pipeline(
request)); RpcUtil.constructRpcFrameDecoder(),
new WriteHandler(request));
} }
}; };
return this.pipelineFactory; return this.pipelineFactory;

View File

@ -366,6 +366,9 @@ Release 2.1.1-beta - 2013-09-23
HDFS-5199 Add more debug trace for NFS READ and WRITE. (brandonli) HDFS-5199 Add more debug trace for NFS READ and WRITE. (brandonli)
HDFS-5234 Move RpcFrameDecoder out of the public API.
(Haohui Mai via brandonli)
IMPROVEMENTS IMPROVEMENTS
HDFS-4513. Clarify in the WebHDFS REST API that all JSON respsonses may HDFS-4513. Clarify in the WebHDFS REST API that all JSON respsonses may