diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProxyCombiner.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProxyCombiner.java
new file mode 100644
index 0000000000..fbafabcde4
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProxyCombiner.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+
+import org.apache.hadoop.io.MultipleIOException;
+import org.apache.hadoop.ipc.Client.ConnectionId;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A utility class used to combine two protocol proxies.
+ * See {@link #combine(Class, Object...)}.
+ */
+public final class ProxyCombiner {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ProxyCombiner.class);
+
+ private ProxyCombiner() { }
+
+ /**
+ * Combine two or more proxies which together comprise a single proxy
+ * interface. This can be used for a protocol interface which {@code extends}
+ * multiple other protocol interfaces. The returned proxy will implement
+ * all of the methods of the combined proxy interface, delegating calls
+ * to which proxy implements that method. If multiple proxies implement the
+ * same method, the first in the list will be used for delegation.
+ *
+ * This will check that every method on the combined interface is
+ * implemented by at least one of the supplied proxy objects.
+ *
+ * @param combinedProxyInterface The interface of the combined proxy.
+ * @param proxies The proxies which should be used as delegates.
+ * @param The type of the proxy that will be returned.
+ * @return The combined proxy.
+ */
+ @SuppressWarnings("unchecked")
+ public static T combine(Class combinedProxyInterface,
+ Object... proxies) {
+ methodLoop:
+ for (Method m : combinedProxyInterface.getMethods()) {
+ for (Object proxy : proxies) {
+ try {
+ proxy.getClass().getMethod(m.getName(), m.getParameterTypes());
+ continue methodLoop; // go to the next method
+ } catch (NoSuchMethodException nsme) {
+ // Continue to try the next proxy
+ }
+ }
+ throw new IllegalStateException("The proxies specified for "
+ + combinedProxyInterface + " do not cover method " + m);
+ }
+
+ InvocationHandler handler = new CombinedProxyInvocationHandler(proxies);
+ return (T) Proxy.newProxyInstance(combinedProxyInterface.getClassLoader(),
+ new Class[] {combinedProxyInterface}, handler);
+ }
+
+ private static final class CombinedProxyInvocationHandler
+ implements RpcInvocationHandler {
+
+ private final Object[] proxies;
+
+ private CombinedProxyInvocationHandler(Object[] proxies) {
+ this.proxies = proxies;
+ }
+
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] args)
+ throws Throwable {
+ Exception lastException = null;
+ for (Object underlyingProxy : proxies) {
+ try {
+ return method.invoke(underlyingProxy, args);
+ } catch (IllegalAccessException|IllegalArgumentException e) {
+ lastException = e;
+ }
+ }
+ // This shouldn't happen since the method coverage was verified in build()
+ LOG.error("BUG: Method {} was unable to be found on any of the "
+ + "underlying proxies for {}", method, proxy.getClass());
+ throw new IllegalArgumentException("Method " + method + " not supported",
+ lastException);
+ }
+
+ /**
+ * Since this is incapable of returning multiple connection IDs, simply
+ * return the first one. In most cases, the connection ID should be the same
+ * for all proxies.
+ */
+ @Override
+ public ConnectionId getConnectionId() {
+ return RPC.getConnectionIdForProxy(proxies[0]);
+ }
+
+ @Override
+ public void close() throws IOException {
+ MultipleIOException.Builder exceptionBuilder =
+ new MultipleIOException.Builder();
+ for (Object proxy : proxies) {
+ if (proxy instanceof Closeable) {
+ try {
+ ((Closeable) proxy).close();
+ } catch (IOException ioe) {
+ exceptionBuilder.add(ioe);
+ }
+ }
+ }
+ if (!exceptionBuilder.isEmpty()) {
+ throw exceptionBuilder.build();
+ }
+ }
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAProxyFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAProxyFactory.java
index f92a74ff7c..9364780800 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAProxyFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAProxyFactory.java
@@ -19,6 +19,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.security.UserGroupInformation;
import java.io.IOException;
@@ -41,4 +42,12 @@ T createProxy(Configuration conf, InetSocketAddress nnAddr, Class xface,
T createProxy(Configuration conf, InetSocketAddress nnAddr, Class xface,
UserGroupInformation ugi, boolean withRetries) throws IOException;
+ /**
+ * Set the alignment context to be used when creating new proxies using
+ * this factory. Not all implementations will use this alignment context.
+ */
+ default void setAlignmentContext(AlignmentContext alignmentContext) {
+ // noop
+ }
+
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
index 75405085f2..a623be26ef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
@@ -120,7 +120,7 @@ public ObserverReadProxyProvider(
super(conf, uri, xface, factory);
this.failoverProxy = failoverProxy;
this.alignmentContext = new ClientGSIContext();
- ((ClientHAProxyFactory) factory).setAlignmentContext(alignmentContext);
+ factory.setAlignmentContext(alignmentContext);
// Don't bother configuring the number of retries and such on the retry
// policy since it is mainly only used for determining whether or not an
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
index bb555ef259..3063083db8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
@@ -40,13 +40,16 @@
import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMapProtocol;
import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider;
import org.apache.hadoop.hdfs.server.namenode.ha.NameNodeHAProxyFactory;
+import org.apache.hadoop.hdfs.server.protocol.BalancerProtocols;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.ProxyCombiner;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolClientSideTranslatorPB;
@@ -122,7 +125,7 @@ public static ProxyAndInfo createProxy(Configuration conf,
if (failoverProxyProvider == null) {
return createNonHAProxy(conf, DFSUtilClient.getNNAddress(nameNodeUri),
xface, UserGroupInformation.getCurrentUser(), true,
- fallbackToSimpleAuth);
+ fallbackToSimpleAuth, null);
} else {
return NameNodeProxiesClient.createHAProxy(conf, nameNodeUri, xface,
failoverProxyProvider);
@@ -145,7 +148,7 @@ public static ProxyAndInfo createProxy(Configuration conf,
public static ProxyAndInfo createNonHAProxy(
Configuration conf, InetSocketAddress nnAddr, Class xface,
UserGroupInformation ugi, boolean withRetries) throws IOException {
- return createNonHAProxy(conf, nnAddr, xface, ugi, withRetries, null);
+ return createNonHAProxy(conf, nnAddr, xface, ugi, withRetries, null, null);
}
/**
@@ -167,29 +170,39 @@ public static ProxyAndInfo createNonHAProxy(
public static ProxyAndInfo createNonHAProxy(
Configuration conf, InetSocketAddress nnAddr, Class xface,
UserGroupInformation ugi, boolean withRetries,
- AtomicBoolean fallbackToSimpleAuth) throws IOException {
+ AtomicBoolean fallbackToSimpleAuth, AlignmentContext alignmentContext)
+ throws IOException {
Text dtService = SecurityUtil.buildTokenService(nnAddr);
T proxy;
if (xface == ClientProtocol.class) {
- proxy = (T) NameNodeProxiesClient.createNonHAProxyWithClientProtocol(
- nnAddr, conf, ugi, withRetries, fallbackToSimpleAuth);
+ proxy = (T) NameNodeProxiesClient.createProxyWithAlignmentContext(
+ nnAddr, conf, ugi, withRetries, fallbackToSimpleAuth,
+ alignmentContext);
} else if (xface == JournalProtocol.class) {
- proxy = (T) createNNProxyWithJournalProtocol(nnAddr, conf, ugi);
+ proxy = (T) createNNProxyWithJournalProtocol(nnAddr, conf, ugi,
+ alignmentContext);
} else if (xface == NamenodeProtocol.class) {
proxy = (T) createNNProxyWithNamenodeProtocol(nnAddr, conf, ugi,
- withRetries);
+ withRetries, alignmentContext);
} else if (xface == GetUserMappingsProtocol.class) {
- proxy = (T) createNNProxyWithGetUserMappingsProtocol(nnAddr, conf, ugi);
+ proxy = (T) createNNProxyWithGetUserMappingsProtocol(nnAddr, conf, ugi,
+ alignmentContext);
} else if (xface == RefreshUserMappingsProtocol.class) {
- proxy = (T) createNNProxyWithRefreshUserMappingsProtocol(nnAddr, conf, ugi);
+ proxy = (T) createNNProxyWithRefreshUserMappingsProtocol(nnAddr, conf,
+ ugi, alignmentContext);
} else if (xface == RefreshAuthorizationPolicyProtocol.class) {
proxy = (T) createNNProxyWithRefreshAuthorizationPolicyProtocol(nnAddr,
- conf, ugi);
+ conf, ugi, alignmentContext);
} else if (xface == RefreshCallQueueProtocol.class) {
- proxy = (T) createNNProxyWithRefreshCallQueueProtocol(nnAddr, conf, ugi);
+ proxy = (T) createNNProxyWithRefreshCallQueueProtocol(nnAddr, conf, ugi,
+ alignmentContext);
} else if (xface == InMemoryAliasMapProtocol.class) {
- proxy = (T) createNNProxyWithInMemoryAliasMapProtocol(nnAddr, conf, ugi);
+ proxy = (T) createNNProxyWithInMemoryAliasMapProtocol(nnAddr, conf, ugi,
+ alignmentContext);
+ } else if (xface == BalancerProtocols.class) {
+ proxy = (T) createNNProxyWithBalancerProtocol(nnAddr, conf, ugi,
+ withRetries, fallbackToSimpleAuth, alignmentContext);
} else {
String message = "Unsupported protocol found when creating the proxy " +
"connection to NameNode: " +
@@ -202,58 +215,63 @@ public static ProxyAndInfo createNonHAProxy(
}
private static InMemoryAliasMapProtocol createNNProxyWithInMemoryAliasMapProtocol(
- InetSocketAddress address, Configuration conf, UserGroupInformation ugi)
- throws IOException {
- AliasMapProtocolPB proxy = (AliasMapProtocolPB) createNameNodeProxy(
- address, conf, ugi, AliasMapProtocolPB.class, 30000);
+ InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
+ AlignmentContext alignmentContext) throws IOException {
+ AliasMapProtocolPB proxy = createNameNodeProxy(
+ address, conf, ugi, AliasMapProtocolPB.class, 30000, alignmentContext);
return new InMemoryAliasMapProtocolClientSideTranslatorPB(proxy);
}
private static JournalProtocol createNNProxyWithJournalProtocol(
- InetSocketAddress address, Configuration conf, UserGroupInformation ugi)
- throws IOException {
- JournalProtocolPB proxy = (JournalProtocolPB) createNameNodeProxy(address,
- conf, ugi, JournalProtocolPB.class, 30000);
+ InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
+ AlignmentContext alignmentContext) throws IOException {
+ JournalProtocolPB proxy = createNameNodeProxy(address,
+ conf, ugi, JournalProtocolPB.class, 30000, alignmentContext);
return new JournalProtocolTranslatorPB(proxy);
}
private static RefreshAuthorizationPolicyProtocol
createNNProxyWithRefreshAuthorizationPolicyProtocol(InetSocketAddress address,
- Configuration conf, UserGroupInformation ugi) throws IOException {
- RefreshAuthorizationPolicyProtocolPB proxy = (RefreshAuthorizationPolicyProtocolPB)
- createNameNodeProxy(address, conf, ugi, RefreshAuthorizationPolicyProtocolPB.class, 0);
+ Configuration conf, UserGroupInformation ugi,
+ AlignmentContext alignmentContext) throws IOException {
+ RefreshAuthorizationPolicyProtocolPB proxy = createNameNodeProxy(address,
+ conf, ugi, RefreshAuthorizationPolicyProtocolPB.class, 0,
+ alignmentContext);
return new RefreshAuthorizationPolicyProtocolClientSideTranslatorPB(proxy);
}
private static RefreshUserMappingsProtocol
createNNProxyWithRefreshUserMappingsProtocol(InetSocketAddress address,
- Configuration conf, UserGroupInformation ugi) throws IOException {
- RefreshUserMappingsProtocolPB proxy = (RefreshUserMappingsProtocolPB)
- createNameNodeProxy(address, conf, ugi, RefreshUserMappingsProtocolPB.class, 0);
+ Configuration conf, UserGroupInformation ugi,
+ AlignmentContext alignmentContext) throws IOException {
+ RefreshUserMappingsProtocolPB proxy = createNameNodeProxy(address, conf,
+ ugi, RefreshUserMappingsProtocolPB.class, 0, alignmentContext);
return new RefreshUserMappingsProtocolClientSideTranslatorPB(proxy);
}
private static RefreshCallQueueProtocol
createNNProxyWithRefreshCallQueueProtocol(InetSocketAddress address,
- Configuration conf, UserGroupInformation ugi) throws IOException {
- RefreshCallQueueProtocolPB proxy = (RefreshCallQueueProtocolPB)
- createNameNodeProxy(address, conf, ugi, RefreshCallQueueProtocolPB.class, 0);
+ Configuration conf, UserGroupInformation ugi,
+ AlignmentContext alignmentContext) throws IOException {
+ RefreshCallQueueProtocolPB proxy = createNameNodeProxy(address, conf, ugi,
+ RefreshCallQueueProtocolPB.class, 0, alignmentContext);
return new RefreshCallQueueProtocolClientSideTranslatorPB(proxy);
}
private static GetUserMappingsProtocol createNNProxyWithGetUserMappingsProtocol(
- InetSocketAddress address, Configuration conf, UserGroupInformation ugi)
- throws IOException {
- GetUserMappingsProtocolPB proxy = (GetUserMappingsProtocolPB)
- createNameNodeProxy(address, conf, ugi, GetUserMappingsProtocolPB.class, 0);
+ InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
+ AlignmentContext alignmentContext) throws IOException {
+ GetUserMappingsProtocolPB proxy = createNameNodeProxy(address, conf, ugi,
+ GetUserMappingsProtocolPB.class, 0, alignmentContext);
return new GetUserMappingsProtocolClientSideTranslatorPB(proxy);
}
private static NamenodeProtocol createNNProxyWithNamenodeProtocol(
InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
- boolean withRetries) throws IOException {
- NamenodeProtocolPB proxy = (NamenodeProtocolPB) createNameNodeProxy(
- address, conf, ugi, NamenodeProtocolPB.class, 0);
+ boolean withRetries, AlignmentContext alignmentContext)
+ throws IOException {
+ NamenodeProtocolPB proxy = createNameNodeProxy(
+ address, conf, ugi, NamenodeProtocolPB.class, 0, alignmentContext);
if (withRetries) { // create the proxy with retries
RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(5, 200,
TimeUnit.MILLISECONDS);
@@ -270,13 +288,28 @@ private static NamenodeProtocol createNNProxyWithNamenodeProtocol(
}
}
- private static Object createNameNodeProxy(InetSocketAddress address,
- Configuration conf, UserGroupInformation ugi, Class> xface,
- int rpcTimeout) throws IOException {
+ private static BalancerProtocols createNNProxyWithBalancerProtocol(
+ InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
+ boolean withRetries, AtomicBoolean fallbackToSimpleAuth,
+ AlignmentContext alignmentContext) throws IOException {
+ NamenodeProtocol namenodeProtocol = createNNProxyWithNamenodeProtocol(
+ address, conf, ugi, withRetries, alignmentContext);
+ ClientProtocol clientProtocol =
+ NameNodeProxiesClient.createProxyWithAlignmentContext(address,
+ conf, ugi, withRetries, fallbackToSimpleAuth, alignmentContext);
+
+ return ProxyCombiner.combine(BalancerProtocols.class,
+ namenodeProtocol, clientProtocol);
+ }
+
+ private static T createNameNodeProxy(InetSocketAddress address,
+ Configuration conf, UserGroupInformation ugi, Class xface,
+ int rpcTimeout, AlignmentContext alignmentContext) throws IOException {
RPC.setProtocolEngine(conf, xface, ProtobufRpcEngine.class);
- Object proxy = RPC.getProxy(xface, RPC.getProtocolVersion(xface), address,
- ugi, conf, NetUtils.getDefaultSocketFactory(conf), rpcTimeout);
- return proxy;
+ return RPC.getProtocolProxy(xface,
+ RPC.getProtocolVersion(xface), address, ugi, conf,
+ NetUtils.getDefaultSocketFactory(conf), rpcTimeout, null, null,
+ alignmentContext).getProxy();
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
index 114167ca44..3be7530e76 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
@@ -43,11 +43,11 @@
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
+import org.apache.hadoop.hdfs.server.protocol.BalancerProtocols;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
@@ -111,8 +111,7 @@ public static void checkOtherInstanceRunning(boolean toCheck) {
private final URI nameNodeUri;
private final String blockpoolID;
- private final NamenodeProtocol namenode;
- private final ClientProtocol client;
+ private final BalancerProtocols namenode;
private final KeyManager keyManager;
final AtomicBoolean fallbackToSimpleAuth = new AtomicBoolean(false);
@@ -136,9 +135,7 @@ public NameNodeConnector(String name, URI nameNodeUri, Path idPath,
this.maxNotChangedIterations = maxNotChangedIterations;
this.namenode = NameNodeProxies.createProxy(conf, nameNodeUri,
- NamenodeProtocol.class).getProxy();
- this.client = NameNodeProxies.createProxy(conf, nameNodeUri,
- ClientProtocol.class, fallbackToSimpleAuth).getProxy();
+ BalancerProtocols.class, fallbackToSimpleAuth).getProxy();
this.fs = (DistributedFileSystem)FileSystem.get(nameNodeUri, conf);
final NamespaceInfo namespaceinfo = namenode.versionRequest();
@@ -194,7 +191,7 @@ public boolean isUpgrading() throws IOException {
/** @return live datanode storage reports. */
public DatanodeStorageReport[] getLiveDatanodeStorageReport()
throws IOException {
- return client.getDatanodeStorageReport(DatanodeReportType.LIVE);
+ return namenode.getDatanodeStorageReport(DatanodeReportType.LIVE);
}
/** @return the key manager */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/NameNodeHAProxyFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/NameNodeHAProxyFactory.java
index 036b6eb367..1aaaa388d3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/NameNodeHAProxyFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/NameNodeHAProxyFactory.java
@@ -19,6 +19,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.NameNodeProxies;
+import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.security.UserGroupInformation;
import java.io.IOException;
@@ -27,12 +28,14 @@
public class NameNodeHAProxyFactory implements HAProxyFactory {
+ private AlignmentContext alignmentContext;
+
@Override
public T createProxy(Configuration conf, InetSocketAddress nnAddr,
Class xface, UserGroupInformation ugi, boolean withRetries,
AtomicBoolean fallbackToSimpleAuth) throws IOException {
return NameNodeProxies.createNonHAProxy(conf, nnAddr, xface,
- ugi, withRetries, fallbackToSimpleAuth).getProxy();
+ ugi, withRetries, fallbackToSimpleAuth, alignmentContext).getProxy();
}
@Override
@@ -42,4 +45,8 @@ public T createProxy(Configuration conf, InetSocketAddress nnAddr,
return NameNodeProxies.createNonHAProxy(conf, nnAddr, xface,
ugi, withRetries).getProxy();
}
+
+ public void setAlignmentContext(AlignmentContext alignmentContext) {
+ this.alignmentContext = alignmentContext;
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BalancerProtocols.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BalancerProtocols.java
new file mode 100644
index 0000000000..d23f6cb5fd
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BalancerProtocols.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.security.KerberosInfo;
+
+
+/** The full set of protocols used by the Balancer. */
+@InterfaceAudience.Private
+@KerberosInfo(
+ serverPrincipal = DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY)
+public interface BalancerProtocols extends ClientProtocol, NamenodeProtocol { }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
index 14441931fe..4a398dbf92 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
@@ -18,14 +18,13 @@
package org.apache.hadoop.hdfs.server.balancer;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
import java.net.URI;
import java.util.Collection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
@@ -33,7 +32,9 @@
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
+import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider;
import org.junit.Test;
/**
@@ -43,6 +44,13 @@ public class TestBalancerWithHANameNodes {
private MiniDFSCluster cluster;
ClientProtocol client;
+ // array of racks for original nodes in cluster
+ private static final String[] TEST_RACKS =
+ {TestBalancer.RACK0, TestBalancer.RACK1};
+ // array of capacities for original nodes in cluster
+ private static final long[] TEST_CAPACITIES =
+ {TestBalancer.CAPACITY, TestBalancer.CAPACITY};
+
static {
TestBalancer.initTestSetup();
}
@@ -57,52 +65,79 @@ public class TestBalancerWithHANameNodes {
public void testBalancerWithHANameNodes() throws Exception {
Configuration conf = new HdfsConfiguration();
TestBalancer.initConf(conf);
- long newNodeCapacity = TestBalancer.CAPACITY; // new node's capacity
- String newNodeRack = TestBalancer.RACK2; // new node's rack
- // array of racks for original nodes in cluster
- String[] racks = new String[] { TestBalancer.RACK0, TestBalancer.RACK1 };
- // array of capacities of original nodes in cluster
- long[] capacities = new long[] { TestBalancer.CAPACITY,
- TestBalancer.CAPACITY };
- assertEquals(capacities.length, racks.length);
- int numOfDatanodes = capacities.length;
+ assertEquals(TEST_CAPACITIES.length, TEST_RACKS.length);
NNConf nn1Conf = new MiniDFSNNTopology.NNConf("nn1");
nn1Conf.setIpcPort(HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT);
Configuration copiedConf = new Configuration(conf);
cluster = new MiniDFSCluster.Builder(copiedConf)
.nnTopology(MiniDFSNNTopology.simpleHATopology())
- .numDataNodes(capacities.length)
- .racks(racks)
- .simulatedCapacities(capacities)
+ .numDataNodes(TEST_CAPACITIES.length)
+ .racks(TEST_RACKS)
+ .simulatedCapacities(TEST_CAPACITIES)
.build();
HATestUtil.setFailoverConfigurations(cluster, conf);
try {
cluster.waitActive();
- cluster.transitionToActive(1);
+ cluster.transitionToActive(0);
Thread.sleep(500);
client = NameNodeProxies.createProxy(conf, FileSystem.getDefaultUri(conf),
ClientProtocol.class).getProxy();
- long totalCapacity = TestBalancer.sum(capacities);
- // fill up the cluster to be 30% full
- long totalUsedSpace = totalCapacity * 3 / 10;
- TestBalancer.createFile(cluster, TestBalancer.filePath, totalUsedSpace
- / numOfDatanodes, (short) numOfDatanodes, 1);
- // start up an empty node with the same capacity and on the same rack
- cluster.startDataNodes(conf, 1, true, null, new String[] { newNodeRack },
- new long[] { newNodeCapacity });
- totalCapacity += newNodeCapacity;
- TestBalancer.waitForHeartBeat(totalUsedSpace, totalCapacity, client,
- cluster);
- Collection namenodes = DFSUtil.getInternalNsRpcUris(conf);
- assertEquals(1, namenodes.size());
- assertTrue(namenodes.contains(HATestUtil.getLogicalUri(cluster)));
- final int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
- assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
- TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client,
- cluster, BalancerParameters.DEFAULT);
+ doTest(conf);
} finally {
cluster.shutdown();
}
}
+
+ void doTest(Configuration conf) throws Exception {
+ int numOfDatanodes = TEST_CAPACITIES.length;
+ long totalCapacity = TestBalancer.sum(TEST_CAPACITIES);
+ // fill up the cluster to be 30% full
+ long totalUsedSpace = totalCapacity * 3 / 10;
+ TestBalancer.createFile(cluster, TestBalancer.filePath, totalUsedSpace
+ / numOfDatanodes, (short) numOfDatanodes, 0);
+
+ // start up an empty node with the same capacity and on the same rack
+ long newNodeCapacity = TestBalancer.CAPACITY; // new node's capacity
+ String newNodeRack = TestBalancer.RACK2; // new node's rack
+ cluster.startDataNodes(conf, 1, true, null, new String[] {newNodeRack},
+ new long[] {newNodeCapacity});
+ totalCapacity += newNodeCapacity;
+ TestBalancer.waitForHeartBeat(totalUsedSpace, totalCapacity, client,
+ cluster);
+ Collection namenodes = DFSUtil.getInternalNsRpcUris(conf);
+ assertEquals(1, namenodes.size());
+ final int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
+ assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
+ TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client,
+ cluster, BalancerParameters.DEFAULT);
+ }
+
+ /**
+ * Test Balancer with ObserverNodes.
+ */
+ @Test(timeout = 60000)
+ public void testBalancerWithObserver() throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+ TestBalancer.initConf(conf);
+
+ MiniQJMHACluster qjmhaCluster = null;
+ try {
+ qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 2,
+ TEST_CAPACITIES.length, true, TEST_CAPACITIES, TEST_RACKS);
+ cluster = qjmhaCluster.getDfsCluster();
+ cluster.waitClusterUp();
+ cluster.waitActive();
+
+ DistributedFileSystem dfs = HATestUtil.configureObserverReadFs(
+ cluster, conf, ObserverReadProxyProvider.class, true);
+ client = dfs.getClient().getNamenode();
+
+ doTest(conf);
+ } finally {
+ if (qjmhaCluster != null) {
+ qjmhaCluster.shutdown();
+ }
+ }
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
index 76707d9962..f4843ac135 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
@@ -210,6 +210,14 @@ public static boolean isSentToAnyOfNameNodes(
public static MiniQJMHACluster setUpObserverCluster(
Configuration conf, int numObservers, int numDataNodes,
boolean fastTailing) throws IOException {
+ return setUpObserverCluster(conf, numObservers, numDataNodes,
+ fastTailing, null, null);
+ }
+
+ public static MiniQJMHACluster setUpObserverCluster(
+ Configuration conf, int numObservers, int numDataNodes,
+ boolean fastTailing, long[] simulatedCapacities,
+ String[] racks) throws IOException {
// disable block scanner
conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
@@ -225,7 +233,9 @@ public static MiniQJMHACluster setUpObserverCluster(
MiniQJMHACluster.Builder qjmBuilder = new MiniQJMHACluster.Builder(conf)
.setNumNameNodes(2 + numObservers);
- qjmBuilder.getDfsBuilder().numDataNodes(numDataNodes);
+ qjmBuilder.getDfsBuilder().numDataNodes(numDataNodes)
+ .simulatedCapacities(simulatedCapacities)
+ .racks(racks);
MiniQJMHACluster qjmhaCluster = qjmBuilder.build();
MiniDFSCluster dfsCluster = qjmhaCluster.getDfsCluster();