HDFS-6648. Order of namenodes in ConfiguredFailoverProxyProvider is undefined. Contributed by Inigo Goiri

This commit is contained in:
Chris Douglas 2017-03-20 17:15:13 -07:00
parent 8e15e24059
commit b104f3a282
3 changed files with 15 additions and 4 deletions

View File

@ -366,7 +366,7 @@ static Map<String, Map<String, InetSocketAddress>> getAddressesForNsIds(
static Map<String, InetSocketAddress> getAddressesForNameserviceId( static Map<String, InetSocketAddress> getAddressesForNameserviceId(
Configuration conf, String nsId, String defaultValue, String... keys) { Configuration conf, String nsId, String defaultValue, String... keys) {
Collection<String> nnIds = getNameNodeIds(conf, nsId); Collection<String> nnIds = getNameNodeIds(conf, nsId);
Map<String, InetSocketAddress> ret = Maps.newHashMap(); Map<String, InetSocketAddress> ret = Maps.newLinkedHashMap();
for (String nnId : emptyAsSingletonNull(nnIds)) { for (String nnId : emptyAsSingletonNull(nnIds)) {
String suffix = concatSuffixes(nsId, nnId); String suffix = concatSuffixes(nsId, nnId);
String address = getConfValue(defaultValue, suffix, conf, keys); String address = getConfValue(defaultValue, suffix, conf, keys);

View File

@ -264,6 +264,8 @@ interface Failover {
String CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY = String CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY =
PREFIX + "connection.retries.on.timeouts"; PREFIX + "connection.retries.on.timeouts";
int CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT = 0; int CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT = 0;
String RANDOM_ORDER = PREFIX + "random.order";
boolean RANDOM_ORDER_DEFAULT = false;
} }
/** dfs.client.write configuration properties */ /** dfs.client.write configuration properties */

View File

@ -23,6 +23,7 @@
import java.net.URI; import java.net.URI;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -43,9 +44,10 @@
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
/** /**
* A FailoverProxyProvider implementation which allows one to configure two URIs * A FailoverProxyProvider implementation which allows one to configure
* to connect to during fail-over. The first configured address is tried first, * multiple URIs to connect to during fail-over. A random configured address is
* and on a fail-over event the other address is tried. * tried first, and on a fail-over event the other addresses are tried
* sequentially in a random order.
*/ */
public class ConfiguredFailoverProxyProvider<T> extends public class ConfiguredFailoverProxyProvider<T> extends
AbstractNNFailoverProxyProvider<T> { AbstractNNFailoverProxyProvider<T> {
@ -124,6 +126,13 @@ public ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
for (InetSocketAddress address : addressesOfNns) { for (InetSocketAddress address : addressesOfNns) {
proxies.add(new AddressRpcProxyPair<T>(address)); proxies.add(new AddressRpcProxyPair<T>(address));
} }
// Randomize the list to prevent all clients pointing to the same one
boolean randomized = conf.getBoolean(
HdfsClientConfigKeys.Failover.RANDOM_ORDER,
HdfsClientConfigKeys.Failover.RANDOM_ORDER_DEFAULT);
if (randomized) {
Collections.shuffle(proxies);
}
// The client may have a delegation token set for the logical // The client may have a delegation token set for the logical
// URI of the cluster. Clone this token to apply to each of the // URI of the cluster. Clone this token to apply to each of the