HDFS-15032. Properly handle InvocationTargetExceptions in the proxy created by ProxyCombiner. This fixes a bug encountered by the HDFS balancer when used with Observer Nodes. Contributed by Erik Krogen.
This commit is contained in:
parent
e573ea4908
commit
c174d50b30
@ -17,9 +17,11 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.ipc;
|
package org.apache.hadoop.ipc;
|
||||||
|
|
||||||
|
import com.google.common.base.Joiner;
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.lang.reflect.InvocationHandler;
|
import java.lang.reflect.InvocationHandler;
|
||||||
|
import java.lang.reflect.InvocationTargetException;
|
||||||
import java.lang.reflect.Method;
|
import java.lang.reflect.Method;
|
||||||
import java.lang.reflect.Proxy;
|
import java.lang.reflect.Proxy;
|
||||||
|
|
||||||
@ -74,7 +76,8 @@ public final class ProxyCombiner {
|
|||||||
+ combinedProxyInterface + " do not cover method " + m);
|
+ combinedProxyInterface + " do not cover method " + m);
|
||||||
}
|
}
|
||||||
|
|
||||||
InvocationHandler handler = new CombinedProxyInvocationHandler(proxies);
|
InvocationHandler handler =
|
||||||
|
new CombinedProxyInvocationHandler(combinedProxyInterface, proxies);
|
||||||
return (T) Proxy.newProxyInstance(combinedProxyInterface.getClassLoader(),
|
return (T) Proxy.newProxyInstance(combinedProxyInterface.getClassLoader(),
|
||||||
new Class[] {combinedProxyInterface}, handler);
|
new Class[] {combinedProxyInterface}, handler);
|
||||||
}
|
}
|
||||||
@ -82,9 +85,12 @@ public final class ProxyCombiner {
|
|||||||
private static final class CombinedProxyInvocationHandler
|
private static final class CombinedProxyInvocationHandler
|
||||||
implements RpcInvocationHandler {
|
implements RpcInvocationHandler {
|
||||||
|
|
||||||
|
private final Class<?> proxyInterface;
|
||||||
private final Object[] proxies;
|
private final Object[] proxies;
|
||||||
|
|
||||||
private CombinedProxyInvocationHandler(Object[] proxies) {
|
private CombinedProxyInvocationHandler(Class<?> proxyInterface,
|
||||||
|
Object[] proxies) {
|
||||||
|
this.proxyInterface = proxyInterface;
|
||||||
this.proxies = proxies;
|
this.proxies = proxies;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -97,6 +103,8 @@ public final class ProxyCombiner {
|
|||||||
return method.invoke(underlyingProxy, args);
|
return method.invoke(underlyingProxy, args);
|
||||||
} catch (IllegalAccessException|IllegalArgumentException e) {
|
} catch (IllegalAccessException|IllegalArgumentException e) {
|
||||||
lastException = e;
|
lastException = e;
|
||||||
|
} catch (InvocationTargetException ite) {
|
||||||
|
throw ite.getCause();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// This shouldn't happen since the method coverage was verified in build()
|
// This shouldn't happen since the method coverage was verified in build()
|
||||||
@ -116,6 +124,12 @@ public final class ProxyCombiner {
|
|||||||
return RPC.getConnectionIdForProxy(proxies[0]);
|
return RPC.getConnectionIdForProxy(proxies[0]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "CombinedProxy[" + proxyInterface.getSimpleName() + "]["
|
||||||
|
+ Joiner.on(",").join(proxies) + "]";
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
MultipleIOException.Builder exceptionBuilder =
|
MultipleIOException.Builder exceptionBuilder =
|
||||||
|
@ -62,6 +62,9 @@ public class TestBalancerService {
|
|||||||
MiniDFSNNTopology.NNConf nn1Conf = new MiniDFSNNTopology.NNConf("nn1");
|
MiniDFSNNTopology.NNConf nn1Conf = new MiniDFSNNTopology.NNConf("nn1");
|
||||||
nn1Conf.setIpcPort(HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT);
|
nn1Conf.setIpcPort(HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT);
|
||||||
Configuration copiedConf = new Configuration(conf);
|
Configuration copiedConf = new Configuration(conf);
|
||||||
|
// Limit the number of failover retries to avoid the test taking too long
|
||||||
|
conf.setInt(HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_KEY, 2);
|
||||||
|
conf.setInt(HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_KEY, 0);
|
||||||
cluster = new MiniDFSCluster.Builder(copiedConf)
|
cluster = new MiniDFSCluster.Builder(copiedConf)
|
||||||
.nnTopology(MiniDFSNNTopology.simpleHATopology())
|
.nnTopology(MiniDFSNNTopology.simpleHATopology())
|
||||||
.numDataNodes(TEST_CAPACITIES.length).racks(TEST_RACKS)
|
.numDataNodes(TEST_CAPACITIES.length).racks(TEST_RACKS)
|
||||||
|
@ -125,10 +125,25 @@ public class TestBalancerWithHANameNodes {
|
|||||||
/**
|
/**
|
||||||
* Test Balancer with ObserverNodes.
|
* Test Balancer with ObserverNodes.
|
||||||
*/
|
*/
|
||||||
@Test(timeout = 60000)
|
@Test(timeout = 120000)
|
||||||
public void testBalancerWithObserver() throws Exception {
|
public void testBalancerWithObserver() throws Exception {
|
||||||
|
testBalancerWithObserver(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test Balancer with ObserverNodes when one has failed.
|
||||||
|
*/
|
||||||
|
@Test(timeout = 180000)
|
||||||
|
public void testBalancerWithObserverWithFailedNode() throws Exception {
|
||||||
|
testBalancerWithObserver(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testBalancerWithObserver(boolean withObserverFailure)
|
||||||
|
throws Exception {
|
||||||
final Configuration conf = new HdfsConfiguration();
|
final Configuration conf = new HdfsConfiguration();
|
||||||
TestBalancer.initConf(conf);
|
TestBalancer.initConf(conf);
|
||||||
|
// Avoid the same FS being reused between tests
|
||||||
|
conf.setBoolean("fs.hdfs.impl.disable.cache", true);
|
||||||
|
|
||||||
MiniQJMHACluster qjmhaCluster = null;
|
MiniQJMHACluster qjmhaCluster = null;
|
||||||
try {
|
try {
|
||||||
@ -142,6 +157,10 @@ public class TestBalancerWithHANameNodes {
|
|||||||
namesystemSpies.add(
|
namesystemSpies.add(
|
||||||
NameNodeAdapter.spyOnNamesystem(cluster.getNameNode(i)));
|
NameNodeAdapter.spyOnNamesystem(cluster.getNameNode(i)));
|
||||||
}
|
}
|
||||||
|
if (withObserverFailure) {
|
||||||
|
// First observer NN is at index 2
|
||||||
|
cluster.shutdownNameNode(2);
|
||||||
|
}
|
||||||
|
|
||||||
DistributedFileSystem dfs = HATestUtil.configureObserverReadFs(
|
DistributedFileSystem dfs = HATestUtil.configureObserverReadFs(
|
||||||
cluster, conf, ObserverReadProxyProvider.class, true);
|
cluster, conf, ObserverReadProxyProvider.class, true);
|
||||||
@ -149,9 +168,10 @@ public class TestBalancerWithHANameNodes {
|
|||||||
|
|
||||||
doTest(conf);
|
doTest(conf);
|
||||||
for (int i = 0; i < cluster.getNumNameNodes(); i++) {
|
for (int i = 0; i < cluster.getNumNameNodes(); i++) {
|
||||||
// First observer node is at idx 2 so it should get both getBlocks calls
|
// First observer node is at idx 2, or 3 if 2 has been shut down
|
||||||
// all other NameNodes should see 0 getBlocks calls
|
// It should get both getBlocks calls, all other NNs should see 0 calls
|
||||||
int expectedCount = (i == 2) ? 2 : 0;
|
int expectedObserverIdx = withObserverFailure ? 3 : 2;
|
||||||
|
int expectedCount = (i == expectedObserverIdx) ? 2 : 0;
|
||||||
verify(namesystemSpies.get(i), times(expectedCount))
|
verify(namesystemSpies.get(i), times(expectedCount))
|
||||||
.getBlocks(any(), anyLong(), anyLong());
|
.getBlocks(any(), anyLong(), anyLong());
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user