HDFS-13782. ObserverReadProxyProvider should work with IPFailoverProxyProvider. Contributed by Konstantin Shvachko.

This commit is contained in:
Konstantin V Shvachko 2018-08-25 17:32:30 -07:00
parent 25d8e39b5c
commit f9fc01cd7f
2 changed files with 108 additions and 37 deletions

View File

@ -17,30 +17,30 @@
*/ */
package org.apache.hadoop.hdfs.server.namenode.ha; package org.apache.hadoop.hdfs.server.namenode.ha;
import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.InvocationHandler; import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.lang.reflect.Proxy; import java.lang.reflect.Proxy;
import java.net.URI; import java.net.URI;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hdfs.ClientGSIContext; import org.apache.hadoop.hdfs.ClientGSIContext;
import org.apache.hadoop.hdfs.NameNodeProxiesClient;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.ipc.AlignmentContext; import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.ipc.StandbyException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
/** /**
* A {@link org.apache.hadoop.io.retry.FailoverProxyProvider} implementation * A {@link org.apache.hadoop.io.retry.FailoverProxyProvider} implementation
* that supports reading from observer namenode(s). * that supports reading from observer namenode(s).
@ -55,16 +55,20 @@
* observer is turned off. * observer is turned off.
*/ */
public class ObserverReadProxyProvider<T extends ClientProtocol> public class ObserverReadProxyProvider<T extends ClientProtocol>
extends ConfiguredFailoverProxyProvider<T> { extends AbstractNNFailoverProxyProvider<T> {
private static final Logger LOG = LoggerFactory.getLogger( private static final Logger LOG = LoggerFactory.getLogger(
ObserverReadProxyProvider.class); ObserverReadProxyProvider.class);
/** Client-side context for syncing with the NameNode server side */ /** Client-side context for syncing with the NameNode server side */
private AlignmentContext alignmentContext; private AlignmentContext alignmentContext;
private AbstractNNFailoverProxyProvider<T> failoverProxy;
/** All NameNode proxies */
private List<NNProxyInfo<T>> nameNodeProxies =
new ArrayList<NNProxyInfo<T>>();
/** Proxies for the observer namenodes */ /** Proxies for the observer namenodes */
private final List<AddressRpcProxyPair<T>> observerProxies = private final List<NNProxyInfo<T>> observerProxies =
new ArrayList<>(); new ArrayList<NNProxyInfo<T>>();
/** /**
* Whether reading from observer is enabled. If this is false, all read * Whether reading from observer is enabled. If this is false, all read
@ -81,36 +85,43 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
/** The last proxy that has been used. Only used for testing */ /** The last proxy that has been used. Only used for testing */
private volatile ProxyInfo<T> lastProxy = null; private volatile ProxyInfo<T> lastProxy = null;
@SuppressWarnings("unchecked") /**
* By default ObserverReadProxyProvider uses
* {@link ConfiguredFailoverProxyProvider} for failover.
*/
public ObserverReadProxyProvider( public ObserverReadProxyProvider(
Configuration conf, URI uri, Class<T> xface, HAProxyFactory<T> factory) Configuration conf, URI uri, Class<T> xface, HAProxyFactory<T> factory)
throws IOException { throws IOException {
this(conf, uri, xface, factory,
new ConfiguredFailoverProxyProvider<T>(conf, uri, xface,factory));
}
public ObserverReadProxyProvider(
Configuration conf, URI uri, Class<T> xface, HAProxyFactory<T> factory,
AbstractNNFailoverProxyProvider<T> failoverProxy)
throws IOException {
super(conf, uri, xface, factory); super(conf, uri, xface, factory);
alignmentContext = new ClientGSIContext(); this.failoverProxy = failoverProxy;
this.alignmentContext = new ClientGSIContext();
((ClientHAProxyFactory<T>) factory).setAlignmentContext(alignmentContext); ((ClientHAProxyFactory<T>) factory).setAlignmentContext(alignmentContext);
// Get all NameNode proxies
nameNodeProxies = getProxyAddresses(uri,
HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
// Find out all the observer proxies // Find out all the observer proxies
for (AddressRpcProxyPair<T> ap : this.proxies) { for (NNProxyInfo<T> pi : nameNodeProxies) {
ap.namenode = (T) NameNodeProxiesClient.createProxyWithAlignmentContext( createProxyIfNeeded(pi);
ap.address, conf, ugi, false, getFallbackToSimpleAuth(), if (isObserverState(pi)) {
alignmentContext); observerProxies.add(pi);
if (isObserverState(ap)) {
observerProxies.add(ap);
} }
} }
// TODO: No observers is not an error
// Just direct all reads to the active NameNode
if (observerProxies.isEmpty()) { if (observerProxies.isEmpty()) {
throw new RuntimeException("Couldn't find any namenode proxy in " + throw new RuntimeException("Couldn't find any namenode proxy in " +
"OBSERVER state"); "OBSERVER state");
} }
// Randomize the list to prevent all clients pointing to the same one
boolean randomized = conf.getBoolean(
HdfsClientConfigKeys.Failover.RANDOM_ORDER,
HdfsClientConfigKeys.Failover.RANDOM_ORDER_DEFAULT);
if (randomized) {
Collections.shuffle(observerProxies);
}
} }
public synchronized AlignmentContext getAlignmentContext() { public synchronized AlignmentContext getAlignmentContext() {
@ -121,17 +132,13 @@ public synchronized AlignmentContext getAlignmentContext() {
@Override @Override
public synchronized ProxyInfo<T> getProxy() { public synchronized ProxyInfo<T> getProxy() {
// We just create a wrapped proxy containing all the proxies // We just create a wrapped proxy containing all the proxies
List<ProxyInfo<T>> observerProxies = new ArrayList<>();
StringBuilder combinedInfo = new StringBuilder("["); StringBuilder combinedInfo = new StringBuilder("[");
for (int i = 0; i < this.observerProxies.size(); i++) { for (int i = 0; i < this.observerProxies.size(); i++) {
if (i > 0) { if (i > 0) {
combinedInfo.append(","); combinedInfo.append(",");
} }
AddressRpcProxyPair<T> p = this.observerProxies.get(i); combinedInfo.append(observerProxies.get(i).proxyInfo);
ProxyInfo<T> pInfo = getProxy(p);
observerProxies.add(pInfo);
combinedInfo.append(pInfo.proxyInfo);
} }
combinedInfo.append(']'); combinedInfo.append(']');
@ -142,6 +149,11 @@ public synchronized ProxyInfo<T> getProxy() {
return new ProxyInfo<>(wrappedProxy, combinedInfo.toString()); return new ProxyInfo<>(wrappedProxy, combinedInfo.toString());
} }
/**
 * Hands the failover request over to the wrapped failover proxy provider.
 *
 * @param currentProxy the proxy passed through unchanged to the delegate
 */
@Override
public void performFailover(T currentProxy) {
  this.failoverProxy.performFailover(currentProxy);
}
/** /**
* Check if a method is read-only. * Check if a method is read-only.
* *
@ -170,14 +182,14 @@ ProxyInfo<T> getLastProxy() {
return lastProxy; return lastProxy;
} }
boolean isObserverState(AddressRpcProxyPair<T> ap) { boolean isObserverState(NNProxyInfo<T> pi) {
// TODO: should introduce new ClientProtocol method to verify the // TODO: should introduce new ClientProtocol method to verify the
// underlying service state, which does not require superuser access // underlying service state, which does not require superuser access
// This is a workaround // This is a workaround
IOException ioe = null; IOException ioe = null;
try { try {
// Verify write access first // Verify write access first
ap.namenode.reportBadBlocks(new LocatedBlock[0]); pi.proxy.reportBadBlocks(new LocatedBlock[0]);
return false; // Only active NameNode allows write return false; // Only active NameNode allows write
} catch (RemoteException re) { } catch (RemoteException re) {
IOException sbe = re.unwrapRemoteException(StandbyException.class); IOException sbe = re.unwrapRemoteException(StandbyException.class);
@ -188,14 +200,14 @@ boolean isObserverState(AddressRpcProxyPair<T> ap) {
ioe = e; ioe = e;
} }
if (ioe != null) { if (ioe != null) {
LOG.error("Failed to connect to {}", ap.address, ioe); LOG.error("Failed to connect to {}", pi.getAddress(), ioe);
return false; return false;
} }
// Verify read access // Verify read access
// For now we assume only Observer nodes allow reads // For now we assume only Observer nodes allow reads
// Stale reads on StandbyNode should be turned off // Stale reads on StandbyNode should be turned off
try { try {
ap.namenode.checkAccess("/", FsAction.READ); pi.proxy.checkAccess("/", FsAction.READ);
return true; return true;
} catch (RemoteException re) { } catch (RemoteException re) {
IOException sbe = re.unwrapRemoteException(StandbyException.class); IOException sbe = re.unwrapRemoteException(StandbyException.class);
@ -206,19 +218,19 @@ boolean isObserverState(AddressRpcProxyPair<T> ap) {
ioe = e; ioe = e;
} }
if (ioe != null) { if (ioe != null) {
LOG.error("Failed to connect to {}", ap.address, ioe); LOG.error("Failed to connect to {}", pi.getAddress(), ioe);
} }
return false; return false;
} }
class ObserverReadInvocationHandler implements InvocationHandler { class ObserverReadInvocationHandler implements InvocationHandler {
final List<ProxyInfo<T>> observerProxies; final List<NNProxyInfo<T>> observerProxies;
final ProxyInfo<T> activeProxy; final ProxyInfo<T> activeProxy;
ObserverReadInvocationHandler(List<ProxyInfo<T>> observerProxies) { ObserverReadInvocationHandler(List<NNProxyInfo<T>> observerProxies) {
this.observerProxies = observerProxies; this.observerProxies = observerProxies;
this.activeProxy = ObserverReadProxyProvider.super.getProxy(); this.activeProxy = failoverProxy.getProxy();
} }
/** /**
@ -238,7 +250,7 @@ public Object invoke(Object proxy, final Method method, final Object[] args)
if (observerReadEnabled && isRead(method)) { if (observerReadEnabled && isRead(method)) {
// Loop through all the proxies, starting from the current index. // Loop through all the proxies, starting from the current index.
for (int i = 0; i < observerProxies.size(); i++) { for (int i = 0; i < observerProxies.size(); i++) {
ProxyInfo<T> current = observerProxies.get(currentIndex.get()); NNProxyInfo<T> current = observerProxies.get(currentIndex.get());
try { try {
retVal = method.invoke(current.proxy, args); retVal = method.invoke(current.proxy, args);
lastProxy = current; lastProxy = current;
@ -269,4 +281,23 @@ public Object invoke(Object proxy, final Method method, final Object[] args)
return retVal; return retVal;
} }
} }
/**
 * Closes the wrapped failover proxy provider and every NameNode proxy held
 * in {@code nameNodeProxies}.
 *
 * <p>Every resource is attempted even when an earlier one fails with an
 * {@link IOException}, so one failing close cannot leave the remaining
 * proxies open. The first {@link IOException} is rethrown after all
 * resources have been attempted; subsequent ones are attached to it as
 * suppressed exceptions.
 *
 * @throws IOException if the failover provider or any proxy fails to close
 */
@Override
public synchronized void close() throws IOException {
  IOException firstFailure = null;
  try {
    failoverProxy.close();
  } catch (IOException e) {
    // Remember the failure but keep going so the proxies below still close.
    firstFailure = e;
  }
  for (ProxyInfo<T> pi : nameNodeProxies) {
    if (pi.proxy == null) {
      // No proxy object was ever attached to this entry; nothing to release.
      continue;
    }
    try {
      if (pi.proxy instanceof Closeable) {
        ((Closeable) pi.proxy).close();
      } else {
        RPC.stopProxy(pi.proxy);
      }
    } catch (IOException e) {
      if (firstFailure == null) {
        firstFailure = e;
      } else {
        firstFailure.addSuppressed(e);
      }
    }
  }
  if (firstFailure != null) {
    throw firstFailure;
  }
}
/**
 * Answers with whatever the wrapped failover proxy provider reports for
 * its own {@code useLogicalURI()}.
 *
 * @return the delegate's {@code useLogicalURI()} result
 */
@Override
public boolean useLogicalURI() {
  final boolean logical = failoverProxy.useLogicalURI();
  return logical;
}
} }

View File

@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
/**
 * An {@link ObserverReadProxyProvider} variant whose failover is delegated
 * to an {@link IPFailoverProxyProvider}.
 *
 * @param <T> the client protocol type handled by the proxies
 */
public class ObserverReadProxyProviderWithIPFailover<T extends ClientProtocol>
    extends ObserverReadProxyProvider<T> {

  /**
   * Builds the provider by passing a newly created
   * {@link IPFailoverProxyProvider}, constructed from the same four
   * arguments, to the superclass constructor as the failover delegate.
   *
   * @param conf configuration forwarded to the superclass and the delegate
   * @param uri URI forwarded to the superclass and the delegate
   * @param xface protocol interface class forwarded to both
   * @param factory proxy factory forwarded to both
   * @throws IOException if the superclass or delegate constructor throws it
   */
  public ObserverReadProxyProviderWithIPFailover(
      Configuration conf,
      URI uri,
      Class<T> xface,
      HAProxyFactory<T> factory) throws IOException {
    super(conf, uri, xface, factory,
        new IPFailoverProxyProvider<T>(conf, uri, xface, factory));
  }
}