diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java index 754fea47d9..dcae2db32d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java @@ -17,30 +17,30 @@ */ package org.apache.hadoop.hdfs.server.namenode.ha; +import java.io.Closeable; import java.io.IOException; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.net.URI; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.hdfs.ClientGSIContext; -import org.apache.hadoop.hdfs.NameNodeProxiesClient; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; - -import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.ipc.AlignmentContext; +import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.StandbyException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; + /** * A {@link org.apache.hadoop.io.retry.FailoverProxyProvider} implementation * that supports reading from observer namenode(s). @@ -55,16 +55,20 @@ * observer is turned off. */ public class ObserverReadProxyProvider - extends ConfiguredFailoverProxyProvider { + extends AbstractNNFailoverProxyProvider { private static final Logger LOG = LoggerFactory.getLogger( ObserverReadProxyProvider.class); /** Client-side context for syncing with the NameNode server side */ private AlignmentContext alignmentContext; + private AbstractNNFailoverProxyProvider failoverProxy; + /** All NameNdoe proxies */ + private List> nameNodeProxies = + new ArrayList>(); /** Proxies for the observer namenodes */ - private final List> observerProxies = - new ArrayList<>(); + private final List> observerProxies = + new ArrayList>(); /** * Whether reading from observer is enabled. If this is false, all read @@ -81,36 +85,43 @@ public class ObserverReadProxyProvider /** The last proxy that has been used. Only used for testing */ private volatile ProxyInfo lastProxy = null; - @SuppressWarnings("unchecked") + /** + * By default ObserverReadProxyProvider uses + * {@link ConfiguredFailoverProxyProvider} for failover. + */ public ObserverReadProxyProvider( Configuration conf, URI uri, Class xface, HAProxyFactory factory) throws IOException { + this(conf, uri, xface, factory, + new ConfiguredFailoverProxyProvider(conf, uri, xface,factory)); + } + + public ObserverReadProxyProvider( + Configuration conf, URI uri, Class xface, HAProxyFactory factory, + AbstractNNFailoverProxyProvider failoverProxy) + throws IOException { super(conf, uri, xface, factory); - alignmentContext = new ClientGSIContext(); + this.failoverProxy = failoverProxy; + this.alignmentContext = new ClientGSIContext(); ((ClientHAProxyFactory) factory).setAlignmentContext(alignmentContext); + // Get all NameNode proxies + nameNodeProxies = getProxyAddresses(uri, + HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY); // Find out all the observer proxies - for (AddressRpcProxyPair ap : this.proxies) { - ap.namenode = (T) NameNodeProxiesClient.createProxyWithAlignmentContext( - ap.address, conf, ugi, false, getFallbackToSimpleAuth(), - alignmentContext); - if (isObserverState(ap)) { - observerProxies.add(ap); + for (NNProxyInfo pi : nameNodeProxies) { + createProxyIfNeeded(pi); + if (isObserverState(pi)) { + observerProxies.add(pi); } } + // TODO: No observers is not an error + // Just direct all reads go to the active NameNode if (observerProxies.isEmpty()) { throw new RuntimeException("Couldn't find any namenode proxy in " + "OBSERVER state"); } - - // Randomize the list to prevent all clients pointing to the same one - boolean randomized = conf.getBoolean( - HdfsClientConfigKeys.Failover.RANDOM_ORDER, - HdfsClientConfigKeys.Failover.RANDOM_ORDER_DEFAULT); - if (randomized) { - Collections.shuffle(observerProxies); - } } public synchronized AlignmentContext getAlignmentContext() { @@ -121,17 +132,13 @@ public synchronized AlignmentContext getAlignmentContext() { @Override public synchronized ProxyInfo getProxy() { // We just create a wrapped proxy containing all the proxies - List> observerProxies = new ArrayList<>(); StringBuilder combinedInfo = new StringBuilder("["); for (int i = 0; i < this.observerProxies.size(); i++) { if (i > 0) { combinedInfo.append(","); } - AddressRpcProxyPair p = this.observerProxies.get(i); - ProxyInfo pInfo = getProxy(p); - observerProxies.add(pInfo); - combinedInfo.append(pInfo.proxyInfo); + combinedInfo.append(observerProxies.get(i).proxyInfo); } combinedInfo.append(']'); @@ -142,6 +149,11 @@ public synchronized ProxyInfo getProxy() { return new ProxyInfo<>(wrappedProxy, combinedInfo.toString()); } + @Override + public void performFailover(T currentProxy) { + failoverProxy.performFailover(currentProxy); + } + /** * Check if a method is read-only. * @@ -170,14 +182,14 @@ ProxyInfo getLastProxy() { return lastProxy; } - boolean isObserverState(AddressRpcProxyPair ap) { + boolean isObserverState(NNProxyInfo pi) { // TODO: should introduce new ClientProtocol method to verify the // underlying service state, which does not require superuser access // The is a workaround IOException ioe = null; try { // Verify write access first - ap.namenode.reportBadBlocks(new LocatedBlock[0]); + pi.proxy.reportBadBlocks(new LocatedBlock[0]); return false; // Only active NameNode allows write } catch (RemoteException re) { IOException sbe = re.unwrapRemoteException(StandbyException.class); @@ -188,14 +200,14 @@ boolean isObserverState(AddressRpcProxyPair ap) { ioe = e; } if (ioe != null) { - LOG.error("Failed to connect to {}", ap.address, ioe); + LOG.error("Failed to connect to {}", pi.getAddress(), ioe); return false; } // Verify read access // For now we assume only Observer nodes allow reads // Stale reads on StandbyNode should be turned off try { - ap.namenode.checkAccess("/", FsAction.READ); + pi.proxy.checkAccess("/", FsAction.READ); return true; } catch (RemoteException re) { IOException sbe = re.unwrapRemoteException(StandbyException.class); @@ -206,19 +218,19 @@ boolean isObserverState(AddressRpcProxyPair ap) { ioe = e; } if (ioe != null) { - LOG.error("Failed to connect to {}", ap.address, ioe); + LOG.error("Failed to connect to {}", pi.getAddress(), ioe); } return false; } class ObserverReadInvocationHandler implements InvocationHandler { - final List> observerProxies; + final List> observerProxies; final ProxyInfo activeProxy; - ObserverReadInvocationHandler(List> observerProxies) { + ObserverReadInvocationHandler(List> observerProxies) { this.observerProxies = observerProxies; - this.activeProxy = ObserverReadProxyProvider.super.getProxy(); + this.activeProxy = failoverProxy.getProxy(); } /** @@ -238,7 +250,7 @@ public Object invoke(Object proxy, final Method method, final Object[] args) if (observerReadEnabled && isRead(method)) { // Loop through all the proxies, starting from the current index. for (int i = 0; i < observerProxies.size(); i++) { - ProxyInfo current = observerProxies.get(currentIndex.get()); + NNProxyInfo current = observerProxies.get(currentIndex.get()); try { retVal = method.invoke(current.proxy, args); lastProxy = current; @@ -269,4 +281,23 @@ public Object invoke(Object proxy, final Method method, final Object[] args) return retVal; } } + + @Override + public synchronized void close() throws IOException { + failoverProxy.close(); + for (ProxyInfo pi : nameNodeProxies) { + if (pi.proxy != null) { + if (pi.proxy instanceof Closeable) { + ((Closeable)pi.proxy).close(); + } else { + RPC.stopProxy(pi.proxy); + } + } + } + } + + @Override + public boolean useLogicalURI() { + return failoverProxy.useLogicalURI(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProviderWithIPFailover.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProviderWithIPFailover.java new file mode 100644 index 0000000000..1dbd02cb38 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProviderWithIPFailover.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode.ha; + +import java.io.IOException; +import java.net.URI; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; + +/** + * ObserverReadProxyProvider with IPFailoverProxyProvider + * as the failover method. + */ +public class +ObserverReadProxyProviderWithIPFailover +extends ObserverReadProxyProvider { + + public ObserverReadProxyProviderWithIPFailover( + Configuration conf, URI uri, Class xface, + HAProxyFactory factory) throws IOException { + super(conf, uri, xface, factory, + new IPFailoverProxyProvider(conf, uri, xface,factory)); + } +} \ No newline at end of file