diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java
index ec96e61758..dcd01c023c 100644
--- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java
@@ -20,8 +20,8 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mount.MountdBase;
-import org.apache.hadoop.oncrpc.RpcFrameDecoder;
 import org.apache.hadoop.oncrpc.RpcProgram;
+import org.apache.hadoop.oncrpc.RpcUtil;
 import org.apache.hadoop.oncrpc.SimpleTcpServer;
 import org.apache.hadoop.oncrpc.SimpleTcpServerHandler;
 import org.apache.hadoop.portmap.PortmapMapping;
@@ -68,7 +68,8 @@ public ChannelPipelineFactory getPipelineFactory() {
     return new ChannelPipelineFactory() {
       @Override
       public ChannelPipeline getPipeline() {
-        return Channels.pipeline(new RpcFrameDecoder(),
+        return Channels.pipeline(
+            RpcUtil.constructRpcFrameDecoder(),
             new SimpleTcpServerHandler(rpcProgram));
       }
     };
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java
index 7186dd1359..04ebbbc39b 100644
--- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java
@@ -17,13 +17,65 @@
  */
 package org.apache.hadoop.oncrpc;
 
-/**
- * The XID in RPC call. It is used for starting with new seed after each reboot.
- */
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.frame.FrameDecoder;
+
 public class RpcUtil {
+  /**
+   * The XID in RPC call. It is used for starting with new seed after each reboot.
+   */
   private static int xid = (int) (System.currentTimeMillis() / 1000) << 12;
 
   public static int getNewXid(String caller) {
     return xid = ++xid + caller.hashCode();
   }
+
+  public static FrameDecoder constructRpcFrameDecoder() {
+    return new RpcFrameDecoder();
+  }
+
+  static class RpcFrameDecoder extends FrameDecoder {
+    public static final Log LOG = LogFactory.getLog(RpcFrameDecoder.class);
+    private ChannelBuffer currentFrame;
+
+    @Override
+    protected Object decode(ChannelHandlerContext ctx, Channel channel,
+        ChannelBuffer buf) {
+
+      if (buf.readableBytes() < 4)
+        return null;
+
+      buf.markReaderIndex();
+
+      byte[] fragmentHeader = new byte[4];
+      buf.readBytes(fragmentHeader);
+      int length = XDR.fragmentSize(fragmentHeader);
+      boolean isLast = XDR.isLastFragment(fragmentHeader);
+
+      if (buf.readableBytes() < length) {
+        buf.resetReaderIndex();
+        return null;
+      }
+
+      ChannelBuffer newFragment = buf.readSlice(length);
+      if (currentFrame == null) {
+        currentFrame = newFragment;
+      } else {
+        currentFrame = ChannelBuffers.wrappedBuffer(currentFrame, newFragment);
+      }
+
+      if (isLast) {
+        ChannelBuffer completeFrame = currentFrame;
+        currentFrame = null;
+        return completeFrame;
+      } else {
+        return null;
+      }
+    }
+  }
 }
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java
index 287aa9fa8a..32e1b4b839 100644
--- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java
@@ -20,8 +20,6 @@
 import java.net.InetSocketAddress;
 import java.util.concurrent.Executors;
 
-import org.apache.hadoop.oncrpc.RpcFrameDecoder;
-import org.apache.hadoop.oncrpc.XDR;
 import org.jboss.netty.bootstrap.ClientBootstrap;
 import org.jboss.netty.channel.ChannelFactory;
 import org.jboss.netty.channel.ChannelFuture;
@@ -55,7 +53,8 @@ protected ChannelPipelineFactory setPipelineFactory() {
     this.pipelineFactory = new ChannelPipelineFactory() {
       @Override
       public ChannelPipeline getPipeline() {
-        return Channels.pipeline(new RpcFrameDecoder(),
+        return Channels.pipeline(
+            RpcUtil.constructRpcFrameDecoder(),
             new SimpleTcpClientHandler(request));
       }
     };
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java
index e168ef406b..6f668a2106 100644
--- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java
@@ -57,7 +57,8 @@ public ChannelPipelineFactory getPipelineFactory() {
     return new ChannelPipelineFactory() {
       @Override
       public ChannelPipeline getPipeline() {
-        return Channels.pipeline(new RpcFrameDecoder(),
+        return Channels.pipeline(
+            RpcUtil.constructRpcFrameDecoder(),
             new SimpleTcpServerHandler(rpcProgram));
       }
     };
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServerHandler.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServerHandler.java
index 71cce18f58..04e2930f60 100644
--- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServerHandler.java
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServerHandler.java
@@ -44,7 +44,7 @@ public SimpleTcpServerHandler(RpcProgram rpcProgram) {
 
   @Override
   public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
     ChannelBuffer buf = (ChannelBuffer) e.getMessage();
-    XDR request = new XDR(buf.array());
+    XDR request = new XDR(buf.toByteBuffer().asReadOnlyBuffer(), XDR.State.READING);
     InetAddress remoteInetAddr = ((InetSocketAddress) ctx.getChannel()
         .getRemoteAddress()).getAddress();
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServerHandler.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServerHandler.java
index 517b18d7d3..79a255b261 100644
--- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServerHandler.java
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServerHandler.java
@@ -43,7 +43,7 @@ public SimpleUdpServerHandler(RpcProgram rpcProgram) {
 
   public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
     ChannelBuffer buf = (ChannelBuffer) e.getMessage();
-    XDR request = new XDR(buf.array());
+    XDR request = new XDR(buf.toByteBuffer().asReadOnlyBuffer(), XDR.State.READING);
     InetAddress remoteInetAddr = ((InetSocketAddress) e.getRemoteAddress())
         .getAddress();
 
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java
index 1472f6ebe6..df2b91f05f 100644
--- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java
@@ -46,7 +46,7 @@ public final class XDR {
 
   private ByteBuffer buf;
 
-  private enum State {
+  public enum State {
     READING, WRITING,
   }
 
@@ -66,7 +66,7 @@ public XDR() {
     this(DEFAULT_INITIAL_CAPACITY);
   }
 
-  private XDR(ByteBuffer buf, State state) {
+  public XDR(ByteBuffer buf, State state) {
     this.buf = buf;
     this.state = state;
   }
diff --git a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java
index 189c24d603..0c306861b5 100644
--- a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java
+++ b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java
@@ -18,12 +18,14 @@
 
 package org.apache.hadoop.oncrpc;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
 
+import org.apache.hadoop.oncrpc.RpcUtil.RpcFrameDecoder;
 import org.apache.hadoop.oncrpc.security.CredentialsNone;
 import org.apache.hadoop.oncrpc.security.VerifierNone;
 import org.jboss.netty.buffer.ByteBufferBackedChannelBuffer;
@@ -138,7 +140,7 @@ public void testMultipleFrames() {
         buf);
     assertTrue(channelBuffer != null);
     // Complete frame should have to total size 10+10=20
-    assertTrue(channelBuffer.array().length == 20);
+    assertEquals(20, channelBuffer.readableBytes());
   }
 
   @Test
@@ -195,4 +197,4 @@ static XDR createGetportMount() {
    * static void testDump() { XDR xdr_out = new XDR();
    * createPortmapXDRheader(xdr_out, 4); testRequest(xdr_out); }
    */
-}
\ No newline at end of file
+}
diff --git a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestXDR.java b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestXDR.java
index 48e148be8e..98836db0b2 100644
--- a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestXDR.java
+++ b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestXDR.java
@@ -17,10 +17,9 @@
  */
 package org.apache.hadoop.oncrpc;
 
+import org.junit.Assert;
 import org.junit.Test;
 
-import junit.framework.Assert;
-
 public class TestXDR {
   private void serializeInt(int times) {
     XDR w = new XDR();
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java
index 7f3d8c58ee..2e8869e5a9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java
@@ -33,8 +33,8 @@
 import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
 import org.apache.hadoop.oncrpc.RegistrationClient;
 import org.apache.hadoop.oncrpc.RpcCall;
-import org.apache.hadoop.oncrpc.RpcFrameDecoder;
 import org.apache.hadoop.oncrpc.RpcReply;
+import org.apache.hadoop.oncrpc.RpcUtil;
 import org.apache.hadoop.oncrpc.SimpleTcpClient;
 import org.apache.hadoop.oncrpc.SimpleTcpClientHandler;
 import org.apache.hadoop.oncrpc.XDR;
@@ -136,8 +136,9 @@ public WriteClient(String host, int port, XDR request, Boolean oneShot) {
     protected ChannelPipelineFactory setPipelineFactory() {
       this.pipelineFactory = new ChannelPipelineFactory() {
         public ChannelPipeline getPipeline() {
-          return Channels.pipeline(new RpcFrameDecoder(), new WriteHandler(
-              request));
+          return Channels.pipeline(
+              RpcUtil.constructRpcFrameDecoder(),
+              new WriteHandler(request));
         }
       };
       return this.pipelineFactory;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index cd12395bcb..c21db10414 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -366,6 +366,9 @@ Release 2.1.1-beta - 2013-09-23
 
     HDFS-5199 Add more debug trace for NFS READ and WRITE. (brandonli)
 
+    HDFS-5234 Move RpcFrameDecoder out of the public API.
+    (Haohui Mai via brandonli)
+
   IMPROVEMENTS
 
     HDFS-4513. Clarify in the WebHDFS REST API that all JSON respsonses may
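
Note (illustration, not part of the patch above): RpcFrameDecoder splits the TCP byte stream on ONC RPC record-marking boundaries, where each fragment begins with a 4-byte big-endian header whose highest bit flags the last fragment and whose remaining 31 bits give the fragment length. The patch reads that header through XDR.fragmentSize() and XDR.isLastFragment(); the self-contained Java sketch below shows the same header layout, with hypothetical class and method names that do not exist in the Hadoop source.

// Illustrative only -- mirrors the record-marking logic the patch delegates
// to XDR.fragmentSize() and XDR.isLastFragment().
public class RecordMarkingSketch {
  // Bits 0-30 of the 4-byte header carry the fragment length in bytes.
  static int fragmentSize(byte[] header) {
    int raw = ((header[0] & 0xff) << 24) | ((header[1] & 0xff) << 16)
        | ((header[2] & 0xff) << 8) | (header[3] & 0xff);
    return raw & 0x7fffffff;
  }

  // Bit 31 (the highest bit) marks the last fragment of an RPC record.
  static boolean isLastFragment(byte[] header) {
    return (header[0] & 0x80) != 0;
  }

  public static void main(String[] args) {
    byte[] header = { (byte) 0x80, 0x00, 0x00, 0x14 };
    System.out.println(fragmentSize(header));   // 20
    System.out.println(isLastFragment(header)); // true
  }
}

In the decoder added by the patch, non-final fragments are accumulated with ChannelBuffers.wrappedBuffer() until the last-fragment bit is seen, at which point the complete record is returned to the pipeline as a single ChannelBuffer.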