From 8ef140d38b16a18050e9e7901e8aadd424aef20f Mon Sep 17 00:00:00 2001 From: Brandon Li Date: Fri, 7 Jun 2013 21:45:06 +0000 Subject: [PATCH] HADOOP-9509. Implement ONCRPC and XDR. Contributed by Brandon Li git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1490845 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-common/CHANGES.txt | 2 + hadoop-common-project/hadoop-nfs/README.txt | 10 + hadoop-common-project/hadoop-nfs/pom.xml | 98 ++++ .../hadoop/oncrpc/RegistrationClient.java | 104 +++++ .../hadoop/oncrpc/RpcAcceptedReply.java | 89 ++++ .../org/apache/hadoop/oncrpc/RpcAuthInfo.java | 81 ++++ .../org/apache/hadoop/oncrpc/RpcAuthSys.java | 51 +++ .../org/apache/hadoop/oncrpc/RpcCall.java | 113 +++++ .../apache/hadoop/oncrpc/RpcCallCache.java | 170 +++++++ .../apache/hadoop/oncrpc/RpcDeniedReply.java | 80 ++++ .../apache/hadoop/oncrpc/RpcFrameDecoder.java | 88 ++++ .../org/apache/hadoop/oncrpc/RpcMessage.java | 52 +++ .../org/apache/hadoop/oncrpc/RpcProgram.java | 198 +++++++++ .../org/apache/hadoop/oncrpc/RpcReply.java | 67 +++ .../org/apache/hadoop/oncrpc/RpcUtil.java | 29 ++ .../apache/hadoop/oncrpc/SimpleTcpClient.java | 88 ++++ .../hadoop/oncrpc/SimpleTcpClientHandler.java | 64 +++ .../apache/hadoop/oncrpc/SimpleTcpServer.java | 90 ++++ .../hadoop/oncrpc/SimpleTcpServerHandler.java | 63 +++ .../apache/hadoop/oncrpc/SimpleUdpClient.java | 70 +++ .../apache/hadoop/oncrpc/SimpleUdpServer.java | 76 ++++ .../hadoop/oncrpc/SimpleUdpServerHandler.java | 60 +++ .../java/org/apache/hadoop/oncrpc/XDR.java | 418 ++++++++++++++++++ .../org/apache/hadoop/portmap/Portmap.java | 58 +++ .../hadoop/portmap/PortmapInterface.java | 97 ++++ .../apache/hadoop/portmap/PortmapMapping.java | 70 +++ .../apache/hadoop/portmap/PortmapRequest.java | 46 ++ .../hadoop/portmap/PortmapResponse.java | 61 +++ .../hadoop/portmap/RpcProgramPortmap.java | 167 +++++++ .../hadoop/oncrpc/TestFrameDecoder.java | 194 ++++++++ .../hadoop/oncrpc/TestRpcAcceptedReply.java | 58 +++ .../apache/hadoop/oncrpc/TestRpcAuthInfo.java | 53 +++ .../apache/hadoop/oncrpc/TestRpcAuthSys.java | 45 ++ .../org/apache/hadoop/oncrpc/TestRpcCall.java | 59 +++ .../hadoop/oncrpc/TestRpcCallCache.java | 135 ++++++ .../hadoop/oncrpc/TestRpcDeniedReply.java | 51 +++ .../apache/hadoop/oncrpc/TestRpcMessage.java | 57 +++ .../apache/hadoop/oncrpc/TestRpcReply.java | 49 ++ .../org/apache/hadoop/oncrpc/TestXDR.java | 39 ++ hadoop-common-project/pom.xml | 1 + 40 files changed, 3401 insertions(+) create mode 100644 hadoop-common-project/hadoop-nfs/README.txt create mode 100644 hadoop-common-project/hadoop-nfs/pom.xml create mode 100644 hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RegistrationClient.java create mode 100644 hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAcceptedReply.java create mode 100644 hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAuthInfo.java create mode 100644 hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAuthSys.java create mode 100644 hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCall.java create mode 100644 hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCallCache.java create mode 100644 hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcDeniedReply.java create mode 100644 hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcFrameDecoder.java create mode 100644 hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcMessage.java create mode 100644 hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java create mode 100644 hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcReply.java create mode 100644 hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java create mode 100644 hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java create mode 100644 hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClientHandler.java create mode 100644 hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java create mode 100644 hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServerHandler.java create mode 100644 hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpClient.java create mode 100644 hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java create mode 100644 hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServerHandler.java create mode 100644 hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java create mode 100644 hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/Portmap.java create mode 100644 hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapInterface.java create mode 100644 hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapMapping.java create mode 100644 hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapRequest.java create mode 100644 hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapResponse.java create mode 100644 hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java create mode 100644 hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java create mode 100644 hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcAcceptedReply.java create mode 100644 hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcAuthInfo.java create mode 100644 hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcAuthSys.java create mode 100644 hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCall.java create mode 100644 hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCallCache.java create mode 100644 hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcDeniedReply.java create mode 100644 hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcMessage.java create mode 100644 hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcReply.java create mode 100644 hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestXDR.java diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index e06a273cf2..17e5e595ee 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -12,6 +12,8 @@ Trunk (Unreleased) HADOOP-8561. Introduce HADOOP_PROXY_USER for secure impersonation in child hadoop client processes. (Yu Gao via llu) + HADOOP-9509. Implement ONCRPC and XDR. (brandonli) + IMPROVEMENTS HADOOP-8017. Configure hadoop-main pom to get rid of M2E plugin execution diff --git a/hadoop-common-project/hadoop-nfs/README.txt b/hadoop-common-project/hadoop-nfs/README.txt new file mode 100644 index 0000000000..bec500aaee --- /dev/null +++ b/hadoop-common-project/hadoop-nfs/README.txt @@ -0,0 +1,10 @@ +Hadoop NFS + +Hadoop NFS is a Java library for building NFS gateway. It has +the following components: + +- ONCRPC: This a implementation of ONCRPC(RFC-5531) and XDR(RFC-4506). +- Mount: This an interface implementation of MOUNT protocol (RFC-1813). +- Portmap: This is a implementation of Binding protocol(RFC-1833). +- NFSv3: This is an interface implementation of NFSv3 protocol(RFC-1813). + diff --git a/hadoop-common-project/hadoop-nfs/pom.xml b/hadoop-common-project/hadoop-nfs/pom.xml new file mode 100644 index 0000000000..5b87305208 --- /dev/null +++ b/hadoop-common-project/hadoop-nfs/pom.xml @@ -0,0 +1,98 @@ + + + + 4.0.0 + + org.apache.hadoop + hadoop-project + 3.0.0-SNAPSHOT + ../../hadoop-project + + org.apache.hadoop + hadoop-nfs + 3.0.0-SNAPSHOT + jar + + Apache Hadoop NFS + Apache Hadoop NFS library + + + yyyyMMdd + LOCALHOST + + + + + + org.apache.hadoop + hadoop-annotations + provided + + + org.apache.hadoop + hadoop-common + provided + + + junit + junit + 4.8.2 + + + org.mockito + mockito-all + 1.8.5 + + + commons-logging + commons-logging + compile + + + javax.servlet + servlet-api + provided + + + org.slf4j + slf4j-api + compile + + + log4j + log4j + runtime + + + org.slf4j + slf4j-log4j12 + runtime + + + io.netty + netty + 3.6.2.Final + compile + + + com.google.guava + guava + 11.0.2 + + + diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RegistrationClient.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RegistrationClient.java new file mode 100644 index 0000000000..7ba37c9810 --- /dev/null +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RegistrationClient.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.oncrpc; + +import java.util.Arrays; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.MessageEvent; + +/** + * A simple client that registers an RPC program with portmap. + */ +public class RegistrationClient extends SimpleTcpClient { + public static final Log LOG = LogFactory.getLog(RegistrationClient.class); + + public RegistrationClient(String host, int port, XDR request) { + super(host, port, request); + } + + /** + * Handler to handle response from the server. + */ + static class RegistrationClientHandler extends SimpleTcpClientHandler { + public RegistrationClientHandler(XDR request) { + super(request); + } + + private boolean validMessageLength(int len) { + // 28 bytes is the minimal success response size (portmapV2) + if (len < 28) { + if (LOG.isDebugEnabled()) { + LOG.debug("Portmap mapping registration failed," + + " the response size is less than 28 bytes:" + len); + } + return false; + } + return true; + } + + @Override + public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { + ChannelBuffer buf = (ChannelBuffer) e.getMessage(); // Read reply + if (!validMessageLength(buf.readableBytes())) { + e.getChannel().close(); + return; + } + + // handling fragment header for TCP, 4 bytes. + byte[] fragmentHeader = Arrays.copyOfRange(buf.array(), 0, 4); + int fragmentSize = XDR.fragmentSize(fragmentHeader); + boolean isLast = XDR.isLastFragment(fragmentHeader); + assert (fragmentSize == 28 && isLast == true); + + XDR xdr = new XDR(); + xdr.writeFixedOpaque(Arrays.copyOfRange(buf.array(), 4, + buf.readableBytes())); + + RpcReply reply = RpcReply.read(xdr); + if (reply.getState() == RpcReply.ReplyState.MSG_ACCEPTED) { + RpcAcceptedReply acceptedReply = (RpcAcceptedReply) reply; + handle(acceptedReply, xdr); + } else { + RpcDeniedReply deniedReply = (RpcDeniedReply) reply; + handle(deniedReply); + } + e.getChannel().close(); // shutdown now that request is complete + } + + private void handle(RpcDeniedReply deniedReply) { + LOG.warn("Portmap mapping registration request was denied , " + + deniedReply); + } + + private void handle(RpcAcceptedReply acceptedReply, XDR xdr) { + AcceptState acceptState = acceptedReply.getAcceptState(); + assert (acceptState == AcceptState.SUCCESS); + boolean answer = xdr.readBoolean(); + if (answer != true) { + LOG.warn("Portmap mapping registration failed, accept state:" + + acceptState); + } + LOG.info("Portmap mapping registration succeeded"); + } + } +} diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAcceptedReply.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAcceptedReply.java new file mode 100644 index 0000000000..9ac223de15 --- /dev/null +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAcceptedReply.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.oncrpc; + +import org.apache.hadoop.oncrpc.RpcAuthInfo.AuthFlavor; + +/** + * Represents RPC message MSG_ACCEPTED reply body. See RFC 1831 for details. + * This response is sent to a request to indicate success of the request. + */ +public class RpcAcceptedReply extends RpcReply { + public enum AcceptState { + SUCCESS(0), /* RPC executed successfully */ + PROG_UNAVAIL(1), /* remote hasn't exported program */ + PROG_MISMATCH(2), /* remote can't support version # */ + PROC_UNAVAIL(3), /* program can't support procedure */ + GARBAGE_ARGS(4), /* procedure can't decode params */ + SYSTEM_ERR(5); /* e.g. memory allocation failure */ + + private final int value; + + AcceptState(int value) { + this.value = value; + } + + public static AcceptState fromValue(int value) { + return values()[value]; + } + + public int getValue() { + return value; + } + }; + + private final RpcAuthInfo verifier; + private final AcceptState acceptState; + + RpcAcceptedReply(int xid, int messageType, ReplyState state, + RpcAuthInfo verifier, AcceptState acceptState) { + super(xid, messageType, state); + this.verifier = verifier; + this.acceptState = acceptState; + } + + public static RpcAcceptedReply read(int xid, int messageType, + ReplyState replyState, XDR xdr) { + RpcAuthInfo verifier = RpcAuthInfo.read(xdr); + AcceptState acceptState = AcceptState.fromValue(xdr.readInt()); + return new RpcAcceptedReply(xid, messageType, replyState, verifier, + acceptState); + } + + public RpcAuthInfo getVerifier() { + return verifier; + } + + public AcceptState getAcceptState() { + return acceptState; + } + + public static XDR voidReply(XDR xdr, int xid) { + return voidReply(xdr, xid, AcceptState.SUCCESS); + } + + public static XDR voidReply(XDR xdr, int xid, AcceptState acceptState) { + xdr.writeInt(xid); + xdr.writeInt(RpcMessage.RPC_REPLY); + xdr.writeInt(ReplyState.MSG_ACCEPTED.getValue()); + xdr.writeInt(AuthFlavor.AUTH_NONE.getValue()); + xdr.writeVariableOpaque(new byte[0]); + xdr.writeInt(acceptState.getValue()); + return xdr; + } +} diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAuthInfo.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAuthInfo.java new file mode 100644 index 0000000000..a507d0d20d --- /dev/null +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAuthInfo.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.oncrpc; + +import java.util.Arrays; + +/** + * Authentication Info as defined in RFC 1831 + */ +public class RpcAuthInfo { + /** Different types of authentication as defined in RFC 1831 */ + public enum AuthFlavor { + AUTH_NONE(0), + AUTH_SYS(1), + AUTH_SHORT(2), + AUTH_DH(3), + RPCSEC_GSS(6); + + private int value; + + AuthFlavor(int value) { + this.value = value; + } + + public int getValue() { + return value; + } + + static AuthFlavor fromValue(int value) { + for (AuthFlavor v : values()) { + if (v.value == value) { + return v; + } + } + throw new IllegalArgumentException("Invalid AuthFlavor value " + value); + } + } + + private final AuthFlavor flavor; + private final byte[] body; + + protected RpcAuthInfo(AuthFlavor flavor, byte[] body) { + this.flavor = flavor; + this.body = body; + } + + public static RpcAuthInfo read(XDR xdr) { + int type = xdr.readInt(); + AuthFlavor flavor = AuthFlavor.fromValue(type); + byte[] body = xdr.readVariableOpaque(); + return new RpcAuthInfo(flavor, body); + } + + public AuthFlavor getFlavor() { + return flavor; + } + + public byte[] getBody() { + return Arrays.copyOf(body, body.length); + } + + @Override + public String toString() { + return "(AuthFlavor:" + flavor + ")"; + } +} diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAuthSys.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAuthSys.java new file mode 100644 index 0000000000..dbedb3649e --- /dev/null +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAuthSys.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.oncrpc; + +/** + * AUTH_SYS as defined in RFC 1831 + */ +public class RpcAuthSys { + private final int uid; + private final int gid; + + public RpcAuthSys(int uid, int gid) { + this.uid = uid; + this.gid = gid; + } + + public static RpcAuthSys from(byte[] credentials) { + XDR sys = new XDR(credentials); + sys.skip(4); // Stamp + sys.skipVariableOpaque(); // Machine name + return new RpcAuthSys(sys.readInt(), sys.readInt()); + } + + public int getUid() { + return uid; + } + + public int getGid() { + return gid; + } + + @Override + public String toString() { + return "(AuthSys: uid=" + uid + " gid=" + gid + ")"; + } +} diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCall.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCall.java new file mode 100644 index 0000000000..74e6373283 --- /dev/null +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCall.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.oncrpc; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * Represents an RPC message of type RPC call as defined in RFC 1831 + */ +public class RpcCall extends RpcMessage { + public static final int RPC_VERSION = 2; + private static final Log LOG = LogFactory.getLog(RpcCall.class); + private final int rpcVersion; + private final int program; + private final int version; + private final int procedure; + private final RpcAuthInfo credential; + private final RpcAuthInfo verifier; + + protected RpcCall(int xid, int messageType, int rpcVersion, int program, + int version, int procedure, RpcAuthInfo credential, RpcAuthInfo verifier) { + super(xid, messageType); + this.rpcVersion = rpcVersion; + this.program = program; + this.version = version; + this.procedure = procedure; + this.credential = credential; + this.verifier = verifier; + if (LOG.isTraceEnabled()) { + LOG.trace(this); + } + validate(); + } + + private void validateRpcVersion() { + if (rpcVersion != RPC_VERSION) { + throw new IllegalArgumentException("RPC version is expected to be " + + RPC_VERSION + " but got " + rpcVersion); + } + } + + public void validate() { + validateMessageType(RPC_CALL); + validateRpcVersion(); + // Validate other members + // Throw exception if validation fails + } + + + public int getRpcVersion() { + return rpcVersion; + } + + public int getProgram() { + return program; + } + + public int getVersion() { + return version; + } + + public int getProcedure() { + return procedure; + } + + public RpcAuthInfo getCredential() { + return credential; + } + + public RpcAuthInfo getVerifier() { + return verifier; + } + + public static RpcCall read(XDR xdr) { + return new RpcCall(xdr.readInt(), xdr.readInt(), xdr.readInt(), xdr.readInt(), + xdr.readInt(), xdr.readInt(), RpcAuthInfo.read(xdr), + RpcAuthInfo.read(xdr)); + } + + public static void write(XDR out, int xid, int program, int progVersion, + int procedure) { + out.writeInt(xid); + out.writeInt(RpcMessage.RPC_CALL); + out.writeInt(2); + out.writeInt(program); + out.writeInt(progVersion); + out.writeInt(procedure); + } + + @Override + public String toString() { + return String.format("Xid:%d, messageType:%d, rpcVersion:%d, program:%d," + + " version:%d, procedure:%d, credential:%s, verifier:%s", xid, + messageType, rpcVersion, program, version, procedure, + credential.toString(), verifier.toString()); + } +} diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCallCache.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCallCache.java new file mode 100644 index 0000000000..0862d4fb4e --- /dev/null +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCallCache.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.oncrpc; + +import java.net.InetAddress; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Map.Entry; + +import com.google.common.annotations.VisibleForTesting; + +/** + * This class is used for handling the duplicate non-idempotenty Rpc + * calls. A non-idempotent request is processed as follows: + * + *
+ * A request is identified by the client ID (address of the client) and + * transaction ID (xid) from the Rpc call. + * + */ +public class RpcCallCache { + + public static class CacheEntry { + private XDR response; // null if no response has been sent + + public CacheEntry() { + response = null; + } + + public boolean isInProgress() { + return response == null; + } + + public boolean isCompleted() { + return response != null; + } + + public XDR getResponse() { + return response; + } + + public void setResponse(XDR response) { + this.response = response; + } + } + + /** + * Call that is used to track a client in the {@link RpcCallCache} + */ + public static class ClientRequest { + protected final InetAddress clientId; + protected final int xid; + + public InetAddress getClientId() { + return clientId; + } + + public ClientRequest(InetAddress clientId, int xid) { + this.clientId = clientId; + this.xid = xid; + } + + @Override + public int hashCode() { + return xid + clientId.hashCode() * 31; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || !(obj instanceof ClientRequest)) { + return false; + } + ClientRequest other = (ClientRequest) obj; + return clientId.equals(other.clientId) && (xid == other.xid); + } + } + + private final String program; + + private final Map map; + + public RpcCallCache(final String program, final int maxEntries) { + if (maxEntries <= 0) { + throw new IllegalArgumentException("Cache size is " + maxEntries + + ". Should be > 0"); + } + this.program = program; + map = new LinkedHashMap() { + private static final long serialVersionUID = 1L; + + @Override + protected boolean removeEldestEntry( + java.util.Map.Entry eldest) { + return RpcCallCache.this.size() > maxEntries; + } + }; + } + + /** Return the program name */ + public String getProgram() { + return program; + } + + /** Mark a request as completed and add corresponding response to the cache */ + public void callCompleted(InetAddress clientId, int xid, XDR response) { + ClientRequest req = new ClientRequest(clientId, xid); + CacheEntry e; + synchronized(map) { + e = map.get(req); + } + e.setResponse(response); + } + + /** + * Check the cache for an entry. If it does not exist, add the request + * as in progress. + */ + public CacheEntry checkOrAddToCache(InetAddress clientId, int xid) { + ClientRequest req = new ClientRequest(clientId, xid); + CacheEntry e; + synchronized(map) { + e = map.get(req); + if (e == null) { + // Add an inprogress cache entry + map.put(req, new CacheEntry()); + } + } + return e; + } + + /** Return number of cached entries */ + public int size() { + return map.size(); + } + + /** + * Iterator to the cache entries + * @return iterator + */ + @VisibleForTesting + public Iterator> iterator() { + return map.entrySet().iterator(); + } +} diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcDeniedReply.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcDeniedReply.java new file mode 100644 index 0000000000..26abd69d25 --- /dev/null +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcDeniedReply.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.oncrpc; + +import org.apache.hadoop.oncrpc.RpcAuthInfo.AuthFlavor; + +/** + * Represents RPC message MSG_DENIED reply body. See RFC 1831 for details. + * This response is sent to a request to indicate failure of the request. + */ +public class RpcDeniedReply extends RpcReply { + public enum RejectState { + RPC_MISMATCH(0), AUTH_ERROR(1); + + private final int value; + + RejectState(int value) { + this.value = value; + } + + int getValue() { + return value; + } + + static RejectState fromValue(int value) { + return values()[value]; + } + } + + private final RejectState rejectState; + + RpcDeniedReply(int xid, int messageType, ReplyState replyState, + RejectState rejectState) { + super(xid, messageType, replyState); + this.rejectState = rejectState; + } + + public static RpcDeniedReply read(int xid, int messageType, + ReplyState replyState, XDR xdr) { + RejectState rejectState = RejectState.fromValue(xdr.readInt()); + return new RpcDeniedReply(xid, messageType, replyState, rejectState); + } + + public RejectState getRejectState() { + return rejectState; + } + + @Override + public String toString() { + return new StringBuffer().append("xid:").append(xid) + .append(",messageType:").append(messageType).append("rejectState:") + .append(rejectState).toString(); + } + + public static XDR voidReply(XDR xdr, int xid, ReplyState msgAccepted, + RejectState rejectState) { + xdr.writeInt(xid); + xdr.writeInt(RpcMessage.RPC_REPLY); + xdr.writeInt(msgAccepted.getValue()); + xdr.writeInt(AuthFlavor.AUTH_NONE.getValue()); + xdr.writeVariableOpaque(new byte[0]); + xdr.writeInt(rejectState.getValue()); + return xdr; + } +} diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcFrameDecoder.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcFrameDecoder.java new file mode 100644 index 0000000000..45b6bb83f1 --- /dev/null +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcFrameDecoder.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.oncrpc; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.handler.codec.frame.FrameDecoder; + +/** + * {@link FrameDecoder} for RPC messages. + */ +public class RpcFrameDecoder extends FrameDecoder { + public static final Log LOG = LogFactory.getLog(RpcFrameDecoder.class); + private ChannelBuffer frame; + + /** + * Decode an RPC message received on the socket. + * @return mpnull if incomplete message is received. + */ + @Override + protected Object decode(ChannelHandlerContext ctx, Channel channel, + ChannelBuffer buf) { + + // Make sure if the length field was received. + if (buf.readableBytes() < 4) { + if (LOG.isTraceEnabled()) { + LOG.trace("Length field is not received yet"); + } + return null; + } + + // Note the index and go back to it when an incomplete message is received + buf.markReaderIndex(); + + // Read the record marking. + ChannelBuffer fragmentHeader = buf.readBytes(4); + int length = XDR.fragmentSize(fragmentHeader.array()); + boolean isLast = XDR.isLastFragment(fragmentHeader.array()); + + // Make sure if there's enough bytes in the buffer. + if (buf.readableBytes() < length) { + + if (LOG.isTraceEnabled()) { + LOG.trace(length + " bytes are not received yet"); + } + buf.resetReaderIndex(); // Go back to the right reader index + return null; + } + + if (frame == null) { + frame = buf.readBytes(length); + } else { + ChannelBuffer tmp = ChannelBuffers.copiedBuffer(frame.array(), buf + .readBytes(length).array()); + frame = tmp; + } + + // Successfully decoded a frame. Return the decoded frame if the frame is + // the last one. Otherwise, wait for the next frame. + if (isLast) { + ChannelBuffer completeFrame = frame; + frame = null; + return completeFrame; + } else { + LOG.info("Wait for the next frame. This rarely happens."); + return null; + } + } +} \ No newline at end of file diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcMessage.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcMessage.java new file mode 100644 index 0000000000..3bd45c7159 --- /dev/null +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcMessage.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.oncrpc; + +/** + * Represent an RPC message as defined in RFC 1831. + */ +public abstract class RpcMessage { + public static final int RPC_CALL = 0; + public static final int RPC_REPLY = 1; + + protected final int xid; + protected final int messageType; + + RpcMessage(int xid, int messageType) { + if (messageType != RPC_CALL && messageType != RPC_REPLY) { + throw new IllegalArgumentException("Invalid message type " + messageType); + } + this.xid = xid; + this.messageType = messageType; + } + + public int getXid() { + return xid; + } + + public int getMessageType() { + return messageType; + } + + protected void validateMessageType(int expected) { + if (expected != messageType) { + throw new IllegalArgumentException("Message type is expected to be " + + expected + " but got " + messageType); + } + } +} diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java new file mode 100644 index 0000000000..d82e624265 --- /dev/null +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java @@ -0,0 +1,198 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.oncrpc; + +import java.io.IOException; +import java.net.InetAddress; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState; +import org.apache.hadoop.oncrpc.RpcCallCache.CacheEntry; +import org.apache.hadoop.portmap.PortmapMapping; +import org.apache.hadoop.portmap.PortmapRequest; +import org.jboss.netty.channel.Channel; + +/** + * Class for writing RPC server programs based on RFC 1050. Extend this class + * and implement {@link #handleInternal} to handle the requests received. + */ +public abstract class RpcProgram { + private static final Log LOG = LogFactory.getLog(RpcProgram.class); + public static final int RPCB_PORT = 111; + private final String program; + private final String host; + private final int port; + private final int progNumber; + private final int lowProgVersion; + private final int highProgVersion; + private final RpcCallCache rpcCallCache; + + /** + * Constructor + * + * @param program program name + * @param host host where the Rpc server program is started + * @param port port where the Rpc server program is listening to + * @param progNumber program number as defined in RFC 1050 + * @param lowProgVersion lowest version of the specification supported + * @param highProgVersion highest version of the specification supported + * @param cacheSize size of cache to handle duplciate requests. Size <= 0 + * indicates no cache. + */ + protected RpcProgram(String program, String host, int port, int progNumber, + int lowProgVersion, int highProgVersion, int cacheSize) { + this.program = program; + this.host = host; + this.port = port; + this.progNumber = progNumber; + this.lowProgVersion = lowProgVersion; + this.highProgVersion = highProgVersion; + this.rpcCallCache = cacheSize > 0 ? new RpcCallCache(program, cacheSize) + : null; + } + + /** + * Register this program with the local portmapper. + */ + public void register(int transport) { + // Register all the program versions with portmapper for a given transport + for (int vers = lowProgVersion; vers <= highProgVersion; vers++) { + register(vers, transport); + } + } + + /** + * Register this program with the local portmapper. + */ + private void register(int progVersion, int transport) { + PortmapMapping mapEntry = new PortmapMapping(progNumber, progVersion, + transport, port); + register(mapEntry); + } + + /** + * Register the program with Portmap or Rpcbind + */ + protected void register(PortmapMapping mapEntry) { + XDR mappingRequest = PortmapRequest.create(mapEntry); + SimpleUdpClient registrationClient = new SimpleUdpClient(host, RPCB_PORT, + mappingRequest); + try { + registrationClient.run(); + } catch (IOException e) { + LOG.error("Registration failure with " + host + ":" + port + + ", portmap entry: " + mapEntry); + throw new RuntimeException("Registration failure"); + } + } + + /** + * Handle an RPC request. + * @param rpcCall RPC call that is received + * @param in xdr with cursor at reading the remaining bytes of a method call + * @param out xdr output corresponding to Rpc reply + * @param client making the Rpc request + * @param channel connection over which Rpc request is received + * @return response xdr response + */ + protected abstract XDR handleInternal(RpcCall rpcCall, XDR in, XDR out, + InetAddress client, Channel channel); + + public XDR handle(XDR xdr, InetAddress client, Channel channel) { + XDR out = new XDR(); + RpcCall rpcCall = RpcCall.read(xdr); + if (LOG.isDebugEnabled()) { + LOG.debug(program + " procedure #" + rpcCall.getProcedure()); + } + + if (!checkProgram(rpcCall.getProgram())) { + return programMismatch(out, rpcCall); + } + + if (!checkProgramVersion(rpcCall.getVersion())) { + return programVersionMismatch(out, rpcCall); + } + + // Check for duplicate requests in the cache for non-idempotent requests + boolean idempotent = rpcCallCache != null && !isIdempotent(rpcCall); + if (idempotent) { + CacheEntry entry = rpcCallCache.checkOrAddToCache(client, rpcCall.getXid()); + if (entry != null) { // in ache + if (entry.isCompleted()) { + LOG.info("Sending the cached reply to retransmitted request " + + rpcCall.getXid()); + return entry.getResponse(); + } else { // else request is in progress + LOG.info("Retransmitted request, transaction still in progress " + + rpcCall.getXid()); + // TODO: ignore the request? + } + } + } + + XDR response = handleInternal(rpcCall, xdr, out, client, channel); + if (response.size() == 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("No sync response, expect an async response for request XID=" + + rpcCall.getXid()); + } + } + + // Add the request to the cache + if (idempotent) { + rpcCallCache.callCompleted(client, rpcCall.getXid(), response); + } + return response; + } + + private XDR programMismatch(XDR out, RpcCall call) { + LOG.warn("Invalid RPC call program " + call.getProgram()); + RpcAcceptedReply.voidReply(out, call.getXid(), AcceptState.PROG_UNAVAIL); + return out; + } + + private XDR programVersionMismatch(XDR out, RpcCall call) { + LOG.warn("Invalid RPC call version " + call.getVersion()); + RpcAcceptedReply.voidReply(out, call.getXid(), AcceptState.PROG_MISMATCH); + out.writeInt(lowProgVersion); + out.writeInt(highProgVersion); + return out; + } + + private boolean checkProgram(int progNumber) { + return this.progNumber == progNumber; + } + + /** Return true if a the program version in rpcCall is supported */ + private boolean checkProgramVersion(int programVersion) { + return programVersion >= lowProgVersion + && programVersion <= highProgVersion; + } + + @Override + public String toString() { + return "Rpc program: " + program + " at " + host + ":" + port; + } + + protected abstract boolean isIdempotent(RpcCall call); + + public int getPort() { + return port; + } +} diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcReply.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcReply.java new file mode 100644 index 0000000000..4681d6d969 --- /dev/null +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcReply.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.oncrpc; + +/** + * Represents an RPC message of type RPC reply as defined in RFC 1831 + */ +public abstract class RpcReply extends RpcMessage { + /** RPC reply_stat as defined in RFC 1831 */ + public enum ReplyState { + MSG_ACCEPTED(0), + MSG_DENIED(1); + + private final int value; + ReplyState(int value) { + this.value = value; + } + + int getValue() { + return value; + } + + public static ReplyState fromValue(int value) { + return values()[value]; + } + } + + private final ReplyState state; + + RpcReply(int xid, int messageType, ReplyState state) { + super(xid, messageType); + this.state = state; + validateMessageType(RPC_REPLY); + } + + public static RpcReply read(XDR xdr) { + int xid = xdr.readInt(); + int messageType = xdr.readInt(); + ReplyState stat = ReplyState.fromValue(xdr.readInt()); + switch (stat) { + case MSG_ACCEPTED: + return RpcAcceptedReply.read(xid, messageType, stat, xdr); + case MSG_DENIED: + return RpcDeniedReply.read(xid, messageType, stat, xdr); + } + return null; + } + + public ReplyState getState() { + return state; + } +} diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java new file mode 100644 index 0000000000..7186dd1359 --- /dev/null +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.oncrpc; + +/** + * The XID in RPC call. It is used for starting with new seed after each reboot. + */ +public class RpcUtil { + private static int xid = (int) (System.currentTimeMillis() / 1000) << 12; + + public static int getNewXid(String caller) { + return xid = ++xid + caller.hashCode(); + } +} diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java new file mode 100644 index 0000000000..287aa9fa8a --- /dev/null +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.oncrpc; + +import java.net.InetSocketAddress; +import java.util.concurrent.Executors; + +import org.apache.hadoop.oncrpc.RpcFrameDecoder; +import org.apache.hadoop.oncrpc.XDR; +import org.jboss.netty.bootstrap.ClientBootstrap; +import org.jboss.netty.channel.ChannelFactory; +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ChannelPipelineFactory; +import org.jboss.netty.channel.Channels; +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; + +/** + * A simple TCP based RPC client which just sends a request to a server. + */ +public class SimpleTcpClient { + protected final String host; + protected final int port; + protected final XDR request; + protected ChannelPipelineFactory pipelineFactory; + protected final boolean oneShot; + + public SimpleTcpClient(String host, int port, XDR request) { + this(host,port, request, true); + } + + public SimpleTcpClient(String host, int port, XDR request, Boolean oneShot) { + this.host = host; + this.port = port; + this.request = request; + this.oneShot = oneShot; + } + + protected ChannelPipelineFactory setPipelineFactory() { + this.pipelineFactory = new ChannelPipelineFactory() { + @Override + public ChannelPipeline getPipeline() { + return Channels.pipeline(new RpcFrameDecoder(), + new SimpleTcpClientHandler(request)); + } + }; + return this.pipelineFactory; + } + + public void run() { + // Configure the client. + ChannelFactory factory = new NioClientSocketChannelFactory( + Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), 1, 1); + ClientBootstrap bootstrap = new ClientBootstrap(factory); + + // Set up the pipeline factory. + bootstrap.setPipelineFactory(setPipelineFactory()); + + bootstrap.setOption("tcpNoDelay", true); + bootstrap.setOption("keepAlive", true); + + // Start the connection attempt. + ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port)); + + if (oneShot) { + // Wait until the connection is closed or the connection attempt fails. + future.getChannel().getCloseFuture().awaitUninterruptibly(); + + // Shut down thread pools to exit. + bootstrap.releaseExternalResources(); + } + } +} diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClientHandler.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClientHandler.java new file mode 100644 index 0000000000..b72153a312 --- /dev/null +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClientHandler.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.oncrpc; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ChannelStateEvent; +import org.jboss.netty.channel.ExceptionEvent; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.SimpleChannelHandler; + +/** + * A simple TCP based RPC client handler used by {@link SimpleTcpServer}. + */ +public class SimpleTcpClientHandler extends SimpleChannelHandler { + public static final Log LOG = LogFactory.getLog(SimpleTcpClient.class); + protected final XDR request; + + public SimpleTcpClientHandler(XDR request) { + this.request = request; + } + + @Override + public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) { + // Send the request + if (LOG.isDebugEnabled()) { + LOG.debug("sending PRC request"); + } + ChannelBuffer outBuf = XDR.writeMessageTcp(request, true); + e.getChannel().write(outBuf); + } + + /** + * Shutdown connection by default. Subclass can override this method to do + * more interaction with the server. + */ + @Override + public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { + e.getChannel().close(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { + LOG.warn("Unexpected exception from downstream: ", e.getCause()); + e.getChannel().close(); + } +} diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java new file mode 100644 index 0000000000..e168ef406b --- /dev/null +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.oncrpc; + +import java.net.InetSocketAddress; +import java.util.concurrent.Executors; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.jboss.netty.bootstrap.ServerBootstrap; +import org.jboss.netty.channel.ChannelFactory; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ChannelPipelineFactory; +import org.jboss.netty.channel.Channels; +import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; + +/** + * Simple UDP server implemented using netty. + */ +public class SimpleTcpServer { + public static final Log LOG = LogFactory.getLog(SimpleTcpServer.class); + protected final int port; + protected final ChannelPipelineFactory pipelineFactory; + protected final RpcProgram rpcProgram; + + /** The maximum number of I/O worker threads */ + protected final int workerCount; + + /** + * @param port TCP port where to start the server at + * @param program RPC program corresponding to the server + * @param workercount Number of worker threads + */ + public SimpleTcpServer(int port, RpcProgram program, int workercount) { + this.port = port; + this.rpcProgram = program; + this.workerCount = workercount; + this.pipelineFactory = getPipelineFactory(); + } + + public ChannelPipelineFactory getPipelineFactory() { + return new ChannelPipelineFactory() { + @Override + public ChannelPipeline getPipeline() { + return Channels.pipeline(new RpcFrameDecoder(), + new SimpleTcpServerHandler(rpcProgram)); + } + }; + } + + public void run() { + // Configure the Server. + ChannelFactory factory; + if (workerCount == 0) { + // Use default workers: 2 * the number of available processors + factory = new NioServerSocketChannelFactory( + Executors.newCachedThreadPool(), Executors.newCachedThreadPool()); + } else { + factory = new NioServerSocketChannelFactory( + Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), + workerCount); + } + + ServerBootstrap bootstrap = new ServerBootstrap(factory); + bootstrap.setPipelineFactory(pipelineFactory); + bootstrap.setOption("child.tcpNoDelay", true); + bootstrap.setOption("child.keepAlive", true); + + // Listen to TCP port + bootstrap.bind(new InetSocketAddress(port)); + + LOG.info("Started listening to TCP requests at port " + port + " for " + + rpcProgram + " with workerCount " + workerCount); + } +} diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServerHandler.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServerHandler.java new file mode 100644 index 0000000000..71cce18f58 --- /dev/null +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServerHandler.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.oncrpc; + +import java.net.InetAddress; +import java.net.InetSocketAddress; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ExceptionEvent; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.SimpleChannelHandler; + +/** + * Handler used by {@link SimpleTcpServer}. + */ +public class SimpleTcpServerHandler extends SimpleChannelHandler { + public static final Log LOG = LogFactory.getLog(SimpleTcpServerHandler.class); + + protected final RpcProgram rpcProgram; + + public SimpleTcpServerHandler(RpcProgram rpcProgram) { + this.rpcProgram = rpcProgram; + } + + @Override + public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { + ChannelBuffer buf = (ChannelBuffer) e.getMessage(); + XDR request = new XDR(buf.array()); + + InetAddress remoteInetAddr = ((InetSocketAddress) ctx.getChannel() + .getRemoteAddress()).getAddress(); + Channel outChannel = e.getChannel(); + XDR response = rpcProgram.handle(request, remoteInetAddr, outChannel); + if (response.size() > 0) { + outChannel.write(XDR.writeMessageTcp(response, true)); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { + LOG.warn("Encountered ", e.getCause()); + e.getChannel().close(); + } +} \ No newline at end of file diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpClient.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpClient.java new file mode 100644 index 0000000000..344dcd7801 --- /dev/null +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpClient.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.oncrpc; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.util.Arrays; + +/** + * A simple UDP based RPC client which just sends one request to a server. + */ +public class SimpleUdpClient { + protected final String host; + protected final int port; + protected final XDR request; + protected final boolean oneShot; + + public SimpleUdpClient(String host, int port, XDR request) { + this(host, port, request, true); + } + + public SimpleUdpClient(String host, int port, XDR request, Boolean oneShot) { + this.host = host; + this.port = port; + this.request = request; + this.oneShot = oneShot; + } + + public void run() throws IOException { + DatagramSocket clientSocket = new DatagramSocket(); + InetAddress IPAddress = InetAddress.getByName(host); + byte[] sendData = request.getBytes(); + byte[] receiveData = new byte[65535]; + + DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length, + IPAddress, port); + clientSocket.send(sendPacket); + DatagramPacket receivePacket = new DatagramPacket(receiveData, + receiveData.length); + clientSocket.receive(receivePacket); + + // Check reply status + XDR xdr = new XDR(); + xdr.writeFixedOpaque(Arrays.copyOfRange(receiveData, 0, + receivePacket.getLength())); + RpcReply reply = RpcReply.read(xdr); + if (reply.getState() != RpcReply.ReplyState.MSG_ACCEPTED) { + throw new IOException("Request failed: " + reply.getState()); + } + + clientSocket.close(); + } +} diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java new file mode 100644 index 0000000000..70bffba66d --- /dev/null +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.oncrpc; + +import java.net.InetSocketAddress; +import java.util.concurrent.Executors; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.jboss.netty.bootstrap.ConnectionlessBootstrap; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ChannelPipelineFactory; +import org.jboss.netty.channel.Channels; +import org.jboss.netty.channel.socket.DatagramChannelFactory; +import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory; + +/** + * Simple UDP server implemented based on netty. + */ +public class SimpleUdpServer { + public static final Log LOG = LogFactory.getLog(SimpleUdpServer.class); + private final int SEND_BUFFER_SIZE = 65536; + private final int RECEIVE_BUFFER_SIZE = 65536; + + protected final int port; + protected final ChannelPipelineFactory pipelineFactory; + protected final RpcProgram rpcProgram; + protected final int workerCount; + + public SimpleUdpServer(int port, RpcProgram program, int workerCount) { + this.port = port; + this.rpcProgram = program; + this.workerCount = workerCount; + this.pipelineFactory = new ChannelPipelineFactory() { + @Override + public ChannelPipeline getPipeline() { + return Channels.pipeline(new SimpleUdpServerHandler(rpcProgram)); + } + }; + } + + public void run() { + // Configure the client. + DatagramChannelFactory f = new NioDatagramChannelFactory( + Executors.newCachedThreadPool(), workerCount); + + ConnectionlessBootstrap b = new ConnectionlessBootstrap(f); + ChannelPipeline p = b.getPipeline(); + p.addLast("handler", new SimpleUdpServerHandler(rpcProgram)); + + b.setOption("broadcast", "false"); + b.setOption("sendBufferSize", SEND_BUFFER_SIZE); + b.setOption("receiveBufferSize", RECEIVE_BUFFER_SIZE); + + // Listen to the UDP port + b.bind(new InetSocketAddress(port)); + + LOG.info("Started listening to UDP requests at port " + port + " for " + + rpcProgram + " with workerCount " + workerCount); + } +} diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServerHandler.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServerHandler.java new file mode 100644 index 0000000000..223b7794a2 --- /dev/null +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServerHandler.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.oncrpc; + +import java.net.InetAddress; +import java.net.InetSocketAddress; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ExceptionEvent; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.SimpleChannelHandler; + +/** + * Handler used by {@link SimpleUdpServer}. + */ +public class SimpleUdpServerHandler extends SimpleChannelHandler { + public static final Log LOG = LogFactory.getLog(SimpleUdpServerHandler.class); + private final RpcProgram rpcProgram; + + public SimpleUdpServerHandler(RpcProgram rpcProgram) { + this.rpcProgram = rpcProgram; + } + + @Override + public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { + ChannelBuffer buf = (ChannelBuffer) e.getMessage(); + + XDR request = new XDR(); + + request.writeFixedOpaque(buf.array()); + InetAddress remoteInetAddr = ((InetSocketAddress) e.getRemoteAddress()) + .getAddress(); + XDR response = rpcProgram.handle(request, remoteInetAddr, null); + e.getChannel().write(XDR.writeMessageUdp(response), e.getRemoteAddress()); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { + LOG.warn("Encountered ", e.getCause()); + e.getChannel().close(); + } +} \ No newline at end of file diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java new file mode 100644 index 0000000000..40633e286d --- /dev/null +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java @@ -0,0 +1,418 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.oncrpc; + +import java.io.PrintStream; +import java.util.Arrays; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Utility class for building XDR messages based on RFC 4506. + *

+ * This class maintains a buffer into which java types are written as + * XDR types for building XDR messages. Similarly this class can + * be used to get java types from an XDR request or response. + *

+ * Currently only a subset of XDR types defined in RFC 4506 are supported. + */ +public class XDR { + private final static String HEXES = "0123456789abcdef"; + + /** Internal buffer for reading or writing to */ + private byte[] bytearr; + + /** Place to read from or write to */ + private int cursor; + + public XDR() { + this(new byte[0]); + } + + public XDR(byte[] data) { + bytearr = Arrays.copyOf(data, data.length); + cursor = 0; + } + + /** + * @param bytes bytes to be appended to internal buffer + */ + private void append(byte[] bytesToAdd) { + bytearr = append(bytearr, bytesToAdd); + } + + public int size() { + return bytearr.length; + } + + /** Skip some bytes by moving the cursor */ + public void skip(int size) { + cursor += size; + } + + /** + * Write Java primitive integer as XDR signed integer. + * + * Definition of XDR signed integer from RFC 4506: + *

+   * An XDR signed integer is a 32-bit datum that encodes an integer in
+   * the range [-2147483648,2147483647].  The integer is represented in
+   * two's complement notation.  The most and least significant bytes are
+   * 0 and 3, respectively.  Integers are declared as follows:
+   * 
+   *       int identifier;
+   * 
+   *            (MSB)                   (LSB)
+   *          +-------+-------+-------+-------+
+   *          |byte 0 |byte 1 |byte 2 |byte 3 |                      INTEGER
+   *          +-------+-------+-------+-------+
+   *          <------------32 bits------------>
+   * 
+ */ + public void writeInt(int data) { + append(toBytes(data)); + } + + /** + * Read an XDR signed integer and return as Java primitive integer. + */ + public int readInt() { + byte byte0 = bytearr[cursor++]; + byte byte1 = bytearr[cursor++]; + byte byte2 = bytearr[cursor++]; + byte byte3 = bytearr[cursor++]; + return (XDR.toShort(byte0) << 24) + (XDR.toShort(byte1) << 16) + + (XDR.toShort(byte2) << 8) + XDR.toShort(byte3); + } + + /** + * Write Java primitive boolean as an XDR boolean. + * + * Definition of XDR boolean from RFC 4506: + *
+   *    Booleans are important enough and occur frequently enough to warrant
+   *    their own explicit type in the standard.  Booleans are declared as
+   *    follows:
+   * 
+   *          bool identifier;
+   * 
+   *    This is equivalent to:
+   * 
+   *          enum { FALSE = 0, TRUE = 1 } identifier;
+   * 
+ */ + public void writeBoolean(boolean data) { + this.writeInt(data ? 1 : 0); + } + + /** + * Read an XDR boolean and return as Java primitive boolean. + */ + public boolean readBoolean() { + return readInt() == 0 ? false : true; + } + + /** + * Write Java primitive long to an XDR signed long. + * + * Definition of XDR signed long from RFC 4506: + *
+   *    The standard also defines 64-bit (8-byte) numbers called hyper
+   *    integers and unsigned hyper integers.  Their representations are the
+   *    obvious extensions of integer and unsigned integer defined above.
+   *    They are represented in two's complement notation.The most and
+   *    least significant bytes are 0 and 7, respectively. Their
+   *    declarations:
+   * 
+   *    hyper identifier; unsigned hyper identifier;
+   * 
+   *         (MSB)                                                   (LSB)
+   *       +-------+-------+-------+-------+-------+-------+-------+-------+
+   *       |byte 0 |byte 1 |byte 2 |byte 3 |byte 4 |byte 5 |byte 6 |byte 7 |
+   *       +-------+-------+-------+-------+-------+-------+-------+-------+
+   *       <----------------------------64 bits---------------------------->
+   *                                                  HYPER INTEGER
+   *                                                  UNSIGNED HYPER INTEGER
+   * 
+ */ + public void writeLongAsHyper(long data) { + byte byte0 = (byte) ((data & 0xff00000000000000l) >> 56); + byte byte1 = (byte) ((data & 0x00ff000000000000l) >> 48); + byte byte2 = (byte) ((data & 0x0000ff0000000000l) >> 40); + byte byte3 = (byte) ((data & 0x000000ff00000000l) >> 32); + byte byte4 = (byte) ((data & 0x00000000ff000000l) >> 24); + byte byte5 = (byte) ((data & 0x0000000000ff0000l) >> 16); + byte byte6 = (byte) ((data & 0x000000000000ff00l) >> 8); + byte byte7 = (byte) ((data & 0x00000000000000ffl)); + this.append(new byte[] { byte0, byte1, byte2, byte3, byte4, byte5, byte6, byte7 }); + } + + /** + * Read XDR signed hyper and return as java primitive long. + */ + public long readHyper() { + byte byte0 = bytearr[cursor++]; + byte byte1 = bytearr[cursor++]; + byte byte2 = bytearr[cursor++]; + byte byte3 = bytearr[cursor++]; + byte byte4 = bytearr[cursor++]; + byte byte5 = bytearr[cursor++]; + byte byte6 = bytearr[cursor++]; + byte byte7 = bytearr[cursor++]; + return ((long) XDR.toShort(byte0) << 56) + + ((long) XDR.toShort(byte1) << 48) + ((long) XDR.toShort(byte2) << 40) + + ((long) XDR.toShort(byte3) << 32) + ((long) XDR.toShort(byte4) << 24) + + ((long) XDR.toShort(byte5) << 16) + ((long) XDR.toShort(byte6) << 8) + + XDR.toShort(byte7); + } + + /** + * Write a Java primitive byte array to XDR fixed-length opaque data. + * + * Defintion of fixed-length opaque data from RFC 4506: + *
+   *    At times, fixed-length uninterpreted data needs to be passed among
+   *    machines.  This data is called "opaque" and is declared as follows:
+   * 
+   *          opaque identifier[n];
+   * 
+   *    where the constant n is the (static) number of bytes necessary to
+   *    contain the opaque data.  If n is not a multiple of four, then the n
+   *    bytes are followed by enough (0 to 3) residual zero bytes, r, to make
+   *    the total byte count of the opaque object a multiple of four.
+   * 
+   *           0        1     ...
+   *       +--------+--------+...+--------+--------+...+--------+
+   *       | byte 0 | byte 1 |...|byte n-1|    0   |...|    0   |
+   *       +--------+--------+...+--------+--------+...+--------+
+   *       |<-----------n bytes---------->|<------r bytes------>|
+   *       |<-----------n+r (where (n+r) mod 4 = 0)------------>|
+   *                                                    FIXED-LENGTH OPAQUE
+   * 
+ */ + public void writeFixedOpaque(byte[] data) { + writeFixedOpaque(data, data.length); + } + + public void writeFixedOpaque(byte[] data, int length) { + append(Arrays.copyOf(data, length + XDR.pad(length, 4))); + } + + public byte[] readFixedOpaque(int size) { + byte[] ret = new byte[size]; + for(int i = 0; i < size; i++) { + ret[i] = bytearr[cursor]; + cursor++; + } + + for(int i = 0; i < XDR.pad(size, 4); i++) { + cursor++; + } + return ret; + } + + /** + * Write a Java primitive byte array as XDR variable-length opque data. + * + * Definition of XDR variable-length opaque data RFC 4506: + * + *
+   *    The standard also provides for variable-length (counted) opaque data,
+   *    defined as a sequence of n (numbered 0 through n-1) arbitrary bytes
+   *    to be the number n encoded as an unsigned integer (as described
+   *    below), and followed by the n bytes of the sequence.
+   * 
+   *    Byte m of the sequence always precedes byte m+1 of the sequence, and
+   *    byte 0 of the sequence always follows the sequence's length (count).
+   *    If n is not a multiple of four, then the n bytes are followed by
+   *    enough (0 to 3) residual zero bytes, r, to make the total byte count
+   *    a multiple of four.  Variable-length opaque data is declared in the
+   *    following way:
+   * 
+   *          opaque identifier;
+   *       or
+   *          opaque identifier<>;
+   * 
+   *    The constant m denotes an upper bound of the number of bytes that the
+   *    sequence may contain.  If m is not specified, as in the second
+   *    declaration, it is assumed to be (2**32) - 1, the maximum length.
+   * 
+   *    The constant m would normally be found in a protocol specification.
+   *    For example, a filing protocol may state that the maximum data
+   *    transfer size is 8192 bytes, as follows:
+   * 
+   *          opaque filedata<8192>;
+   * 
+   *             0     1     2     3     4     5   ...
+   *          +-----+-----+-----+-----+-----+-----+...+-----+-----+...+-----+
+   *          |        length n       |byte0|byte1|...| n-1 |  0  |...|  0  |
+   *          +-----+-----+-----+-----+-----+-----+...+-----+-----+...+-----+
+   *          |<-------4 bytes------->|<------n bytes------>|<---r bytes--->|
+   *                                  |<----n+r (where (n+r) mod 4 = 0)---->|
+   *                                                   VARIABLE-LENGTH OPAQUE
+   * 
+   *    It is an error to encode a length greater than the maximum described
+   *    in the specification.
+   * 
+ */ + public void writeVariableOpaque(byte[] data) { + this.writeInt(data.length); + this.writeFixedOpaque(data); + } + + public byte[] readVariableOpaque() { + int size = this.readInt(); + return size != 0 ? this.readFixedOpaque(size) : null; + } + + public void skipVariableOpaque() { + int length= this.readInt(); + this.skip(length+XDR.pad(length, 4)); + } + + /** + * Write Java String as XDR string. + * + * Definition of XDR string from RFC 4506: + * + *
+   *    The standard defines a string of n (numbered 0 through n-1) ASCII
+   *    bytes to be the number n encoded as an unsigned integer (as described
+   *    above), and followed by the n bytes of the string.  Byte m of the
+   *    string always precedes byte m+1 of the string, and byte 0 of the
+   *    string always follows the string's length.  If n is not a multiple of
+   *    four, then the n bytes are followed by enough (0 to 3) residual zero
+   *    bytes, r, to make the total byte count a multiple of four.  Counted
+   *    byte strings are declared as follows:
+   * 
+   *          string object;
+   *       or
+   *          string object<>;
+   * 
+   *    The constant m denotes an upper bound of the number of bytes that a
+   *    string may contain.  If m is not specified, as in the second
+   *    declaration, it is assumed to be (2**32) - 1, the maximum length.
+   *    The constant m would normally be found in a protocol specification.
+   *    For example, a filing protocol may state that a file name can be no
+   *    longer than 255 bytes, as follows:
+   * 
+   *          string filename<255>;
+   * 
+   *             0     1     2     3     4     5   ...
+   *          +-----+-----+-----+-----+-----+-----+...+-----+-----+...+-----+
+   *          |        length n       |byte0|byte1|...| n-1 |  0  |...|  0  |
+   *          +-----+-----+-----+-----+-----+-----+...+-----+-----+...+-----+
+   *          |<-------4 bytes------->|<------n bytes------>|<---r bytes--->|
+   *                                  |<----n+r (where (n+r) mod 4 = 0)---->|
+   *                                                                   STRING
+   *    It is an error to encode a length greater than the maximum described
+   *    in the specification.
+   * 
+ */ + public void writeString(String data) { + this.writeVariableOpaque(data.getBytes()); + } + + public String readString() { + return new String(this.readVariableOpaque()); + } + + public void dump(PrintStream out) { + for(int i = 0; i < bytearr.length; i += 4) { + out.println(hex(bytearr[i]) + " " + hex(bytearr[i + 1]) + " " + + hex(bytearr[i + 2]) + " " + hex(bytearr[i + 3])); + } + } + + @VisibleForTesting + public byte[] getBytes() { + return Arrays.copyOf(bytearr, bytearr.length); + } + + public static byte[] append(byte[] bytes, byte[] bytesToAdd) { + byte[] newByteArray = new byte[bytes.length + bytesToAdd.length]; + System.arraycopy(bytes, 0, newByteArray, 0, bytes.length); + System.arraycopy(bytesToAdd, 0, newByteArray, bytes.length, bytesToAdd.length); + return newByteArray; + } + + private static int pad(int x, int y) { + return x % y == 0 ? 0 : y - (x % y); + } + + static byte[] toBytes(int n) { + byte[] ret = { (byte) ((n & 0xff000000) >> 24), + (byte) ((n & 0x00ff0000) >> 16), (byte) ((n & 0x0000ff00) >> 8), + (byte) (n & 0x000000ff) }; + return ret; + } + + private static short toShort(byte b) { + return b < 0 ? (short) (b + 256): (short) b; + } + + private static String hex(byte b) { + return "" + HEXES.charAt((b & 0xF0) >> 4) + HEXES.charAt((b & 0x0F)); + } + + private static byte[] recordMark(int size, boolean last) { + return toBytes(!last ? size : size | 0x80000000); + } + + public static byte[] getVariableOpque(byte[] data) { + byte[] bytes = toBytes(data.length); + return append(bytes, Arrays.copyOf(data, data.length + XDR.pad(data.length, 4))); + } + + public static int fragmentSize(byte[] mark) { + int n = (XDR.toShort(mark[0]) << 24) + (XDR.toShort(mark[1]) << 16) + + (XDR.toShort(mark[2]) << 8) + XDR.toShort(mark[3]); + return n & 0x7fffffff; + } + + public static boolean isLastFragment(byte[] mark) { + int n = (XDR.toShort(mark[0]) << 24) + (XDR.toShort(mark[1]) << 16) + + (XDR.toShort(mark[2]) << 8) + XDR.toShort(mark[3]); + return (n & 0x80000000) != 0; + } + + /** check if the rest of data has more than bytes */ + public static boolean verifyLength(XDR xdr, int len) { + return (xdr.bytearr.length - xdr.cursor) >= len; + } + + /** Write an XDR message to a TCP ChannelBuffer */ + public static ChannelBuffer writeMessageTcp(XDR request, boolean last) { + byte[] fragmentHeader = XDR.recordMark(request.bytearr.length, last); + ChannelBuffer outBuf = ChannelBuffers.buffer(fragmentHeader.length + + request.bytearr.length); + outBuf.writeBytes(fragmentHeader); + outBuf.writeBytes(request.bytearr); + return outBuf; + } + + /** Write an XDR message to a UDP ChannelBuffer */ + public static ChannelBuffer writeMessageUdp(XDR response) { + ChannelBuffer outBuf = ChannelBuffers.buffer(response.bytearr.length); + outBuf.writeBytes(response.bytearr); + return outBuf; + } +} diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/Portmap.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/Portmap.java new file mode 100644 index 0000000000..6a3e86c13d --- /dev/null +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/Portmap.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.portmap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.oncrpc.RpcProgram; +import org.apache.hadoop.oncrpc.SimpleTcpServer; +import org.apache.hadoop.oncrpc.SimpleUdpServer; +import org.apache.hadoop.util.StringUtils; + +/** + * Portmap service for binding RPC protocols. See RFC 1833 for details. + */ +public class Portmap { + public static final Log LOG = LogFactory.getLog(Portmap.class); + + private static void startUDPServer(RpcProgramPortmap rpcProgram) { + rpcProgram.register(PortmapMapping.TRANSPORT_UDP); + SimpleUdpServer udpServer = new SimpleUdpServer(RpcProgram.RPCB_PORT, + rpcProgram, 1); + udpServer.run(); + } + + private static void startTCPServer(final RpcProgramPortmap rpcProgram) { + rpcProgram.register(PortmapMapping.TRANSPORT_TCP); + SimpleTcpServer tcpServer = new SimpleTcpServer(RpcProgram.RPCB_PORT, + rpcProgram, 1); + tcpServer.run(); + } + + public static void main(String[] args) { + StringUtils.startupShutdownMessage(Portmap.class, args, LOG); + RpcProgramPortmap program = new RpcProgramPortmap(); + try { + startUDPServer(program); + startTCPServer(program); + } catch (Throwable e) { + LOG.fatal("Start server failure"); + System.exit(-1); + } + } +} diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapInterface.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapInterface.java new file mode 100644 index 0000000000..d0f9af11e0 --- /dev/null +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapInterface.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.portmap; + +import org.apache.hadoop.oncrpc.XDR; + +/** + * Methods that need to be implemented to provide Portmap RPC program. + * See RFC 1833 for details. + */ +public interface PortmapInterface { + public enum Procedure { + PMAPPROC_NULL(0), + PMAPPROC_SET(1), + PMAPPROC_UNSET(2), + PMAPPROC_GETPORT(3), + PMAPPROC_DUMP(4), + PMAPPROC_CALLIT(5), + PMAPPROC_GETTIME(6), + PMAPPROC_UADDR2TADDR(7), + PMAPPROC_TADDR2UADDR(8), + PMAPPROC_GETVERSADDR(9), + PMAPPROC_INDIRECT(10), + PMAPPROC_GETADDRLIST(11), + PMAPPROC_GETSTAT(12); + + private final int value; + + Procedure(int value) { + this.value = value; + } + + public int getValue() { + return value; + } + + public static Procedure fromValue(int value) { + return values()[value]; + } + } + + /** + * This procedure does no work. By convention, procedure zero of any protocol + * takes no parameters and returns no results. + */ + public XDR nullOp(int xidd, XDR in, XDR out); + + /** + * When a program first becomes available on a machine, it registers itself + * with the port mapper program on the same machine. The program passes its + * program number "prog", version number "vers", transport protocol number + * "prot", and the port "port" on which it awaits service request. The + * procedure returns a boolean reply whose value is "TRUE" if the procedure + * successfully established the mapping and "FALSE" otherwise. The procedure + * refuses to establish a mapping if one already exists for the tuple + * "(prog, vers, prot)". + */ + public XDR set(int xid, XDR in, XDR out); + + /** + * When a program becomes unavailable, it should unregister itself with the + * port mapper program on the same machine. The parameters and results have + * meanings identical to those of "PMAPPROC_SET". The protocol and port number + * fields of the argument are ignored. + */ + public XDR unset(int xid, XDR in, XDR out); + + /** + * Given a program number "prog", version number "vers", and transport + * protocol number "prot", this procedure returns the port number on which the + * program is awaiting call requests. A port value of zeros means the program + * has not been registered. The "port" field of the argument is ignored. + */ + public XDR getport(int xid, XDR in, XDR out); + + /** + * This procedure enumerates all entries in the port mapper's database. The + * procedure takes no parameters and returns a list of program, version, + * protocol, and port values. + */ + public XDR dump(int xid, XDR in, XDR out); +} diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapMapping.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapMapping.java new file mode 100644 index 0000000000..f73ab02c60 --- /dev/null +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapMapping.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.portmap; + +import org.apache.hadoop.oncrpc.XDR; + +/** + * Represents a mapping entry for in the Portmap service for binding RPC + * protocols. See RFC 1833 for details. + * + * This maps a program to a port number. + */ +public class PortmapMapping { + public static final int TRANSPORT_TCP = 6; + public static final int TRANSPORT_UDP = 17; + + private final int program; + private final int version; + private final int transport; + private final int port; + + public PortmapMapping(int program, int version, int transport, int port) { + this.program = program; + this.version = version; + this.transport = transport; + this.port = port; + } + + public XDR serialize(XDR xdr) { + xdr.writeInt(program); + xdr.writeInt(version); + xdr.writeInt(transport); + xdr.writeInt(port); + return xdr; + } + + public static PortmapMapping deserialize(XDR xdr) { + return new PortmapMapping(xdr.readInt(), xdr.readInt(), xdr.readInt(), + xdr.readInt()); + } + + public int getPort() { + return port; + } + + public static String key(PortmapMapping mapping) { + return mapping.program + " " + mapping.version + " " + mapping.transport; + } + + @Override + public String toString() { + return String.format("(PortmapMapping-%d:%d:%d:%d)", program, version, + transport, port); + } +} diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapRequest.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapRequest.java new file mode 100644 index 0000000000..11da7d44dc --- /dev/null +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapRequest.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.portmap; + +import org.apache.hadoop.oncrpc.RpcAuthInfo.AuthFlavor; +import org.apache.hadoop.oncrpc.RpcCall; +import org.apache.hadoop.oncrpc.RpcUtil; +import org.apache.hadoop.oncrpc.XDR; +import org.apache.hadoop.portmap.PortmapInterface.Procedure; + +/** + * Helper utility for building portmap request + */ +public class PortmapRequest { + public static PortmapMapping mapping(XDR xdr) { + return PortmapMapping.deserialize(xdr); + } + + public static XDR create(PortmapMapping mapping) { + XDR request = new XDR(); + RpcCall.write(request, + RpcUtil.getNewXid(String.valueOf(RpcProgramPortmap.PROGRAM)), + RpcProgramPortmap.PROGRAM, RpcProgramPortmap.VERSION, + Procedure.PMAPPROC_SET.getValue()); + request.writeInt(AuthFlavor.AUTH_NONE.getValue()); + request.writeInt(0); + request.writeInt(0); + request.writeInt(0); + return mapping.serialize(request); + } +} diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapResponse.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapResponse.java new file mode 100644 index 0000000000..f650a74994 --- /dev/null +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapResponse.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.portmap; + +import java.util.Arrays; +import java.util.Collection; + +import org.apache.hadoop.oncrpc.RpcAcceptedReply; +import org.apache.hadoop.oncrpc.XDR; + +/** + * Helper utility for sending portmap response. + */ +public class PortmapResponse { + public static XDR voidReply(XDR xdr, int xid) { + RpcAcceptedReply.voidReply(xdr, xid); + return xdr; + } + + public static XDR intReply(XDR xdr, int xid, int value) { + RpcAcceptedReply.voidReply(xdr, xid); + xdr.writeInt(value); + return xdr; + } + + public static XDR booleanReply(XDR xdr, int xid, boolean value) { + RpcAcceptedReply.voidReply(xdr, xid); + xdr.writeBoolean(value); + return xdr; + } + + public static XDR pmapList(XDR xdr, int xid, Collection list) { + RpcAcceptedReply.voidReply(xdr, xid); + for (PortmapMapping mapping : list) { + System.out.println(mapping); + xdr.writeBoolean(true); // Value follows + mapping.serialize(xdr); + } + xdr.writeBoolean(false); // No value follows + return xdr; + } + + public static XDR pmapList(XDR xdr, int xid, PortmapMapping[] list) { + return pmapList(xdr, xid, Arrays.asList(list)); + } +} diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java new file mode 100644 index 0000000000..cbf381296a --- /dev/null +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.portmap; + +import java.net.InetAddress; +import java.util.HashMap; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.oncrpc.RpcAcceptedReply; +import org.apache.hadoop.oncrpc.RpcCall; +import org.apache.hadoop.oncrpc.RpcProgram; +import org.apache.hadoop.oncrpc.XDR; +import org.jboss.netty.channel.Channel; + +/** + * An rpcbind request handler. + */ +public class RpcProgramPortmap extends RpcProgram implements PortmapInterface { + public static final int PROGRAM = 100000; + public static final int VERSION = 2; + + private static final Log LOG = LogFactory.getLog(RpcProgramPortmap.class); + + /** Map synchronized usis monitor lock of this instance */ + private final HashMap map; + + public RpcProgramPortmap() { + super("portmap", "localhost", RPCB_PORT, PROGRAM, VERSION, VERSION, 0); + map = new HashMap(256); + } + + /** Dump all the register RPC services */ + private synchronized void dumpRpcServices() { + Set> entrySet = map.entrySet(); + for (Entry entry : entrySet) { + LOG.info("Service: " + entry.getKey() + " portmapping: " + + entry.getValue()); + } + } + + @Override + public XDR nullOp(int xid, XDR in, XDR out) { + return PortmapResponse.voidReply(out, xid); + } + + @Override + public XDR set(int xid, XDR in, XDR out) { + PortmapMapping mapping = PortmapRequest.mapping(in); + String key = PortmapMapping.key(mapping); + if (LOG.isDebugEnabled()) { + LOG.debug("Portmap set key=" + key); + } + + PortmapMapping value = null; + synchronized(this) { + map.put(key, mapping); + dumpRpcServices(); + value = map.get(key); + } + return PortmapResponse.intReply(out, xid, value.getPort()); + } + + @Override + public synchronized XDR unset(int xid, XDR in, XDR out) { + PortmapMapping mapping = PortmapRequest.mapping(in); + synchronized(this) { + map.remove(PortmapMapping.key(mapping)); + } + return PortmapResponse.booleanReply(out, xid, true); + } + + @Override + public synchronized XDR getport(int xid, XDR in, XDR out) { + PortmapMapping mapping = PortmapRequest.mapping(in); + String key = PortmapMapping.key(mapping); + if (LOG.isDebugEnabled()) { + LOG.debug("Portmap GETPORT key=" + key + " " + mapping); + } + PortmapMapping value = null; + synchronized(this) { + value = map.get(key); + } + int res = 0; + if (value != null) { + res = value.getPort(); + if (LOG.isDebugEnabled()) { + LOG.debug("Found mapping for key: " + key + " port:" + res); + } + } else { + LOG.warn("Warning, no mapping for key: " + key); + } + return PortmapResponse.intReply(out, xid, res); + } + + @Override + public synchronized XDR dump(int xid, XDR in, XDR out) { + PortmapMapping[] pmapList = null; + synchronized(this) { + pmapList = new PortmapMapping[map.values().size()]; + map.values().toArray(pmapList); + } + return PortmapResponse.pmapList(out, xid, pmapList); + } + + @Override + public void register(PortmapMapping mapping) { + String key = PortmapMapping.key(mapping); + synchronized(this) { + map.put(key, mapping); + } + } + + @Override + public XDR handleInternal(RpcCall rpcCall, XDR in, XDR out, + InetAddress client, Channel channel) { + Procedure procedure = Procedure.fromValue(rpcCall.getProcedure()); + int xid = rpcCall.getXid(); + switch (procedure) { + case PMAPPROC_NULL: + out = nullOp(xid, in, out); + break; + case PMAPPROC_SET: + out = set(xid, in, out); + break; + case PMAPPROC_UNSET: + out = unset(xid, in, out); + break; + case PMAPPROC_DUMP: + out = dump(xid, in, out); + break; + case PMAPPROC_GETPORT: + out = getport(xid, in, out); + break; + case PMAPPROC_GETVERSADDR: + out = getport(xid, in, out); + break; + default: + LOG.info("PortmapHandler unknown rpc procedure=" + procedure); + RpcAcceptedReply.voidReply(out, xid, + RpcAcceptedReply.AcceptState.PROC_UNAVAIL); + } + return out; + } + + @Override + protected boolean isIdempotent(RpcCall call) { + return false; + } +} diff --git a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java new file mode 100644 index 0000000000..fab80ddcb6 --- /dev/null +++ b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java @@ -0,0 +1,194 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.oncrpc; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.net.InetAddress; +import java.nio.ByteBuffer; + +import org.jboss.netty.buffer.ByteBufferBackedChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.junit.Test; +import org.mockito.Mockito; + +public class TestFrameDecoder { + + private static int port = 12345; // some random server port + private static XDR result = null; + + static void testRequest(XDR request) { + SimpleTcpClient tcpClient = new SimpleTcpClient("localhost", port, request, + true); + tcpClient.run(); + } + + static class TestRpcProgram extends RpcProgram { + + protected TestRpcProgram(String program, String host, int port, + int progNumber, int lowProgVersion, int highProgVersion, int cacheSize) { + super(program, host, port, progNumber, lowProgVersion, highProgVersion, + cacheSize); + } + + @Override + public XDR handleInternal(RpcCall rpcCall, XDR in, XDR out, + InetAddress client, Channel channel) { + // Get the final complete request and return a void response. + result = in; + return RpcAcceptedReply.voidReply(out, 1234); + } + + @Override + protected boolean isIdempotent(RpcCall call) { + return false; + } + } + + @Test + public void testSingleFrame() { + RpcFrameDecoder decoder = new RpcFrameDecoder(); + + // Test "Length field is not received yet" + ByteBuffer buffer = ByteBuffer.allocate(1); + ChannelBuffer buf = new ByteBufferBackedChannelBuffer(buffer); + ChannelBuffer channelBuffer = (ChannelBuffer) decoder.decode( + Mockito.mock(ChannelHandlerContext.class), Mockito.mock(Channel.class), + buf); + assertTrue(channelBuffer == null); + + // Test all bytes are not received yet + byte[] fragment = new byte[4 + 9]; + fragment[0] = (byte) (1 << 7); // final fragment + fragment[1] = 0; + fragment[2] = 0; + fragment[3] = (byte) 10; // fragment size = 10 bytes + assertTrue(XDR.isLastFragment(fragment)); + assertTrue(XDR.fragmentSize(fragment)==10); + + buffer = ByteBuffer.allocate(4 + 9); + buffer.put(fragment); + buffer.flip(); + buf = new ByteBufferBackedChannelBuffer(buffer); + channelBuffer = (ChannelBuffer) decoder.decode( + Mockito.mock(ChannelHandlerContext.class), Mockito.mock(Channel.class), + buf); + assertTrue(channelBuffer == null); + } + + @Test + public void testMultipleFrames() { + RpcFrameDecoder decoder = new RpcFrameDecoder(); + + // Test multiple frames + byte[] fragment1 = new byte[4 + 10]; + fragment1[0] = 0; // not final fragment + fragment1[1] = 0; + fragment1[2] = 0; + fragment1[3] = (byte) 10; // fragment size = 10 bytes + assertFalse(XDR.isLastFragment(fragment1)); + assertTrue(XDR.fragmentSize(fragment1)==10); + + // decoder should wait for the final fragment + ByteBuffer buffer = ByteBuffer.allocate(4 + 10); + buffer.put(fragment1); + buffer.flip(); + ChannelBuffer buf = new ByteBufferBackedChannelBuffer(buffer); + ChannelBuffer channelBuffer = (ChannelBuffer) decoder.decode( + Mockito.mock(ChannelHandlerContext.class), Mockito.mock(Channel.class), + buf); + assertTrue(channelBuffer == null); + + byte[] fragment2 = new byte[4 + 10]; + fragment2[0] = (byte) (1 << 7); // final fragment + fragment2[1] = 0; + fragment2[2] = 0; + fragment2[3] = (byte) 10; // fragment size = 10 bytes + assertTrue(XDR.isLastFragment(fragment2)); + assertTrue(XDR.fragmentSize(fragment2)==10); + + buffer = ByteBuffer.allocate(4 + 10); + buffer.put(fragment2); + buffer.flip(); + buf = new ByteBufferBackedChannelBuffer(buffer); + channelBuffer = (ChannelBuffer) decoder.decode( + Mockito.mock(ChannelHandlerContext.class), Mockito.mock(Channel.class), + buf); + assertTrue(channelBuffer != null); + // Complete frame should have to total size 10+10=20 + assertTrue(channelBuffer.array().length == 20); + } + + @Test + public void testFrames() { + + RpcProgram program = new TestFrameDecoder.TestRpcProgram("TestRpcProgram", + "localhost", port, 100000, 1, 2, 100); + SimpleTcpServer tcpServer = new SimpleTcpServer(port, program, 1); + tcpServer.run(); + + XDR xdrOut = createGetportMount(); + int bufsize = 2 * 1024 * 1024; + byte[] buffer = new byte[bufsize]; + xdrOut.writeFixedOpaque(buffer); + int requestSize = xdrOut.size(); + + // Send the request to the server + testRequest(xdrOut); + + // Verify the server got the request with right size + assertTrue(requestSize == result.size()); + } + + static void createPortmapXDRheader(XDR xdr_out, int procedure) { + // Make this a method + RpcCall.write(xdr_out, 0, 100000, 2, procedure); + } + + static XDR createGetportMount() { + XDR xdr_out = new XDR(); + createPortmapXDRheader(xdr_out, 3); + xdr_out.writeInt(0); // AUTH_NULL + xdr_out.writeInt(0); // cred len + xdr_out.writeInt(0); // verifier AUTH_NULL + xdr_out.writeInt(0); // verf len + return xdr_out; + } + /* + * static void testGetport() { XDR xdr_out = new XDR(); + * + * createPortmapXDRheader(xdr_out, 3); + * + * xdr_out.writeInt(100003); xdr_out.writeInt(3); xdr_out.writeInt(6); + * xdr_out.writeInt(0); + * + * XDR request2 = new XDR(); + * + * createPortmapXDRheader(xdr_out, 3); request2.writeInt(100003); + * request2.writeInt(3); request2.writeInt(6); request2.writeInt(0); + * + * testRequest(xdr_out); } + * + * static void testDump() { XDR xdr_out = new XDR(); + * createPortmapXDRheader(xdr_out, 4); testRequest(xdr_out); } + */ +} \ No newline at end of file diff --git a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcAcceptedReply.java b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcAcceptedReply.java new file mode 100644 index 0000000000..fbbeb0744c --- /dev/null +++ b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcAcceptedReply.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.oncrpc; + +import static org.junit.Assert.assertEquals; + +import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState; +import org.apache.hadoop.oncrpc.RpcAuthInfo.AuthFlavor; +import org.apache.hadoop.oncrpc.RpcReply.ReplyState; +import org.junit.Test; + +/** + * Test for {@link RpcAcceptedReply} + */ +public class TestRpcAcceptedReply { + @Test + public void testAcceptState() { + assertEquals(AcceptState.SUCCESS, AcceptState.fromValue(0)); + assertEquals(AcceptState.PROG_UNAVAIL, AcceptState.fromValue(1)); + assertEquals(AcceptState.PROG_MISMATCH, AcceptState.fromValue(2)); + assertEquals(AcceptState.PROC_UNAVAIL, AcceptState.fromValue(3)); + assertEquals(AcceptState.GARBAGE_ARGS, AcceptState.fromValue(4)); + assertEquals(AcceptState.SYSTEM_ERR, AcceptState.fromValue(5)); + } + + @Test(expected = IndexOutOfBoundsException.class) + public void testAcceptStateFromInvalidValue() { + AcceptState.fromValue(6); + } + + @Test + public void testConstructor() { + RpcAuthInfo verifier = new RpcAuthInfo(AuthFlavor.AUTH_NONE, new byte[0]); + RpcAcceptedReply reply = new RpcAcceptedReply(0, RpcMessage.RPC_REPLY, + ReplyState.MSG_ACCEPTED, verifier, AcceptState.SUCCESS); + assertEquals(0, reply.getXid()); + assertEquals(RpcMessage.RPC_REPLY, reply.getMessageType()); + assertEquals(ReplyState.MSG_ACCEPTED, reply.getState()); + assertEquals(verifier, reply.getVerifier()); + assertEquals(AcceptState.SUCCESS, reply.getAcceptState()); + } +} + diff --git a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcAuthInfo.java b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcAuthInfo.java new file mode 100644 index 0000000000..0b8240bc60 --- /dev/null +++ b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcAuthInfo.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.oncrpc; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; + +import org.apache.hadoop.oncrpc.RpcAuthInfo.AuthFlavor; +import org.junit.Test; + +/** + * Tests for {@link RpcAuthInfo} + */ +public class TestRpcAuthInfo { + @Test + public void testAuthFlavor() { + assertEquals(AuthFlavor.AUTH_NONE, AuthFlavor.fromValue(0)); + assertEquals(AuthFlavor.AUTH_SYS, AuthFlavor.fromValue(1)); + assertEquals(AuthFlavor.AUTH_SHORT, AuthFlavor.fromValue(2)); + assertEquals(AuthFlavor.AUTH_DH, AuthFlavor.fromValue(3)); + assertEquals(AuthFlavor.RPCSEC_GSS, AuthFlavor.fromValue(6)); + } + + @Test(expected=IllegalArgumentException.class) + public void testInvalidAuthFlavor() { + assertEquals(AuthFlavor.AUTH_NONE, AuthFlavor.fromValue(4)); + } + + @Test + public void testConsturctor() { + byte[] body = new byte[0]; + RpcAuthInfo auth = new RpcAuthInfo(AuthFlavor.AUTH_NONE, body); + assertEquals(AuthFlavor.AUTH_NONE, auth.getFlavor()); + assertTrue(Arrays.equals(body, auth.getBody())); + } +} diff --git a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcAuthSys.java b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcAuthSys.java new file mode 100644 index 0000000000..474a1f7378 --- /dev/null +++ b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcAuthSys.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.oncrpc; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + +/** + * Test for {@link RpcAuthSys} + */ +public class TestRpcAuthSys { + @Test + public void testConstructor() { + RpcAuthSys auth = new RpcAuthSys(0, 1); + assertEquals(0, auth.getUid()); + assertEquals(1, auth.getGid()); + } + + @Test + public void testRead() { + byte[] bytes = {0, 1, 2, 3}; // 4 bytes Stamp + bytes = XDR.append(bytes, XDR.getVariableOpque(new byte[0])); + bytes = XDR.append(bytes, XDR.toBytes(0)); // gid + bytes = XDR.append(bytes, XDR.toBytes(1)); // uid + RpcAuthSys auth = RpcAuthSys.from(bytes); + assertEquals(0, auth.getUid()); + assertEquals(1, auth.getGid()); + } +} diff --git a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCall.java b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCall.java new file mode 100644 index 0000000000..e3ae2edc40 --- /dev/null +++ b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCall.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.oncrpc; + +import org.apache.hadoop.oncrpc.RpcAuthInfo.AuthFlavor; +import static org.junit.Assert.assertEquals; +import org.junit.Test; + +/** + * Tests for {@link RpcCall} + */ +public class TestRpcCall { + + @Test + public void testConstructor() { + RpcAuthInfo credential = new RpcAuthInfo(AuthFlavor.AUTH_NONE, new byte[0]); + RpcAuthInfo verifier = new RpcAuthInfo(AuthFlavor.AUTH_NONE, new byte[0]); + int rpcVersion = RpcCall.RPC_VERSION; + int program = 2; + int version = 3; + int procedure = 4; + RpcCall call = new RpcCall(0, RpcMessage.RPC_CALL, rpcVersion, program, version, procedure, credential, verifier); + assertEquals(0, call.getXid()); + assertEquals(RpcMessage.RPC_CALL, call.getMessageType()); + assertEquals(rpcVersion, call.getRpcVersion()); + assertEquals(program, call.getProgram()); + assertEquals(version, call.getVersion()); + assertEquals(procedure, call.getProcedure()); + assertEquals(credential, call.getCredential()); + assertEquals(verifier, call.getVerifier()); + } + + @Test(expected=IllegalArgumentException.class) + public void testInvalidRpcVersion() { + int invalidRpcVersion = 3; + new RpcCall(0, RpcMessage.RPC_CALL, invalidRpcVersion, 2, 3, 4, null, null); + } + + @Test(expected=IllegalArgumentException.class) + public void testInvalidRpcMessageType() { + int invalidMessageType = 3; // Message typ is not RpcMessage.RPC_CALL + new RpcCall(0, invalidMessageType, RpcCall.RPC_VERSION, 2, 3, 4, null, null); + } +} diff --git a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCallCache.java b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCallCache.java new file mode 100644 index 0000000000..f605fc2054 --- /dev/null +++ b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCallCache.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.oncrpc; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Iterator; +import java.util.Map.Entry; + +import org.apache.hadoop.oncrpc.RpcCallCache.CacheEntry; +import org.apache.hadoop.oncrpc.RpcCallCache.ClientRequest; +import org.junit.Test; + +/** + * Unit tests for {@link RpcCallCache} + */ +public class TestRpcCallCache { + + @Test(expected=IllegalArgumentException.class) + public void testRpcCallCacheConstructorIllegalArgument0(){ + new RpcCallCache("test", 0); + } + + @Test(expected=IllegalArgumentException.class) + public void testRpcCallCacheConstructorIllegalArgumentNegative(){ + new RpcCallCache("test", -1); + } + + @Test + public void testRpcCallCacheConstructor(){ + RpcCallCache cache = new RpcCallCache("test", 100); + assertEquals("test", cache.getProgram()); + } + + @Test + public void testAddRemoveEntries() throws UnknownHostException { + RpcCallCache cache = new RpcCallCache("test", 100); + InetAddress clientIp = InetAddress.getByName("1.1.1.1"); + int xid = 100; + + // Ensure null is returned when there is no entry in the cache + // An entry is added to indicate the request is in progress + CacheEntry e = cache.checkOrAddToCache(clientIp, xid); + assertNull(e); + e = cache.checkOrAddToCache(clientIp, xid); + validateInprogressCacheEntry(e); + + // Set call as completed + XDR response = new XDR(); + cache.callCompleted(clientIp, xid, response); + e = cache.checkOrAddToCache(clientIp, xid); + validateCompletedCacheEntry(e, response); + } + + private void validateInprogressCacheEntry(CacheEntry c) { + assertTrue(c.isInProgress()); + assertFalse(c.isCompleted()); + assertNull(c.getResponse()); + } + + private void validateCompletedCacheEntry(CacheEntry c, XDR response) { + assertFalse(c.isInProgress()); + assertTrue(c.isCompleted()); + assertEquals(response, c.getResponse()); + } + + @Test + public void testCacheEntry() { + CacheEntry c = new CacheEntry(); + validateInprogressCacheEntry(c); + assertTrue(c.isInProgress()); + assertFalse(c.isCompleted()); + assertNull(c.getResponse()); + + XDR response = new XDR(); + c.setResponse(response); + validateCompletedCacheEntry(c, response); + } + + @Test + public void testCacheFunctionality() throws UnknownHostException { + RpcCallCache cache = new RpcCallCache("Test", 10); + + // Add 20 entries to the cache and only last 10 should be retained + int size = 0; + for (int clientId = 0; clientId < 20; clientId++) { + InetAddress clientIp = InetAddress.getByName("1.1.1."+clientId); + System.out.println("Adding " + clientIp); + cache.checkOrAddToCache(clientIp, 0); + size = Math.min(++size, 10); + System.out.println("Cache size " + cache.size()); + assertEquals(size, cache.size()); // Ensure the cache size is correct + + // Ensure the cache entries are correct + int startEntry = Math.max(clientId - 10 + 1, 0); + Iterator> iterator = cache.iterator(); + for (int i = 0; i < size; i++) { + ClientRequest key = iterator.next().getKey(); + System.out.println("Entry " + key.getClientId()); + assertEquals(InetAddress.getByName("1.1.1." + (startEntry + i)), + key.getClientId()); + } + + // Ensure cache entries are returned as in progress. + for (int i = 0; i < size; i++) { + CacheEntry e = cache.checkOrAddToCache( + InetAddress.getByName("1.1.1." + (startEntry + i)), 0); + assertNotNull(e); + assertTrue(e.isInProgress()); + assertFalse(e.isCompleted()); + } + } + } +} diff --git a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcDeniedReply.java b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcDeniedReply.java new file mode 100644 index 0000000000..669ec9ad5f --- /dev/null +++ b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcDeniedReply.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.oncrpc; + +import org.apache.hadoop.oncrpc.RpcDeniedReply.RejectState; +import org.apache.hadoop.oncrpc.RpcReply.ReplyState; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test for {@link RpcDeniedReply} + */ +public class TestRpcDeniedReply { + @Test + public void testRejectStateFromValue() { + Assert.assertEquals(RejectState.RPC_MISMATCH, RejectState.fromValue(0)); + Assert.assertEquals(RejectState.AUTH_ERROR, RejectState.fromValue(1)); + } + + @Test(expected=IndexOutOfBoundsException.class) + public void testRejectStateFromInvalidValue1() { + RejectState.fromValue(2); + } + + @Test + public void testConstructor() { + RpcDeniedReply reply = new RpcDeniedReply(0, RpcMessage.RPC_REPLY, + ReplyState.MSG_ACCEPTED, RejectState.AUTH_ERROR) { + // Anonymous class + }; + Assert.assertEquals(0, reply.getXid()); + Assert.assertEquals(RpcMessage.RPC_REPLY, reply.getMessageType()); + Assert.assertEquals(ReplyState.MSG_ACCEPTED, reply.getState()); + Assert.assertEquals(RejectState.AUTH_ERROR, reply.getRejectState()); + } +} diff --git a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcMessage.java b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcMessage.java new file mode 100644 index 0000000000..893df7786d --- /dev/null +++ b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcMessage.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.oncrpc; + +import org.junit.Assert; +import org.junit.Test; + +/** + * Test for {@link RpcMessage} + */ +public class TestRpcMessage { + private RpcMessage getRpcMessage(int xid, int msgType) { + return new RpcMessage(xid, msgType) { + // Anonymous class + }; + } + + @Test(expected=IllegalArgumentException.class) + public void testInvalidMessageType() { + int invalidMsgType = 2; // valid values are 0 and 1 + getRpcMessage(0, invalidMsgType); + } + + @Test + public void testRpcMessage() { + RpcMessage msg = getRpcMessage(0, RpcMessage.RPC_CALL); + Assert.assertEquals(0, msg.getXid()); + Assert.assertEquals(RpcMessage.RPC_CALL, msg.getMessageType()); + } + + @Test + public void testValidateMessage() { + RpcMessage msg = getRpcMessage(0, RpcMessage.RPC_CALL); + msg.validateMessageType(RpcMessage.RPC_CALL); + } + + @Test(expected = IllegalArgumentException.class) + public void testValidateMessageException() { + RpcMessage msg = getRpcMessage(0, RpcMessage.RPC_CALL); + msg.validateMessageType(RpcMessage.RPC_REPLY); + } +} diff --git a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcReply.java b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcReply.java new file mode 100644 index 0000000000..21d38a4760 --- /dev/null +++ b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcReply.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.oncrpc; + + +import org.apache.hadoop.oncrpc.RpcReply.ReplyState; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test for {@link RpcReply} + */ +public class TestRpcReply { + @Test + public void testReplyStateFromValue() { + Assert.assertEquals(ReplyState.MSG_ACCEPTED, ReplyState.fromValue(0)); + Assert.assertEquals(ReplyState.MSG_DENIED, ReplyState.fromValue(1)); + } + + @Test(expected=IndexOutOfBoundsException.class) + public void testReplyStateFromInvalidValue1() { + ReplyState.fromValue(2); + } + + @Test + public void testRpcReply() { + RpcReply reply = new RpcReply(0, 1, ReplyState.MSG_ACCEPTED) { + // Anonymous class + }; + Assert.assertEquals(0, reply.getXid()); + Assert.assertEquals(1, reply.getMessageType()); + Assert.assertEquals(ReplyState.MSG_ACCEPTED, reply.getState()); + } +} diff --git a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestXDR.java b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestXDR.java new file mode 100644 index 0000000000..1745a06bed --- /dev/null +++ b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestXDR.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.oncrpc; + +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; + +import org.junit.Test; + +/** + * Tests for {@link XDR} + */ +public class TestXDR { + /** + * Test {@link XDR#append(byte[], byte[])} + */ + @Test + public void testAppendBytes() { + byte[] arr1 = new byte[] {0, 1}; + byte[] arr2 = new byte[] {2, 3}; + assertTrue(Arrays.equals(new byte[]{0, 1, 2, 3}, XDR.append(arr1, arr2))); + } +} diff --git a/hadoop-common-project/pom.xml b/hadoop-common-project/pom.xml index a09c29b67a..d6b133ab1c 100644 --- a/hadoop-common-project/pom.xml +++ b/hadoop-common-project/pom.xml @@ -35,6 +35,7 @@ hadoop-auth-examples hadoop-common hadoop-annotations + hadoop-nfs