diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProxyCombiner.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProxyCombiner.java
index fbafabcde4..99eb487be4 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProxyCombiner.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProxyCombiner.java
@@ -17,9 +17,11 @@
  */
 package org.apache.hadoop.ipc;
 
+import com.google.common.base.Joiner;
 import java.io.Closeable;
 import java.io.IOException;
 import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
 
@@ -74,7 +76,8 @@ public static <T> T combine(Class<T> combinedProxyInterface,
             + combinedProxyInterface + " do not cover method " + m);
       }
 
-    InvocationHandler handler = new CombinedProxyInvocationHandler(proxies);
+    InvocationHandler handler =
+        new CombinedProxyInvocationHandler(combinedProxyInterface, proxies);
     return (T) Proxy.newProxyInstance(combinedProxyInterface.getClassLoader(),
         new Class[] {combinedProxyInterface}, handler);
   }
@@ -82,9 +85,12 @@ public static <T> T combine(Class<T> combinedProxyInterface,
 
   private static final class CombinedProxyInvocationHandler
       implements RpcInvocationHandler {
+    private final Class<?> proxyInterface;
     private final Object[] proxies;
 
-    private CombinedProxyInvocationHandler(Object[] proxies) {
+    private CombinedProxyInvocationHandler(Class<?> proxyInterface,
+        Object[] proxies) {
+      this.proxyInterface = proxyInterface;
       this.proxies = proxies;
     }
 
@@ -97,6 +103,8 @@ public Object invoke(Object proxy, Method method, Object[] args)
             return method.invoke(underlyingProxy, args);
           } catch (IllegalAccessException|IllegalArgumentException e) {
             lastException = e;
+          } catch (InvocationTargetException ite) {
+            throw ite.getCause();
           }
         }
       }
      // This shouldn't happen since the method coverage was verified in build()
@@ -116,6 +124,12 @@ public ConnectionId getConnectionId() {
       return RPC.getConnectionIdForProxy(proxies[0]);
     }
 
+    @Override
+    public String toString() {
+      return "CombinedProxy[" + proxyInterface.getSimpleName() + "]["
+          + Joiner.on(",").join(proxies) + "]";
+    }
+
     @Override
     public void close() throws IOException {
       MultipleIOException.Builder exceptionBuilder =
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerService.java
index 43cc75175e..f1fab273bd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerService.java
@@ -62,6 +62,9 @@ private void setupCluster(Configuration conf) throws Exception {
     MiniDFSNNTopology.NNConf nn1Conf = new MiniDFSNNTopology.NNConf("nn1");
     nn1Conf.setIpcPort(HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT);
     Configuration copiedConf = new Configuration(conf);
+    // Limit the number of failover retries to avoid the test taking too long
+    conf.setInt(HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_KEY, 2);
+    conf.setInt(HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_KEY, 0);
     cluster = new MiniDFSCluster.Builder(copiedConf)
         .nnTopology(MiniDFSNNTopology.simpleHATopology())
         .numDataNodes(TEST_CAPACITIES.length).racks(TEST_RACKS)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
index 3e67f7fb20..c604315fb2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
@@ -125,10 +125,25 @@ void doTest(Configuration conf) throws Exception {
 
   /**
    * Test Balancer with ObserverNodes.
    */
-  @Test(timeout = 60000)
+  @Test(timeout = 120000)
   public void testBalancerWithObserver() throws Exception {
+    testBalancerWithObserver(false);
+  }
+
+  /**
+   * Test Balancer with ObserverNodes when one has failed.
+   */
+  @Test(timeout = 180000)
+  public void testBalancerWithObserverWithFailedNode() throws Exception {
+    testBalancerWithObserver(true);
+  }
+
+  private void testBalancerWithObserver(boolean withObserverFailure)
+      throws Exception {
     final Configuration conf = new HdfsConfiguration();
     TestBalancer.initConf(conf);
+    // Avoid the same FS being reused between tests
+    conf.setBoolean("fs.hdfs.impl.disable.cache", true);
     MiniQJMHACluster qjmhaCluster = null;
     try {
@@ -142,6 +157,10 @@ public void testBalancerWithObserver() throws Exception {
       namesystemSpies.add(
           NameNodeAdapter.spyOnNamesystem(cluster.getNameNode(i)));
     }
+    if (withObserverFailure) {
+      // First observer NN is at index 2
+      cluster.shutdownNameNode(2);
+    }
 
     DistributedFileSystem dfs = HATestUtil.configureObserverReadFs(
         cluster, conf, ObserverReadProxyProvider.class, true);
@@ -149,9 +168,10 @@ public void testBalancerWithObserver() throws Exception {
       doTest(conf);
 
       for (int i = 0; i < cluster.getNumNameNodes(); i++) {
-        // First observer node is at idx 2 so it should get both getBlocks calls
-        // all other NameNodes should see 0 getBlocks calls
-        int expectedCount = (i == 2) ? 2 : 0;
+        // First observer node is at idx 2, or 3 if 2 has been shut down
+        // It should get both getBlocks calls, all other NNs should see 0 calls
+        int expectedObserverIdx = withObserverFailure ? 3 : 2;
+        int expectedCount = (i == expectedObserverIdx) ? 2 : 0;
         verify(namesystemSpies.get(i), times(expectedCount))
             .getBlocks(any(), anyLong(), anyLong());
       }
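
Note on the InvocationTargetException change in ProxyCombiner above: Method.invoke wraps any exception thrown by the underlying proxy's method in an InvocationTargetException, so without the new catch block the caller would see an opaque wrapper (or an UndeclaredThrowableException) instead of the real failure. Rethrowing getCause() preserves the original exception type, which the RPC retry/failover machinery matches on (e.g. a StandbyException from a downed observer). The following is a minimal standalone sketch of that idiom, not part of the patch; the Greeter and UnwrapDemo names are made up for illustration.

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;

public class UnwrapDemo {
  interface Greeter {
    String greet();
  }

  public static void main(String[] args) {
    // A target whose method always fails, standing in for an RPC proxy
    // pointed at an unavailable NameNode.
    Greeter failing = () -> {
      throw new IllegalStateException("backend down");
    };

    // Handler in the style of CombinedProxyInvocationHandler: unwrap the
    // reflective wrapper so callers see the original exception type.
    InvocationHandler handler = (proxy, method, methodArgs) -> {
      try {
        return method.invoke(failing, methodArgs);
      } catch (InvocationTargetException ite) {
        throw ite.getCause();
      }
    };

    Greeter combined = (Greeter) Proxy.newProxyInstance(
        Greeter.class.getClassLoader(), new Class<?>[] {Greeter.class},
        handler);

    try {
      combined.greet();
    } catch (IllegalStateException e) {
      // Without the unwrap, this catch would never match.
      System.out.println("Caught original exception: " + e.getMessage());
    }
  }
}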