HDFS-13566. Add configurable additional RPC listener to NameNode. Contributed by Chen Liang.

This commit is contained in:
Chen Liang 2018-10-23 14:53:45 -07:00
parent efdfe679d6
commit 635786a511
11 changed files with 468 additions and 16 deletions

View File

@ -366,6 +366,24 @@ public static InetAddress getRemoteIp() {
return (call != null ) ? call.getHostInetAddress() : null;
}
/**
* Returns the SASL qop for the current call, if the current call is
* set, and the SASL negotiation is done. Otherwise return null. Note
* that CurCall is thread local object. So in fact, different handler
* threads will process different CurCall object.
*
* Also, only return for RPC calls, not supported for other protocols.
* @return the QOP of the current connection.
*/
public static String getEstablishedQOP() {
Call call = CurCall.get();
if (call == null || !(call instanceof RpcCall)) {
return null;
}
RpcCall rpcCall = (RpcCall)call;
return rpcCall.connection.getEstablishedQOP();
}
/**
* Returns the clientId from the current RPC request
*/
@ -445,6 +463,10 @@ protected ResponseBuffer initialValue() {
// maintains the set of client connections and handles idle timeouts
private ConnectionManager connectionManager;
private Listener listener = null;
// Auxiliary listeners maintained as in a map, to allow
// arbitrary number of of auxiliary listeners. A map from
// the port to the listener binding to it.
private Map<Integer, Listener> auxiliaryListenerMap;
private Responder responder = null;
private Handler[] handlers = null;
@ -1028,11 +1050,12 @@ private class Listener extends Thread {
private Reader[] readers = null;
private int currentReader = 0;
private InetSocketAddress address; //the address we bind at
private int listenPort; //the port we bind at
private int backlogLength = conf.getInt(
CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_KEY,
CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_DEFAULT);
public Listener() throws IOException {
Listener(int port) throws IOException {
address = new InetSocketAddress(bindAddress, port);
// Create a new server socket and set to non blocking mode
acceptChannel = ServerSocketChannel.open();
@ -1040,7 +1063,10 @@ public Listener() throws IOException {
// Bind the server socket to the local host and port
bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);
port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
//Could be an ephemeral port
this.listenPort = acceptChannel.socket().getLocalPort();
Thread.currentThread().setName("Listener at " +
bindAddress + "/" + this.listenPort);
// create a selector;
selector= Selector.open();
readers = new Reader[readThreads];
@ -1223,7 +1249,7 @@ void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOf
channel.socket().setKeepAlive(true);
Reader reader = getReader();
Connection c = connectionManager.register(channel);
Connection c = connectionManager.register(channel, this.listenPort);
// If the connectionManager can't take it, close the connection.
if (c == null) {
if (channel.isOpen()) {
@ -1643,6 +1669,7 @@ public class Connection {
private ByteBuffer unwrappedDataLengthBuffer;
private int serviceClass;
private boolean shouldClose = false;
private int ingressPort;
UserGroupInformation user = null;
public UserGroupInformation attemptingUser = null; // user name before auth
@ -1654,7 +1681,8 @@ public class Connection {
private boolean sentNegotiate = false;
private boolean useWrap = false;
public Connection(SocketChannel channel, long lastContact) {
public Connection(SocketChannel channel, long lastContact,
int ingressPort) {
this.channel = channel;
this.lastContact = lastContact;
this.data = null;
@ -1666,6 +1694,7 @@ public Connection(SocketChannel channel, long lastContact) {
this.unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
this.socket = channel.socket();
this.addr = socket.getInetAddress();
this.ingressPort = ingressPort;
if (addr == null) {
this.hostAddress = "*Unknown*";
} else {
@ -1700,9 +1729,24 @@ public String getHostAddress() {
return hostAddress;
}
public int getIngressPort() {
return ingressPort;
}
public InetAddress getHostInetAddress() {
return addr;
}
public String getEstablishedQOP() {
// In practice, saslServer should not be null when this is
// called. If it is null, it must be either some
// configuration mistake or it is called from unit test.
if (saslServer == null) {
LOG.warn("SASL server should not be null!");
return null;
}
return (String)saslServer.getNegotiatedProperty(Sasl.QOP);
}
public void setLastContact(long lastContact) {
this.lastContact = lastContact;
@ -2175,7 +2219,7 @@ private RpcSaslProto buildSaslNegotiateResponse()
private SaslServer createSaslServer(AuthMethod authMethod)
throws IOException, InterruptedException {
final Map<String,?> saslProps =
saslPropsResolver.getServerProperties(addr);
saslPropsResolver.getServerProperties(addr, ingressPort);
return new SaslRpcServer(authMethod).create(this, saslProps, secretManager);
}
@ -2655,7 +2699,8 @@ private void internalQueueCall(Call call)
private class Handler extends Thread {
public Handler(int instanceNumber) {
this.setDaemon(true);
this.setName("IPC Server handler "+ instanceNumber + " on " + port);
this.setName("IPC Server handler "+ instanceNumber +
" on default port " + port);
}
@Override
@ -2773,6 +2818,7 @@ protected Server(String bindAddress, int port,
this.handlerCount = handlerCount;
this.socketSendBufferSize = 0;
this.serverName = serverName;
this.auxiliaryListenerMap = null;
this.maxDataLength = conf.getInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH,
CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
if (queueSizePerHandler != -1) {
@ -2812,8 +2858,9 @@ protected Server(String bindAddress, int port,
this.negotiateResponse = buildNegotiateResponse(enabledAuthMethods);
// Start the listener here and let it bind to the port
listener = new Listener();
this.port = listener.getAddress().getPort();
listener = new Listener(port);
// set the server port to the default listener port.
this.port = listener.getAddress().getPort();
connectionManager = new ConnectionManager();
this.rpcMetrics = RpcMetrics.create(this, conf);
this.rpcDetailedMetrics = RpcDetailedMetrics.create(this.port);
@ -2835,7 +2882,23 @@ protected Server(String bindAddress, int port,
this.exceptionsHandler.addTerseLoggingExceptions(StandbyException.class);
}
public synchronized void addAuxiliaryListener(int auxiliaryPort)
throws IOException {
if (auxiliaryListenerMap == null) {
auxiliaryListenerMap = new HashMap<>();
}
if (auxiliaryListenerMap.containsKey(auxiliaryPort) && auxiliaryPort != 0) {
throw new IOException(
"There is already a listener binding to: " + auxiliaryPort);
}
Listener newListener = new Listener(auxiliaryPort);
// in the case of port = 0, the listener would be on a != 0 port.
LOG.info("Adding a server listener on port " +
newListener.getAddress().getPort());
auxiliaryListenerMap.put(newListener.getAddress().getPort(), newListener);
}
private RpcSaslProto buildNegotiateResponse(List<AuthMethod> authMethods)
throws IOException {
RpcSaslProto.Builder negotiateBuilder = RpcSaslProto.newBuilder();
@ -3069,6 +3132,12 @@ public void setTracer(Tracer t) {
public synchronized void start() {
responder.start();
listener.start();
if (auxiliaryListenerMap != null && auxiliaryListenerMap.size() > 0) {
for (Listener newListener : auxiliaryListenerMap.values()) {
newListener.start();
}
}
handlers = new Handler[handlerCount];
for (int i = 0; i < handlerCount; i++) {
@ -3090,6 +3159,12 @@ public synchronized void stop() {
}
listener.interrupt();
listener.doStop();
if (auxiliaryListenerMap != null && auxiliaryListenerMap.size() > 0) {
for (Listener newListener : auxiliaryListenerMap.values()) {
newListener.interrupt();
newListener.doStop();
}
}
responder.interrupt();
notifyAll();
this.rpcMetrics.shutdown();
@ -3113,6 +3188,23 @@ public synchronized void join() throws InterruptedException {
public synchronized InetSocketAddress getListenerAddress() {
return listener.getAddress();
}
/**
* Return the set of all the configured auxiliary socket addresses NameNode
* RPC is listening on. If there are none, or it is not configured at all, an
* empty set is returned.
* @return the set of all the auxiliary addresses on which the
* RPC server is listening on.
*/
public synchronized Set<InetSocketAddress> getAuxiliaryListenerAddresses() {
Set<InetSocketAddress> allAddrs = new HashSet<>();
if (auxiliaryListenerMap != null && auxiliaryListenerMap.size() > 0) {
for (Listener auxListener : auxiliaryListenerMap.values()) {
allAddrs.add(auxListener.getAddress());
}
}
return allAddrs;
}
/**
* Called for each call.
@ -3417,11 +3509,11 @@ Connection[] toArray() {
return connections.toArray(new Connection[0]);
}
Connection register(SocketChannel channel) {
Connection register(SocketChannel channel, int ingressPort) {
if (isFull()) {
return null;
}
Connection connection = new Connection(channel, Time.now());
Connection connection = new Connection(channel, Time.now(), ingressPort);
add(connection);
if (LOG.isDebugEnabled()) {
LOG.debug("Server connection from " + connection +

View File

@ -102,7 +102,7 @@ public Map<String, String> getServerProperties(InetAddress clientAddress){
*/
public Map<String, String> getServerProperties(InetAddress clientAddress,
int ingressPort){
return properties;
return getServerProperties(clientAddress);
}
/**
@ -122,7 +122,7 @@ public Map<String, String> getClientProperties(InetAddress serverAddress){
*/
public Map<String, String> getClientProperties(InetAddress serverAddress,
int ingressPort) {
return properties;
return getClientProperties(serverAddress);
}
/**

View File

@ -49,8 +49,10 @@
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
@ -171,6 +173,11 @@ public TestServer(int handlerCount, boolean sleep) throws IOException {
this(handlerCount, sleep, LongWritable.class, null);
}
public TestServer(int port, int handlerCount, boolean sleep)
throws IOException {
this(port, handlerCount, sleep, LongWritable.class, null);
}
public TestServer(int handlerCount, boolean sleep, Configuration conf)
throws IOException {
this(handlerCount, sleep, LongWritable.class, null, conf);
@ -182,11 +189,24 @@ public TestServer(int handlerCount, boolean sleep,
this(handlerCount, sleep, paramClass, responseClass, conf);
}
public TestServer(int port, int handlerCount, boolean sleep,
Class<? extends Writable> paramClass,
Class<? extends Writable> responseClass) throws IOException {
this(port, handlerCount, sleep, paramClass, responseClass, conf);
}
public TestServer(int handlerCount, boolean sleep,
Class<? extends Writable> paramClass,
Class<? extends Writable> responseClass, Configuration conf)
throws IOException {
super(ADDRESS, 0, paramClass, handlerCount, conf);
this(0, handlerCount, sleep, paramClass, responseClass, conf);
}
public TestServer(int port, int handlerCount, boolean sleep,
Class<? extends Writable> paramClass,
Class<? extends Writable> responseClass, Configuration conf)
throws IOException {
super(ADDRESS, port, paramClass, handlerCount, conf);
this.sleep = sleep;
this.responseClass = responseClass;
}
@ -338,6 +358,37 @@ public void internalTestSerial(int handlerCount, boolean handlerSleep,
}
server.stop();
}
@Test
public void testAuxiliaryPorts() throws IOException, InterruptedException {
int defaultPort = 9000;
int[] auxiliaryPorts = {9001, 9002, 9003};
final int handlerCount = 5;
final boolean handlerSleep = false;
Server server = new TestServer(defaultPort, handlerCount, handlerSleep);
for (int port : auxiliaryPorts) {
server.addAuxiliaryListener(port);
}
Set<InetSocketAddress> listenerAddrs =
server.getAuxiliaryListenerAddresses();
Set<InetSocketAddress> addrs = new HashSet<>();
for (InetSocketAddress addr : listenerAddrs) {
addrs.add(NetUtils.getConnectAddress(addr));
}
server.start();
Client client = new Client(LongWritable.class, conf);
Set<SerialCaller> calls = new HashSet<>();
for (InetSocketAddress addr : addrs) {
calls.add(new SerialCaller(client, addr, 100));
}
for (SerialCaller caller : calls) {
caller.join();
assertFalse(caller.failed);
}
client.stop();
server.stop();
}
@Test(timeout=60000)
public void testStandAloneClient() throws IOException {

View File

@ -21,6 +21,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.primitives.SignedBytes;
import java.net.URISyntaxException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
@ -92,6 +93,8 @@
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMESERVICES;
@InterfaceAudience.Private
@ -432,7 +435,7 @@ public static Map<String, InetSocketAddress> getAddressesForNameserviceId(
Map<String, InetSocketAddress> ret = Maps.newLinkedHashMap();
for (String nnId : emptyAsSingletonNull(nnIds)) {
String suffix = concatSuffixes(nsId, nnId);
String address = getConfValue(defaultValue, suffix, conf, keys);
String address = checkKeysAndProcess(defaultValue, suffix, conf, keys);
if (address != null) {
InetSocketAddress isa = NetUtils.createSocketAddr(address);
if (isa.isUnresolved()) {
@ -446,6 +449,86 @@ public static Map<String, InetSocketAddress> getAddressesForNameserviceId(
return ret;
}
/**
* Return address from configuration. Take a list of keys as preference.
* If the address to be returned is the value of DFS_NAMENODE_RPC_ADDRESS_KEY,
* will check to see if auxiliary ports are enabled. If so, call to replace
* address port with auxiliary port. If the address is not the value of
* DFS_NAMENODE_RPC_ADDRESS_KEY, return the original address. If failed to
* find any address, return the given default value.
*
* @param defaultValue the default value if no values found for given keys
* @param suffix suffix to append to keys
* @param conf the configuration
* @param keys a list of keys, ordered by preference
* @return
*/
private static String checkKeysAndProcess(String defaultValue, String suffix,
Configuration conf, String... keys) {
String succeededKey = null;
String address = null;
for (String key : keys) {
address = getConfValue(null, suffix, conf, key);
if (address != null) {
succeededKey = key;
break;
}
}
String ret;
if (address == null) {
ret = defaultValue;
} else if(DFS_NAMENODE_RPC_ADDRESS_KEY.equals(succeededKey)) {
ret = checkRpcAuxiliary(conf, suffix, address);
} else {
ret = address;
}
return ret;
}
/**
* Check if auxiliary port is enabled, if yes, check if the given address
* should have its port replaced by an auxiliary port. If the given address
* does not contain a port, append the auxiliary port to enforce using it.
*
* @param conf configuration.
* @param address the address to check and modify (if needed).
* @return the new modified address containing auxiliary port, or original
* address if auxiliary port not enabled.
*/
private static String checkRpcAuxiliary(Configuration conf, String suffix,
String address) {
String key = DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
key = addSuffix(key, suffix);
int[] ports = conf.getInts(key);
if (ports == null || ports.length == 0) {
return address;
}
LOG.info("Using server auxiliary ports " + Arrays.toString(ports));
URI uri;
try {
uri = new URI(address);
} catch (URISyntaxException e) {
// return the original address untouched if it is not a valid URI. This
// happens in unit test, as MiniDFSCluster sets the value to
// 127.0.0.1:0, without schema (i.e. "hdfs://"). While in practice, this
// should not be the case. So log a warning message here.
LOG.warn("NameNode address is not a valid uri:" + address);
return address;
}
// Ignore the port, only take the schema(e.g. hdfs) and host (e.g.
// localhost), then append port
// TODO : revisit if there is a better way
StringBuilder sb = new StringBuilder();
sb.append(uri.getScheme());
sb.append("://");
sb.append(uri.getHost());
sb.append(":");
// TODO : currently, only the very first auxiliary port is being used.
// But actually NN supports running multiple auxiliary
sb.append(ports[0]);
return sb.toString();
}
/**
* Given a list of keys in the order of preference, returns a value
* for the key in the given order from the configuration.

View File

@ -68,6 +68,11 @@ public interface HdfsClientConfigKeys {
String PREFIX = "dfs.client.";
String DFS_NAMESERVICES = "dfs.nameservices";
String DFS_NAMENODE_RPC_ADDRESS_KEY = "dfs.namenode.rpc-address";
String DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_SUFFIX = "auxiliary-ports";
String DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY = DFS_NAMENODE_RPC_ADDRESS_KEY
+ "." + DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_SUFFIX;
int DFS_NAMENODE_HTTP_PORT_DEFAULT = 9870;
String DFS_NAMENODE_HTTP_ADDRESS_KEY = "dfs.namenode.http-address";
int DFS_NAMENODE_HTTPS_PORT_DEFAULT = 9871;

View File

@ -1267,6 +1267,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final Class<DFSNetworkTopology> DFS_NET_TOPOLOGY_IMPL_DEFAULT =
DFSNetworkTopology.class;
public static final String DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY =
HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
// dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry
@Deprecated
public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY

View File

@ -23,6 +23,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
@ -1048,6 +1049,14 @@ public InetSocketAddress getNameNodeAddress() {
return rpcServer.getRpcAddress();
}
/**
* @return The auxiliary nameNode RPC addresses, or empty set if there
* is none.
*/
public Set<InetSocketAddress> getAuxiliaryNameNodeAddresses() {
return rpcServer.getAuxiliaryRpcAddresses();
}
/**
* @return NameNode RPC address in "host:port" string form
*/
@ -1055,6 +1064,27 @@ public String getNameNodeAddressHostPortString() {
return NetUtils.getHostPortString(getNameNodeAddress());
}
/**
* Return a host:port format string corresponds to an auxiliary
* port configured on NameNode. If there are multiple auxiliary ports,
* an arbitrary one is returned. If there is no auxiliary listener, returns
* null.
*
* @return a string of format host:port that points to an auxiliary NameNode
* address, or null if there is no such address.
*/
@VisibleForTesting
public String getNNAuxiliaryRpcAddress() {
Set<InetSocketAddress> auxiliaryAddrs = getAuxiliaryNameNodeAddresses();
if (auxiliaryAddrs.isEmpty()) {
return null;
}
// since set has no particular order, returning the first element of
// from the iterator is effectively arbitrary.
InetSocketAddress addr = auxiliaryAddrs.iterator().next();
return NetUtils.getHostPortString(addr);
}
/**
* @return NameNode service RPC address if configured, the
* NameNode RPC address otherwise

View File

@ -26,6 +26,7 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_RATIO_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_DEPTH;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_LENGTH;
import static org.apache.hadoop.util.Time.now;
@ -537,6 +538,13 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
if (lifelineRpcServer != null) {
lifelineRpcServer.setTracer(nn.tracer);
}
int[] auxiliaryPorts =
conf.getInts(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY);
if (auxiliaryPorts != null && auxiliaryPorts.length != 0) {
for (int auxiliaryPort : auxiliaryPorts) {
this.clientRpcServer.addAuxiliaryListener(auxiliaryPort);
}
}
}
/** Allow access to the lifeline RPC server for testing */
@ -606,10 +614,16 @@ InetSocketAddress getServiceRpcAddress() {
return serviceRPCAddress;
}
InetSocketAddress getRpcAddress() {
@VisibleForTesting
public InetSocketAddress getRpcAddress() {
return clientRpcAddress;
}
@VisibleForTesting
public Set<InetSocketAddress> getAuxiliaryRpcAddresses() {
return clientRpcServer.getAuxiliaryListenerAddresses();
}
private static UserGroupInformation getRemoteUser() throws IOException {
return NameNode.getRemoteUser();
}

View File

@ -5220,4 +5220,15 @@
ensure that other waiters on the lock can get in.
</description>
</property>
<property>
<name>dfs.namenode.rpc-address.auxiliary-ports</name>
<value></value>
<description>
A comma separated list of auxiliary ports for the NameNode to listen on.
This allows exposing multiple NN addresses to clients.
Particularly, it is used to enforce different SASL levels on different ports.
Empty list indicates that auxiliary ports are disabled.
</description>
</property>
</configuration>

View File

@ -1362,6 +1362,21 @@ public URI getURI(int nnIndex) {
}
return uri;
}
URI getURIForAuxiliaryPort(int nnIndex) {
String hostPort =
getNN(nnIndex).nameNode.getNNAuxiliaryRpcAddress();
if (hostPort == null) {
throw new RuntimeException("No auxiliary port found");
}
URI uri = null;
try {
uri = new URI("hdfs://" + hostPort);
} catch (URISyntaxException e) {
NameNode.LOG.warn("unexpected URISyntaxException", e);
}
return uri;
}
public int getInstanceId() {
return instanceId;
@ -1973,6 +1988,14 @@ public int getNameNodePort() {
checkSingleNameNode();
return getNameNodePort(0);
}
/**
* Get the auxiliary port of NameNode, NameNode specified by index.
*/
public int getNameNodeAuxiliaryPort() {
checkSingleNameNode();
return getNameNodeAuxiliaryPort(0);
}
/**
* Gets the rpc port used by the NameNode at the given index, because the
@ -1982,6 +2005,22 @@ public int getNameNodePort(int nnIndex) {
return getNN(nnIndex).nameNode.getNameNodeAddress().getPort();
}
/**
* Gets the rpc port used by the NameNode at the given index, if the
* NameNode has multiple auxiliary ports configured, a arbitrary
* one is returned.
*/
public int getNameNodeAuxiliaryPort(int nnIndex) {
Set<InetSocketAddress> allAuxiliaryAddresses =
getNN(nnIndex).nameNode.getAuxiliaryNameNodeAddresses();
if (allAuxiliaryAddresses.isEmpty()) {
return -1;
} else {
InetSocketAddress addr = allAuxiliaryAddresses.iterator().next();
return addr.getPort();
}
}
/**
* @return the service rpc port used by the NameNode at the given index.
*/
@ -2538,6 +2577,12 @@ public DistributedFileSystem getFileSystem() throws IOException {
return getFileSystem(0);
}
public DistributedFileSystem getFileSystemFromAuxiliaryPort()
throws IOException {
checkSingleNameNode();
return getFileSystemFromAuxiliaryPort(0);
}
/**
* Get a client handle to the DFS cluster for the namenode at given index.
*/
@ -2546,6 +2591,12 @@ public DistributedFileSystem getFileSystem(int nnIndex) throws IOException {
getNN(nnIndex).conf));
}
public DistributedFileSystem getFileSystemFromAuxiliaryPort(int nnIndex)
throws IOException {
return (DistributedFileSystem) addFileSystem(FileSystem.get(
getURIForAuxiliaryPort(nnIndex), getNN(nnIndex).conf));
}
/**
* Get another FileSystem instance that is different from FileSystem.get(conf).
* This simulating different threads working on different FileSystem instances.

View File

@ -0,0 +1,112 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer;
import org.junit.Test;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMESERVICES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Test NN auxiliary port with HA.
*/
public class TestHAAuxiliaryPort {
@Test
public void testTest() throws Exception {
Configuration conf = new Configuration();
conf.set(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY, "0,0");
conf.set(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY + ".ha-nn-uri-0.nn1",
"9000,9001");
conf.set(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY + ".ha-nn-uri-0.nn2",
"9000,9001");
conf.set(DFS_NAMESERVICES, "ha-nn-uri-0");
conf.set(DFS_HA_NAMENODES_KEY_PREFIX + ".ha-nn-uri-0", "nn1,nn2");
conf.setBoolean("fs.hdfs.impl.disable.cache", true);
MiniDFSNNTopology topology = new MiniDFSNNTopology()
.addNameservice(new MiniDFSNNTopology.NSConf("ha-nn-uri-0")
.addNN(new MiniDFSNNTopology.NNConf("nn1"))
.addNN(new MiniDFSNNTopology.NNConf("nn2")));
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(topology)
.numDataNodes(0)
.build();
cluster.transitionToActive(0);
cluster.waitActive();
NameNode nn0 = cluster.getNameNode(0);
NameNode nn1 = cluster.getNameNode(1);
// all the addresses below are valid nn0 addresses
NameNodeRpcServer rpcServer0 = (NameNodeRpcServer)nn0.getRpcServer();
InetSocketAddress server0RpcAddress = rpcServer0.getRpcAddress();
Set<InetSocketAddress> auxAddrServer0 =
rpcServer0.getAuxiliaryRpcAddresses();
assertEquals(2, auxAddrServer0.size());
// all the addresses below are valid nn1 addresses
NameNodeRpcServer rpcServer1 = (NameNodeRpcServer)nn1.getRpcServer();
InetSocketAddress server1RpcAddress = rpcServer1.getRpcAddress();
Set<InetSocketAddress> auxAddrServer1 =
rpcServer1.getAuxiliaryRpcAddresses();
assertEquals(2, auxAddrServer1.size());
// mkdir on nn0 uri 0
URI nn0URI = new URI("hdfs://localhost:" +
server0RpcAddress.getPort());
try (DFSClient client0 = new DFSClient(nn0URI, conf)){
client0.mkdirs("/test", null, true);
// should be available on other ports also
for (InetSocketAddress auxAddr : auxAddrServer0) {
nn0URI = new URI("hdfs://localhost:" + auxAddr.getPort());
try (DFSClient clientTmp = new DFSClient(nn0URI, conf)) {
assertTrue(clientTmp.exists("/test"));
}
}
}
// now perform a failover
cluster.shutdownNameNode(0);
cluster.transitionToActive(1);
// then try to read the file from the nn1
URI nn1URI = new URI("hdfs://localhost:" +
server1RpcAddress.getPort());
try (DFSClient client1 = new DFSClient(nn1URI, conf)) {
assertTrue(client1.exists("/test"));
// should be available on other ports also
for (InetSocketAddress auxAddr : auxAddrServer1) {
nn1URI = new URI("hdfs://localhost:" + auxAddr.getPort());
try (DFSClient clientTmp = new DFSClient(nn1URI, conf)) {
assertTrue(client1.exists("/test"));
}
}
}
}
}