From 4a1acfc96fb7d418ff3fe538a3942834948f6d1c Mon Sep 17 00:00:00 2001 From: Brandon Li Date: Tue, 26 Nov 2013 18:13:04 +0000 Subject: [PATCH] HDFS-5548. Use ConcurrentHashMap in portmap. Contributed by Haohui Mai git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1545756 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop/portmap/PortmapInterface.java | 95 --------------- .../apache/hadoop/portmap/PortmapRequest.java | 3 +- .../hadoop/portmap/RpcProgramPortmap.java | 108 +++++++++++------- .../apache/hadoop/portmap/TestPortmap.java | 6 +- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 + 5 files changed, 72 insertions(+), 142 deletions(-) delete mode 100644 hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapInterface.java diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapInterface.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapInterface.java deleted file mode 100644 index ae968cb046..0000000000 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapInterface.java +++ /dev/null @@ -1,95 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.portmap; - -import org.apache.hadoop.oncrpc.XDR; - -/** - * Methods that need to be implemented to provide Portmap RPC program. - * See RFC 1833 for details. - */ -public interface PortmapInterface { - public enum Procedure { - // the order of the values below are significant. - PMAPPROC_NULL, - PMAPPROC_SET, - PMAPPROC_UNSET, - PMAPPROC_GETPORT, - PMAPPROC_DUMP, - PMAPPROC_CALLIT, - PMAPPROC_GETTIME, - PMAPPROC_UADDR2TADDR, - PMAPPROC_TADDR2UADDR, - PMAPPROC_GETVERSADDR, - PMAPPROC_INDIRECT, - PMAPPROC_GETADDRLIST, - PMAPPROC_GETSTAT; - - public int getValue() { - return ordinal(); - } - - public static Procedure fromValue(int value) { - if (value < 0 || value >= values().length) { - return null; - } - return values()[value]; - } - } - - /** - * This procedure does no work. By convention, procedure zero of any protocol - * takes no parameters and returns no results. - */ - public XDR nullOp(int xidd, XDR in, XDR out); - - /** - * When a program first becomes available on a machine, it registers itself - * with the port mapper program on the same machine. The program passes its - * program number "prog", version number "vers", transport protocol number - * "prot", and the port "port" on which it awaits service request. The - * procedure returns a boolean reply whose value is "TRUE" if the procedure - * successfully established the mapping and "FALSE" otherwise. The procedure - * refuses to establish a mapping if one already exists for the tuple - * "(prog, vers, prot)". - */ - public XDR set(int xid, XDR in, XDR out); - - /** - * When a program becomes unavailable, it should unregister itself with the - * port mapper program on the same machine. The parameters and results have - * meanings identical to those of "PMAPPROC_SET". The protocol and port number - * fields of the argument are ignored. - */ - public XDR unset(int xid, XDR in, XDR out); - - /** - * Given a program number "prog", version number "vers", and transport - * protocol number "prot", this procedure returns the port number on which the - * program is awaiting call requests. A port value of zeros means the program - * has not been registered. The "port" field of the argument is ignored. - */ - public XDR getport(int xid, XDR in, XDR out); - - /** - * This procedure enumerates all entries in the port mapper's database. The - * procedure takes no parameters and returns a list of program, version, - * protocol, and port values. - */ - public XDR dump(int xid, XDR in, XDR out); -} diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapRequest.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapRequest.java index 943b4abc5c..2932c78237 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapRequest.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapRequest.java @@ -22,7 +22,6 @@ import org.apache.hadoop.oncrpc.XDR; import org.apache.hadoop.oncrpc.security.CredentialsNone; import org.apache.hadoop.oncrpc.security.VerifierNone; -import org.apache.hadoop.portmap.PortmapInterface.Procedure; /** * Helper utility for building portmap request @@ -37,7 +36,7 @@ public static XDR create(PortmapMapping mapping) { RpcCall call = RpcCall.getInstance( RpcUtil.getNewXid(String.valueOf(RpcProgramPortmap.PROGRAM)), RpcProgramPortmap.PROGRAM, RpcProgramPortmap.VERSION, - Procedure.PMAPPROC_SET.getValue(), new CredentialsNone(), + RpcProgramPortmap.PMAPPROC_SET, new CredentialsNone(), new VerifierNone()); call.write(request); return mapping.serialize(request); diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java index d68657cc42..67175d0640 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java @@ -17,7 +17,7 @@ */ package org.apache.hadoop.portmap; -import java.util.HashMap; +import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -40,20 +40,26 @@ import org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler; import org.jboss.netty.handler.timeout.IdleStateEvent; -final class RpcProgramPortmap extends IdleStateAwareChannelUpstreamHandler implements PortmapInterface { +final class RpcProgramPortmap extends IdleStateAwareChannelUpstreamHandler { static final int PROGRAM = 100000; static final int VERSION = 2; + + static final int PMAPPROC_NULL = 0; + static final int PMAPPROC_SET = 1; + static final int PMAPPROC_UNSET = 2; + static final int PMAPPROC_GETPORT = 3; + static final int PMAPPROC_DUMP = 4; + static final int PMAPPROC_GETVERSADDR = 9; + private static final Log LOG = LogFactory.getLog(RpcProgramPortmap.class); - /** Map synchronized usis monitor lock of this instance */ - private final HashMap map; + private final ConcurrentHashMap map = new ConcurrentHashMap(); /** ChannelGroup that remembers all active channels for gracefully shutdown. */ private final ChannelGroup allChannels; RpcProgramPortmap(ChannelGroup allChannels) { this.allChannels = allChannels; - map = new HashMap(256); PortmapMapping m = new PortmapMapping(PROGRAM, VERSION, PortmapMapping.TRANSPORT_TCP, RpcProgram.RPCB_PORT); PortmapMapping m1 = new PortmapMapping(PROGRAM, VERSION, @@ -61,48 +67,66 @@ final class RpcProgramPortmap extends IdleStateAwareChannelUpstreamHandler imple map.put(PortmapMapping.key(m), m); map.put(PortmapMapping.key(m1), m1); } - - @Override - public XDR nullOp(int xid, XDR in, XDR out) { + + /** + * This procedure does no work. By convention, procedure zero of any protocol + * takes no parameters and returns no results. + */ + private XDR nullOp(int xid, XDR in, XDR out) { return PortmapResponse.voidReply(out, xid); } - @Override - public XDR set(int xid, XDR in, XDR out) { + /** + * When a program first becomes available on a machine, it registers itself + * with the port mapper program on the same machine. The program passes its + * program number "prog", version number "vers", transport protocol number + * "prot", and the port "port" on which it awaits service request. The + * procedure returns a boolean reply whose value is "TRUE" if the procedure + * successfully established the mapping and "FALSE" otherwise. The procedure + * refuses to establish a mapping if one already exists for the tuple + * "(prog, vers, prot)". + */ + private XDR set(int xid, XDR in, XDR out) { PortmapMapping mapping = PortmapRequest.mapping(in); String key = PortmapMapping.key(mapping); if (LOG.isDebugEnabled()) { LOG.debug("Portmap set key=" + key); } - PortmapMapping value = null; - synchronized(this) { - map.put(key, mapping); - value = map.get(key); - } - return PortmapResponse.intReply(out, xid, value.getPort()); + map.put(key, mapping); + return PortmapResponse.intReply(out, xid, mapping.getPort()); } - @Override - public synchronized XDR unset(int xid, XDR in, XDR out) { + /** + * When a program becomes unavailable, it should unregister itself with the + * port mapper program on the same machine. The parameters and results have + * meanings identical to those of "PMAPPROC_SET". The protocol and port number + * fields of the argument are ignored. + */ + private XDR unset(int xid, XDR in, XDR out) { PortmapMapping mapping = PortmapRequest.mapping(in); - synchronized(this) { - map.remove(PortmapMapping.key(mapping)); - } + String key = PortmapMapping.key(mapping); + + if (LOG.isDebugEnabled()) + LOG.debug("Portmap remove key=" + key); + + map.remove(key); return PortmapResponse.booleanReply(out, xid, true); } - @Override - public synchronized XDR getport(int xid, XDR in, XDR out) { + /** + * Given a program number "prog", version number "vers", and transport + * protocol number "prot", this procedure returns the port number on which the + * program is awaiting call requests. A port value of zeros means the program + * has not been registered. The "port" field of the argument is ignored. + */ + private XDR getport(int xid, XDR in, XDR out) { PortmapMapping mapping = PortmapRequest.mapping(in); String key = PortmapMapping.key(mapping); if (LOG.isDebugEnabled()) { LOG.debug("Portmap GETPORT key=" + key + " " + mapping); } - PortmapMapping value = null; - synchronized(this) { - value = map.get(key); - } + PortmapMapping value = map.get(key); int res = 0; if (value != null) { res = value.getPort(); @@ -115,13 +139,13 @@ public synchronized XDR getport(int xid, XDR in, XDR out) { return PortmapResponse.intReply(out, xid, res); } - @Override - public synchronized XDR dump(int xid, XDR in, XDR out) { - PortmapMapping[] pmapList = null; - synchronized(this) { - pmapList = new PortmapMapping[map.values().size()]; - map.values().toArray(pmapList); - } + /** + * This procedure enumerates all entries in the port mapper's database. The + * procedure takes no parameters and returns a list of program, version, + * protocol, and port values. + */ + private XDR dump(int xid, XDR in, XDR out) { + PortmapMapping[] pmapList = map.values().toArray(new PortmapMapping[0]); return PortmapResponse.pmapList(out, xid, pmapList); } @@ -131,23 +155,23 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) RpcInfo info = (RpcInfo) e.getMessage(); RpcCall rpcCall = (RpcCall) info.header(); - final Procedure portmapProc = Procedure.fromValue(rpcCall.getProcedure()); + final int portmapProc = rpcCall.getProcedure(); int xid = rpcCall.getXid(); XDR in = new XDR(info.data().toByteBuffer().asReadOnlyBuffer(), XDR.State.READING); XDR out = new XDR(); - if (portmapProc == Procedure.PMAPPROC_NULL) { + if (portmapProc == PMAPPROC_NULL) { out = nullOp(xid, in, out); - } else if (portmapProc == Procedure.PMAPPROC_SET) { + } else if (portmapProc == PMAPPROC_SET) { out = set(xid, in, out); - } else if (portmapProc == Procedure.PMAPPROC_UNSET) { + } else if (portmapProc == PMAPPROC_UNSET) { out = unset(xid, in, out); - } else if (portmapProc == Procedure.PMAPPROC_DUMP) { + } else if (portmapProc == PMAPPROC_DUMP) { out = dump(xid, in, out); - } else if (portmapProc == Procedure.PMAPPROC_GETPORT) { + } else if (portmapProc == PMAPPROC_GETPORT) { out = getport(xid, in, out); - } else if (portmapProc == Procedure.PMAPPROC_GETVERSADDR) { + } else if (portmapProc == PMAPPROC_GETVERSADDR) { out = getport(xid, in, out); } else { LOG.info("PortmapHandler unknown rpc procedure=" + portmapProc); @@ -161,7 +185,7 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) RpcResponse rsp = new RpcResponse(buf, info.remoteAddress()); RpcUtil.sendRpcResponse(ctx, rsp); } - + @Override public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { diff --git a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/portmap/TestPortmap.java b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/portmap/TestPortmap.java index 2ed16bb13e..cc88d34920 100644 --- a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/portmap/TestPortmap.java +++ b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/portmap/TestPortmap.java @@ -23,7 +23,7 @@ import java.net.DatagramSocket; import java.net.InetSocketAddress; import java.net.Socket; -import java.util.HashMap; +import java.util.Map; import junit.framework.Assert; @@ -80,7 +80,7 @@ public void testRegistration() throws IOException, InterruptedException { XDR req = new XDR(); RpcCall.getInstance(++xid, RpcProgramPortmap.PROGRAM, RpcProgramPortmap.VERSION, - PortmapInterface.Procedure.PMAPPROC_SET.getValue(), + RpcProgramPortmap.PMAPPROC_SET, new CredentialsNone(), new VerifierNone()).write(req); PortmapMapping sent = new PortmapMapping(90000, 1, @@ -101,7 +101,7 @@ public void testRegistration() throws IOException, InterruptedException { Thread.sleep(100); boolean found = false; @SuppressWarnings("unchecked") - HashMap map = (HashMap) Whitebox + Map map = (Map) Whitebox .getInternalState(pm.getHandler(), "map"); for (PortmapMapping m : map.values()) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index ac53b602ec..b1d40f495b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -733,6 +733,8 @@ Release 2.2.1 - UNRELEASED HDFS-5407. Fix typos in DFSClientCache (Haohui Mai via brandonli) + HDFS-5548. Use ConcurrentHashMap in portmap (Haohui Mai via brandonli) + Release 2.2.0 - 2013-10-13 INCOMPATIBLE CHANGES