From 635786a511344b53b1d3f25c2f29ab5298f6ac74 Mon Sep 17 00:00:00 2001 From: Chen Liang Date: Tue, 23 Oct 2018 14:53:45 -0700 Subject: [PATCH] HDFS-13566. Add configurable additional RPC listener to NameNode. Contributed by Chen Liang. --- .../java/org/apache/hadoop/ipc/Server.java | 114 ++++++++++++++++-- .../security/SaslPropertiesResolver.java | 4 +- .../java/org/apache/hadoop/ipc/TestIPC.java | 53 +++++++- .../org/apache/hadoop/hdfs/DFSUtilClient.java | 85 ++++++++++++- .../hdfs/client/HdfsClientConfigKeys.java | 5 + .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 3 + .../hadoop/hdfs/server/namenode/NameNode.java | 30 +++++ .../server/namenode/NameNodeRpcServer.java | 16 ++- .../src/main/resources/hdfs-default.xml | 11 ++ .../apache/hadoop/hdfs/MiniDFSCluster.java | 51 ++++++++ .../hadoop/hdfs/TestHAAuxiliaryPort.java | 112 +++++++++++++++++ 11 files changed, 468 insertions(+), 16 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHAAuxiliaryPort.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index d162d2f887..b0ab85c7b5 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -366,6 +366,24 @@ public static InetAddress getRemoteIp() { return (call != null ) ? call.getHostInetAddress() : null; } + /** + * Returns the SASL qop for the current call, if the current call is + * set, and the SASL negotiation is done. Otherwise return null. Note + * that CurCall is thread local object. So in fact, different handler + * threads will process different CurCall object. + * + * Also, only return for RPC calls, not supported for other protocols. + * @return the QOP of the current connection. + */ + public static String getEstablishedQOP() { + Call call = CurCall.get(); + if (call == null || !(call instanceof RpcCall)) { + return null; + } + RpcCall rpcCall = (RpcCall)call; + return rpcCall.connection.getEstablishedQOP(); + } + /** * Returns the clientId from the current RPC request */ @@ -445,6 +463,10 @@ protected ResponseBuffer initialValue() { // maintains the set of client connections and handles idle timeouts private ConnectionManager connectionManager; private Listener listener = null; + // Auxiliary listeners maintained as in a map, to allow + // arbitrary number of of auxiliary listeners. A map from + // the port to the listener binding to it. + private Map auxiliaryListenerMap; private Responder responder = null; private Handler[] handlers = null; @@ -1028,11 +1050,12 @@ private class Listener extends Thread { private Reader[] readers = null; private int currentReader = 0; private InetSocketAddress address; //the address we bind at + private int listenPort; //the port we bind at private int backlogLength = conf.getInt( CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_KEY, CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_DEFAULT); - public Listener() throws IOException { + Listener(int port) throws IOException { address = new InetSocketAddress(bindAddress, port); // Create a new server socket and set to non blocking mode acceptChannel = ServerSocketChannel.open(); @@ -1040,7 +1063,10 @@ public Listener() throws IOException { // Bind the server socket to the local host and port bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig); - port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port + //Could be an ephemeral port + this.listenPort = acceptChannel.socket().getLocalPort(); + Thread.currentThread().setName("Listener at " + + bindAddress + "/" + this.listenPort); // create a selector; selector= Selector.open(); readers = new Reader[readThreads]; @@ -1223,7 +1249,7 @@ void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOf channel.socket().setKeepAlive(true); Reader reader = getReader(); - Connection c = connectionManager.register(channel); + Connection c = connectionManager.register(channel, this.listenPort); // If the connectionManager can't take it, close the connection. if (c == null) { if (channel.isOpen()) { @@ -1643,6 +1669,7 @@ public class Connection { private ByteBuffer unwrappedDataLengthBuffer; private int serviceClass; private boolean shouldClose = false; + private int ingressPort; UserGroupInformation user = null; public UserGroupInformation attemptingUser = null; // user name before auth @@ -1654,7 +1681,8 @@ public class Connection { private boolean sentNegotiate = false; private boolean useWrap = false; - public Connection(SocketChannel channel, long lastContact) { + public Connection(SocketChannel channel, long lastContact, + int ingressPort) { this.channel = channel; this.lastContact = lastContact; this.data = null; @@ -1666,6 +1694,7 @@ public Connection(SocketChannel channel, long lastContact) { this.unwrappedDataLengthBuffer = ByteBuffer.allocate(4); this.socket = channel.socket(); this.addr = socket.getInetAddress(); + this.ingressPort = ingressPort; if (addr == null) { this.hostAddress = "*Unknown*"; } else { @@ -1700,9 +1729,24 @@ public String getHostAddress() { return hostAddress; } + public int getIngressPort() { + return ingressPort; + } + public InetAddress getHostInetAddress() { return addr; } + + public String getEstablishedQOP() { + // In practice, saslServer should not be null when this is + // called. If it is null, it must be either some + // configuration mistake or it is called from unit test. + if (saslServer == null) { + LOG.warn("SASL server should not be null!"); + return null; + } + return (String)saslServer.getNegotiatedProperty(Sasl.QOP); + } public void setLastContact(long lastContact) { this.lastContact = lastContact; @@ -2175,7 +2219,7 @@ private RpcSaslProto buildSaslNegotiateResponse() private SaslServer createSaslServer(AuthMethod authMethod) throws IOException, InterruptedException { final Map saslProps = - saslPropsResolver.getServerProperties(addr); + saslPropsResolver.getServerProperties(addr, ingressPort); return new SaslRpcServer(authMethod).create(this, saslProps, secretManager); } @@ -2655,7 +2699,8 @@ private void internalQueueCall(Call call) private class Handler extends Thread { public Handler(int instanceNumber) { this.setDaemon(true); - this.setName("IPC Server handler "+ instanceNumber + " on " + port); + this.setName("IPC Server handler "+ instanceNumber + + " on default port " + port); } @Override @@ -2773,6 +2818,7 @@ protected Server(String bindAddress, int port, this.handlerCount = handlerCount; this.socketSendBufferSize = 0; this.serverName = serverName; + this.auxiliaryListenerMap = null; this.maxDataLength = conf.getInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH, CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT); if (queueSizePerHandler != -1) { @@ -2812,8 +2858,9 @@ protected Server(String bindAddress, int port, this.negotiateResponse = buildNegotiateResponse(enabledAuthMethods); // Start the listener here and let it bind to the port - listener = new Listener(); - this.port = listener.getAddress().getPort(); + listener = new Listener(port); + // set the server port to the default listener port. + this.port = listener.getAddress().getPort(); connectionManager = new ConnectionManager(); this.rpcMetrics = RpcMetrics.create(this, conf); this.rpcDetailedMetrics = RpcDetailedMetrics.create(this.port); @@ -2835,7 +2882,23 @@ protected Server(String bindAddress, int port, this.exceptionsHandler.addTerseLoggingExceptions(StandbyException.class); } - + + public synchronized void addAuxiliaryListener(int auxiliaryPort) + throws IOException { + if (auxiliaryListenerMap == null) { + auxiliaryListenerMap = new HashMap<>(); + } + if (auxiliaryListenerMap.containsKey(auxiliaryPort) && auxiliaryPort != 0) { + throw new IOException( + "There is already a listener binding to: " + auxiliaryPort); + } + Listener newListener = new Listener(auxiliaryPort); + // in the case of port = 0, the listener would be on a != 0 port. + LOG.info("Adding a server listener on port " + + newListener.getAddress().getPort()); + auxiliaryListenerMap.put(newListener.getAddress().getPort(), newListener); + } + private RpcSaslProto buildNegotiateResponse(List authMethods) throws IOException { RpcSaslProto.Builder negotiateBuilder = RpcSaslProto.newBuilder(); @@ -3069,6 +3132,12 @@ public void setTracer(Tracer t) { public synchronized void start() { responder.start(); listener.start(); + if (auxiliaryListenerMap != null && auxiliaryListenerMap.size() > 0) { + for (Listener newListener : auxiliaryListenerMap.values()) { + newListener.start(); + } + } + handlers = new Handler[handlerCount]; for (int i = 0; i < handlerCount; i++) { @@ -3090,6 +3159,12 @@ public synchronized void stop() { } listener.interrupt(); listener.doStop(); + if (auxiliaryListenerMap != null && auxiliaryListenerMap.size() > 0) { + for (Listener newListener : auxiliaryListenerMap.values()) { + newListener.interrupt(); + newListener.doStop(); + } + } responder.interrupt(); notifyAll(); this.rpcMetrics.shutdown(); @@ -3113,6 +3188,23 @@ public synchronized void join() throws InterruptedException { public synchronized InetSocketAddress getListenerAddress() { return listener.getAddress(); } + + /** + * Return the set of all the configured auxiliary socket addresses NameNode + * RPC is listening on. If there are none, or it is not configured at all, an + * empty set is returned. + * @return the set of all the auxiliary addresses on which the + * RPC server is listening on. + */ + public synchronized Set getAuxiliaryListenerAddresses() { + Set allAddrs = new HashSet<>(); + if (auxiliaryListenerMap != null && auxiliaryListenerMap.size() > 0) { + for (Listener auxListener : auxiliaryListenerMap.values()) { + allAddrs.add(auxListener.getAddress()); + } + } + return allAddrs; + } /** * Called for each call. @@ -3417,11 +3509,11 @@ Connection[] toArray() { return connections.toArray(new Connection[0]); } - Connection register(SocketChannel channel) { + Connection register(SocketChannel channel, int ingressPort) { if (isFull()) { return null; } - Connection connection = new Connection(channel, Time.now()); + Connection connection = new Connection(channel, Time.now(), ingressPort); add(connection); if (LOG.isDebugEnabled()) { LOG.debug("Server connection from " + connection + diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslPropertiesResolver.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslPropertiesResolver.java index 64b86e3f27..dd6c42e149 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslPropertiesResolver.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslPropertiesResolver.java @@ -102,7 +102,7 @@ public Map getServerProperties(InetAddress clientAddress){ */ public Map getServerProperties(InetAddress clientAddress, int ingressPort){ - return properties; + return getServerProperties(clientAddress); } /** @@ -122,7 +122,7 @@ public Map getClientProperties(InetAddress serverAddress){ */ public Map getClientProperties(InetAddress serverAddress, int ingressPort) { - return properties; + return getClientProperties(serverAddress); } /** diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java index 19314c1d50..7ba4e53af8 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java @@ -49,8 +49,10 @@ import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Random; +import java.util.Set; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; @@ -171,6 +173,11 @@ public TestServer(int handlerCount, boolean sleep) throws IOException { this(handlerCount, sleep, LongWritable.class, null); } + public TestServer(int port, int handlerCount, boolean sleep) + throws IOException { + this(port, handlerCount, sleep, LongWritable.class, null); + } + public TestServer(int handlerCount, boolean sleep, Configuration conf) throws IOException { this(handlerCount, sleep, LongWritable.class, null, conf); @@ -182,11 +189,24 @@ public TestServer(int handlerCount, boolean sleep, this(handlerCount, sleep, paramClass, responseClass, conf); } + public TestServer(int port, int handlerCount, boolean sleep, + Class paramClass, + Class responseClass) throws IOException { + this(port, handlerCount, sleep, paramClass, responseClass, conf); + } + public TestServer(int handlerCount, boolean sleep, Class paramClass, Class responseClass, Configuration conf) throws IOException { - super(ADDRESS, 0, paramClass, handlerCount, conf); + this(0, handlerCount, sleep, paramClass, responseClass, conf); + } + + public TestServer(int port, int handlerCount, boolean sleep, + Class paramClass, + Class responseClass, Configuration conf) + throws IOException { + super(ADDRESS, port, paramClass, handlerCount, conf); this.sleep = sleep; this.responseClass = responseClass; } @@ -338,6 +358,37 @@ public void internalTestSerial(int handlerCount, boolean handlerSleep, } server.stop(); } + + @Test + public void testAuxiliaryPorts() throws IOException, InterruptedException { + int defaultPort = 9000; + int[] auxiliaryPorts = {9001, 9002, 9003}; + final int handlerCount = 5; + final boolean handlerSleep = false; + Server server = new TestServer(defaultPort, handlerCount, handlerSleep); + for (int port : auxiliaryPorts) { + server.addAuxiliaryListener(port); + } + Set listenerAddrs = + server.getAuxiliaryListenerAddresses(); + Set addrs = new HashSet<>(); + for (InetSocketAddress addr : listenerAddrs) { + addrs.add(NetUtils.getConnectAddress(addr)); + } + server.start(); + + Client client = new Client(LongWritable.class, conf); + Set calls = new HashSet<>(); + for (InetSocketAddress addr : addrs) { + calls.add(new SerialCaller(client, addr, 100)); + } + for (SerialCaller caller : calls) { + caller.join(); + assertFalse(caller.failed); + } + client.stop(); + server.stop(); + } @Test(timeout=60000) public void testStandAloneClient() throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java index 0686ed5b1f..0acccea097 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java @@ -21,6 +21,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Maps; import com.google.common.primitives.SignedBytes; +import java.net.URISyntaxException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; @@ -92,6 +93,8 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_DEFAULT; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMESERVICES; @InterfaceAudience.Private @@ -432,7 +435,7 @@ public static Map getAddressesForNameserviceId( Map ret = Maps.newLinkedHashMap(); for (String nnId : emptyAsSingletonNull(nnIds)) { String suffix = concatSuffixes(nsId, nnId); - String address = getConfValue(defaultValue, suffix, conf, keys); + String address = checkKeysAndProcess(defaultValue, suffix, conf, keys); if (address != null) { InetSocketAddress isa = NetUtils.createSocketAddr(address); if (isa.isUnresolved()) { @@ -446,6 +449,86 @@ public static Map getAddressesForNameserviceId( return ret; } + /** + * Return address from configuration. Take a list of keys as preference. + * If the address to be returned is the value of DFS_NAMENODE_RPC_ADDRESS_KEY, + * will check to see if auxiliary ports are enabled. If so, call to replace + * address port with auxiliary port. If the address is not the value of + * DFS_NAMENODE_RPC_ADDRESS_KEY, return the original address. If failed to + * find any address, return the given default value. + * + * @param defaultValue the default value if no values found for given keys + * @param suffix suffix to append to keys + * @param conf the configuration + * @param keys a list of keys, ordered by preference + * @return + */ + private static String checkKeysAndProcess(String defaultValue, String suffix, + Configuration conf, String... keys) { + String succeededKey = null; + String address = null; + for (String key : keys) { + address = getConfValue(null, suffix, conf, key); + if (address != null) { + succeededKey = key; + break; + } + } + String ret; + if (address == null) { + ret = defaultValue; + } else if(DFS_NAMENODE_RPC_ADDRESS_KEY.equals(succeededKey)) { + ret = checkRpcAuxiliary(conf, suffix, address); + } else { + ret = address; + } + return ret; + } + + /** + * Check if auxiliary port is enabled, if yes, check if the given address + * should have its port replaced by an auxiliary port. If the given address + * does not contain a port, append the auxiliary port to enforce using it. + * + * @param conf configuration. + * @param address the address to check and modify (if needed). + * @return the new modified address containing auxiliary port, or original + * address if auxiliary port not enabled. + */ + private static String checkRpcAuxiliary(Configuration conf, String suffix, + String address) { + String key = DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY; + key = addSuffix(key, suffix); + int[] ports = conf.getInts(key); + if (ports == null || ports.length == 0) { + return address; + } + LOG.info("Using server auxiliary ports " + Arrays.toString(ports)); + URI uri; + try { + uri = new URI(address); + } catch (URISyntaxException e) { + // return the original address untouched if it is not a valid URI. This + // happens in unit test, as MiniDFSCluster sets the value to + // 127.0.0.1:0, without schema (i.e. "hdfs://"). While in practice, this + // should not be the case. So log a warning message here. + LOG.warn("NameNode address is not a valid uri:" + address); + return address; + } + // Ignore the port, only take the schema(e.g. hdfs) and host (e.g. + // localhost), then append port + // TODO : revisit if there is a better way + StringBuilder sb = new StringBuilder(); + sb.append(uri.getScheme()); + sb.append("://"); + sb.append(uri.getHost()); + sb.append(":"); + // TODO : currently, only the very first auxiliary port is being used. + // But actually NN supports running multiple auxiliary + sb.append(ports[0]); + return sb.toString(); + } + /** * Given a list of keys in the order of preference, returns a value * for the key in the given order from the configuration. diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java index b1ce78d304..9d20933705 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java @@ -68,6 +68,11 @@ public interface HdfsClientConfigKeys { String PREFIX = "dfs.client."; String DFS_NAMESERVICES = "dfs.nameservices"; String DFS_NAMENODE_RPC_ADDRESS_KEY = "dfs.namenode.rpc-address"; + + String DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_SUFFIX = "auxiliary-ports"; + String DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY = DFS_NAMENODE_RPC_ADDRESS_KEY + + "." + DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_SUFFIX; + int DFS_NAMENODE_HTTP_PORT_DEFAULT = 9870; String DFS_NAMENODE_HTTP_ADDRESS_KEY = "dfs.namenode.http-address"; int DFS_NAMENODE_HTTPS_PORT_DEFAULT = 9871; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 42709de092..439fbae67a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -1267,6 +1267,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final Class DFS_NET_TOPOLOGY_IMPL_DEFAULT = DFSNetworkTopology.class; + public static final String DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY = + HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY; + // dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry @Deprecated public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java index a8034da85e..9f82cbd3de 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java @@ -23,6 +23,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.HadoopIllegalArgumentException; @@ -1048,6 +1049,14 @@ public InetSocketAddress getNameNodeAddress() { return rpcServer.getRpcAddress(); } + /** + * @return The auxiliary nameNode RPC addresses, or empty set if there + * is none. + */ + public Set getAuxiliaryNameNodeAddresses() { + return rpcServer.getAuxiliaryRpcAddresses(); + } + /** * @return NameNode RPC address in "host:port" string form */ @@ -1055,6 +1064,27 @@ public String getNameNodeAddressHostPortString() { return NetUtils.getHostPortString(getNameNodeAddress()); } + /** + * Return a host:port format string corresponds to an auxiliary + * port configured on NameNode. If there are multiple auxiliary ports, + * an arbitrary one is returned. If there is no auxiliary listener, returns + * null. + * + * @return a string of format host:port that points to an auxiliary NameNode + * address, or null if there is no such address. + */ + @VisibleForTesting + public String getNNAuxiliaryRpcAddress() { + Set auxiliaryAddrs = getAuxiliaryNameNodeAddresses(); + if (auxiliaryAddrs.isEmpty()) { + return null; + } + // since set has no particular order, returning the first element of + // from the iterator is effectively arbitrary. + InetSocketAddress addr = auxiliaryAddrs.iterator().next(); + return NetUtils.getHostPortString(addr); + } + /** * @return NameNode service RPC address if configured, the * NameNode RPC address otherwise diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index ec5ce9d174..0bef4cce98 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -26,6 +26,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_RATIO_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY; import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_DEPTH; import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_LENGTH; import static org.apache.hadoop.util.Time.now; @@ -537,6 +538,13 @@ public NameNodeRpcServer(Configuration conf, NameNode nn) if (lifelineRpcServer != null) { lifelineRpcServer.setTracer(nn.tracer); } + int[] auxiliaryPorts = + conf.getInts(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY); + if (auxiliaryPorts != null && auxiliaryPorts.length != 0) { + for (int auxiliaryPort : auxiliaryPorts) { + this.clientRpcServer.addAuxiliaryListener(auxiliaryPort); + } + } } /** Allow access to the lifeline RPC server for testing */ @@ -606,10 +614,16 @@ InetSocketAddress getServiceRpcAddress() { return serviceRPCAddress; } - InetSocketAddress getRpcAddress() { + @VisibleForTesting + public InetSocketAddress getRpcAddress() { return clientRpcAddress; } + @VisibleForTesting + public Set getAuxiliaryRpcAddresses() { + return clientRpcServer.getAuxiliaryListenerAddresses(); + } + private static UserGroupInformation getRemoteUser() throws IOException { return NameNode.getRemoteUser(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index b894abb600..7c72af0a18 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -5220,4 +5220,15 @@ ensure that other waiters on the lock can get in. + + + dfs.namenode.rpc-address.auxiliary-ports + + + A comma separated list of auxiliary ports for the NameNode to listen on. + This allows exposing multiple NN addresses to clients. + Particularly, it is used to enforce different SASL levels on different ports. + Empty list indicates that auxiliary ports are disabled. + + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java index 29807ec4e0..a579c56912 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java @@ -1362,6 +1362,21 @@ public URI getURI(int nnIndex) { } return uri; } + + URI getURIForAuxiliaryPort(int nnIndex) { + String hostPort = + getNN(nnIndex).nameNode.getNNAuxiliaryRpcAddress(); + if (hostPort == null) { + throw new RuntimeException("No auxiliary port found"); + } + URI uri = null; + try { + uri = new URI("hdfs://" + hostPort); + } catch (URISyntaxException e) { + NameNode.LOG.warn("unexpected URISyntaxException", e); + } + return uri; + } public int getInstanceId() { return instanceId; @@ -1973,6 +1988,14 @@ public int getNameNodePort() { checkSingleNameNode(); return getNameNodePort(0); } + + /** + * Get the auxiliary port of NameNode, NameNode specified by index. + */ + public int getNameNodeAuxiliaryPort() { + checkSingleNameNode(); + return getNameNodeAuxiliaryPort(0); + } /** * Gets the rpc port used by the NameNode at the given index, because the @@ -1982,6 +2005,22 @@ public int getNameNodePort(int nnIndex) { return getNN(nnIndex).nameNode.getNameNodeAddress().getPort(); } + /** + * Gets the rpc port used by the NameNode at the given index, if the + * NameNode has multiple auxiliary ports configured, a arbitrary + * one is returned. + */ + public int getNameNodeAuxiliaryPort(int nnIndex) { + Set allAuxiliaryAddresses = + getNN(nnIndex).nameNode.getAuxiliaryNameNodeAddresses(); + if (allAuxiliaryAddresses.isEmpty()) { + return -1; + } else { + InetSocketAddress addr = allAuxiliaryAddresses.iterator().next(); + return addr.getPort(); + } + } + /** * @return the service rpc port used by the NameNode at the given index. */ @@ -2538,6 +2577,12 @@ public DistributedFileSystem getFileSystem() throws IOException { return getFileSystem(0); } + public DistributedFileSystem getFileSystemFromAuxiliaryPort() + throws IOException { + checkSingleNameNode(); + return getFileSystemFromAuxiliaryPort(0); + } + /** * Get a client handle to the DFS cluster for the namenode at given index. */ @@ -2546,6 +2591,12 @@ public DistributedFileSystem getFileSystem(int nnIndex) throws IOException { getNN(nnIndex).conf)); } + public DistributedFileSystem getFileSystemFromAuxiliaryPort(int nnIndex) + throws IOException { + return (DistributedFileSystem) addFileSystem(FileSystem.get( + getURIForAuxiliaryPort(nnIndex), getNN(nnIndex).conf)); + } + /** * Get another FileSystem instance that is different from FileSystem.get(conf). * This simulating different threads working on different FileSystem instances. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHAAuxiliaryPort.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHAAuxiliaryPort.java new file mode 100644 index 0000000000..867fbac30c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHAAuxiliaryPort.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import java.net.InetSocketAddress; +import java.net.URI; +import java.util.Set; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer; +import org.junit.Test; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMESERVICES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + + +/** + * Test NN auxiliary port with HA. + */ +public class TestHAAuxiliaryPort { + @Test + public void testTest() throws Exception { + Configuration conf = new Configuration(); + conf.set(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY, "0,0"); + conf.set(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY + ".ha-nn-uri-0.nn1", + "9000,9001"); + conf.set(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY + ".ha-nn-uri-0.nn2", + "9000,9001"); + conf.set(DFS_NAMESERVICES, "ha-nn-uri-0"); + conf.set(DFS_HA_NAMENODES_KEY_PREFIX + ".ha-nn-uri-0", "nn1,nn2"); + conf.setBoolean("fs.hdfs.impl.disable.cache", true); + + MiniDFSNNTopology topology = new MiniDFSNNTopology() + .addNameservice(new MiniDFSNNTopology.NSConf("ha-nn-uri-0") + .addNN(new MiniDFSNNTopology.NNConf("nn1")) + .addNN(new MiniDFSNNTopology.NNConf("nn2"))); + + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .nnTopology(topology) + .numDataNodes(0) + .build(); + cluster.transitionToActive(0); + cluster.waitActive(); + + NameNode nn0 = cluster.getNameNode(0); + NameNode nn1 = cluster.getNameNode(1); + + // all the addresses below are valid nn0 addresses + NameNodeRpcServer rpcServer0 = (NameNodeRpcServer)nn0.getRpcServer(); + InetSocketAddress server0RpcAddress = rpcServer0.getRpcAddress(); + Set auxAddrServer0 = + rpcServer0.getAuxiliaryRpcAddresses(); + assertEquals(2, auxAddrServer0.size()); + + // all the addresses below are valid nn1 addresses + NameNodeRpcServer rpcServer1 = (NameNodeRpcServer)nn1.getRpcServer(); + InetSocketAddress server1RpcAddress = rpcServer1.getRpcAddress(); + Set auxAddrServer1 = + rpcServer1.getAuxiliaryRpcAddresses(); + assertEquals(2, auxAddrServer1.size()); + + // mkdir on nn0 uri 0 + URI nn0URI = new URI("hdfs://localhost:" + + server0RpcAddress.getPort()); + try (DFSClient client0 = new DFSClient(nn0URI, conf)){ + client0.mkdirs("/test", null, true); + // should be available on other ports also + for (InetSocketAddress auxAddr : auxAddrServer0) { + nn0URI = new URI("hdfs://localhost:" + auxAddr.getPort()); + try (DFSClient clientTmp = new DFSClient(nn0URI, conf)) { + assertTrue(clientTmp.exists("/test")); + } + } + } + + // now perform a failover + cluster.shutdownNameNode(0); + cluster.transitionToActive(1); + + // then try to read the file from the nn1 + URI nn1URI = new URI("hdfs://localhost:" + + server1RpcAddress.getPort()); + try (DFSClient client1 = new DFSClient(nn1URI, conf)) { + assertTrue(client1.exists("/test")); + // should be available on other ports also + for (InetSocketAddress auxAddr : auxAddrServer1) { + nn1URI = new URI("hdfs://localhost:" + auxAddr.getPort()); + try (DFSClient clientTmp = new DFSClient(nn1URI, conf)) { + assertTrue(client1.exists("/test")); + } + } + } + } +}