HDFS-13410. RBF: Support federation with no subclusters. Contributed by Inigo Goiri.
Commit: a92200f4a6
Parent: 0006346abe
@@ -932,7 +932,9 @@ public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(
     final UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
     final Method m = method.getMethod();
 
-    if (locations.size() == 1) {
+    if (locations.isEmpty()) {
+      throw new IOException("No remote locations available");
+    } else if (locations.size() == 1) {
       // Shortcut, just one call
       T location = locations.iterator().next();
       String ns = location.getNameserviceId();
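The guard added above follows a simple empty / single / many dispatch: fail fast when no subcluster locations are registered, take the single-call shortcut for exactly one location, and only fan out concurrently otherwise. Below is a minimal standalone sketch of that shape; the class and method names are invented for the sketch and are not part of RouterRpcClient.

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;

// Hypothetical illustration of the empty / single / many dispatch pattern.
final class LocationDispatchSketch {

  static <T> String dispatch(Collection<T> locations) throws IOException {
    if (locations.isEmpty()) {
      // No subclusters registered: surface a clear error immediately.
      throw new IOException("No remote locations available");
    } else if (locations.size() == 1) {
      // Shortcut, just one call: no concurrent fan-out needed.
      return "single:" + locations.iterator().next();
    }
    // Otherwise fan out to every location (the real client submits the call
    // to each subcluster in parallel and collects the results).
    return "concurrent:" + locations.size();
  }

  public static void main(String[] args) throws IOException {
    System.out.println(dispatch(Arrays.asList("ns0")));        // single:ns0
    System.out.println(dispatch(Arrays.asList("ns0", "ns1"))); // concurrent:2
    try {
      dispatch(Collections.emptyList());
    } catch (IOException e) {
      System.out.println("empty: " + e.getMessage());
    }
  }
}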
@@ -17,23 +17,25 @@
 */
package org.apache.hadoop.hdfs.server.federation.router;

import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.net.URISyntaxException;
import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.federation.MockResolver;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.service.Service.STATE;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -77,27 +79,31 @@ public static void create() throws IOException {
        "0.0.0.0");
  }

  @AfterClass
  public static void destroy() {
  }

  @Before
  public void setup() throws IOException, URISyntaxException {
  }

  @After
  public void cleanup() {
  }

  private static void testRouterStartup(Configuration routerConfig)
      throws InterruptedException, IOException {
    Router router = new Router();
    assertEquals(STATE.NOTINITED, router.getServiceState());
    assertEquals(RouterServiceState.UNINITIALIZED, router.getRouterState());
    router.init(routerConfig);
    if (routerConfig.getBoolean(
        RBFConfigKeys.DFS_ROUTER_SAFEMODE_ENABLE,
        RBFConfigKeys.DFS_ROUTER_SAFEMODE_ENABLE_DEFAULT)) {
      assertEquals(RouterServiceState.SAFEMODE, router.getRouterState());
    } else {
      assertEquals(RouterServiceState.INITIALIZING, router.getRouterState());
    }
    assertEquals(STATE.INITED, router.getServiceState());
    router.start();
    if (routerConfig.getBoolean(
        RBFConfigKeys.DFS_ROUTER_SAFEMODE_ENABLE,
        RBFConfigKeys.DFS_ROUTER_SAFEMODE_ENABLE_DEFAULT)) {
      assertEquals(RouterServiceState.SAFEMODE, router.getRouterState());
    } else {
      assertEquals(RouterServiceState.RUNNING, router.getRouterState());
    }
    assertEquals(STATE.STARTED, router.getServiceState());
    router.stop();
    assertEquals(RouterServiceState.SHUTDOWN, router.getRouterState());
    assertEquals(STATE.STOPPED, router.getServiceState());
    router.close();
  }
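testRouterStartup() walks a Router through its full service lifecycle and checks both the Hadoop service state and the router state at each step. The following is a hedged sketch of the same lifecycle outside JUnit, using only classes and calls that already appear in this test; a real deployment would need much more configuration than the bare builder shown here.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.router.Router;

// Sketch of the lifecycle exercised by testRouterStartup():
// NOTINITED -> init() -> INITED -> start() -> STARTED -> stop() -> STOPPED,
// with the router state moving UNINITIALIZED -> SAFEMODE/INITIALIZING ->
// RUNNING -> SHUTDOWN depending on whether safemode is enabled.
public class RouterLifecycleSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Router router = new Router();
    router.init(new RouterConfigBuilder(conf).rpc().build()); // INITED
    router.start();                                           // STARTED
    router.stop();                                            // STOPPED
    router.close();
  }
}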
@@ -114,6 +120,9 @@ public void testRouterService() throws InterruptedException, IOException {
     // Rpc only
     testRouterStartup(new RouterConfigBuilder(conf).rpc().build());
 
+    // Safemode only
+    testRouterStartup(new RouterConfigBuilder(conf).rpc().safemode().build());
+
     // Metrics only
     testRouterStartup(new RouterConfigBuilder(conf).metrics().build());
 
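testRouterService() now also exercises a safemode-enabled router. As the calls above suggest, RouterConfigBuilder flags compose and build() yields a plain Configuration; the short illustrative sketch below uses only the builder methods that appear in this patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;

// Each builder call enables one router subservice; build() returns the
// resulting Configuration that is then handed to Router#init().
public class RouterConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    Configuration rpcOnly = new RouterConfigBuilder(conf).rpc().build();
    Configuration rpcSafemode =
        new RouterConfigBuilder(conf).rpc().safemode().build();
    Configuration metricsOnly = new RouterConfigBuilder(conf).metrics().build();
    System.out.println(rpcOnly.size() + " / "
        + rpcSafemode.size() + " / " + metricsOnly.size() + " settings");
  }
}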
@@ -147,4 +156,33 @@ public void testRouterRestartRpcService() throws IOException {
 
     router.close();
   }
+
+  @Test
+  public void testRouterRpcWithNoSubclusters() throws IOException {
+
+    Router router = new Router();
+    router.init(new RouterConfigBuilder(conf).rpc().build());
+    router.start();
+
+    InetSocketAddress serverAddress = router.getRpcServerAddress();
+    DFSClient dfsClient = new DFSClient(serverAddress, conf);
+
+    try {
+      dfsClient.create("/test.txt", false);
+      fail("Create with no subclusters should fail");
+    } catch (RemoteException e) {
+      assertExceptionContains("Cannot find locations for /test.txt", e);
+    }
+
+    try {
+      dfsClient.datanodeReport(DatanodeReportType.LIVE);
+      fail("Get datanode reports with no subclusters should fail");
+    } catch (IOException e) {
+      assertExceptionContains("No remote locations available", e);
+    }
+
+    dfsClient.close();
+    router.stop();
+    router.close();
+  }
 }
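The new test points a DFSClient straight at the router RPC address and expects explicit errors rather than hangs or null results. Outside the test harness, the same flow might look like the sketch below; every Router and DFSClient call shown is taken from the test itself, and the try/finally cleanup is only an illustrative addition.

import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.router.Router;
import org.apache.hadoop.ipc.RemoteException;

// Sketch: a router with no registered subclusters now rejects client calls
// with explicit exceptions instead of failing in less obvious ways.
public class NoSubclusterClientSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    Router router = new Router();
    router.init(new RouterConfigBuilder(conf).rpc().build());
    router.start();
    try {
      InetSocketAddress serverAddress = router.getRpcServerAddress();
      DFSClient client = new DFSClient(serverAddress, conf);
      try {
        client.create("/test.txt", false);
      } catch (RemoteException e) {
        System.out.println("create failed as expected: " + e.getMessage());
      }
      try {
        client.datanodeReport(DatanodeReportType.LIVE);
      } catch (IOException e) {
        System.out.println("datanodeReport failed as expected: " + e.getMessage());
      }
      client.close();
    } finally {
      router.stop();
      router.close();
    }
  }
}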
@@ -79,6 +79,7 @@
 import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
 import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext;
 import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
+import org.apache.hadoop.hdfs.server.federation.MockResolver;
 import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
 import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics;
 import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
@@ -1155,7 +1156,25 @@ public Boolean get() {
     }, 500, 5 * 1000);
 
     // The cache should be updated now
-    assertNotEquals(jsonString0, metrics.getLiveNodes());
+    final String jsonString2 = metrics.getLiveNodes();
+    assertNotEquals(jsonString0, jsonString2);
+
+    // Without any subcluster available, we should return an empty list
+    MockResolver resolver =
+        (MockResolver) router.getRouter().getNamenodeResolver();
+    resolver.cleanRegistrations();
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return !jsonString2.equals(metrics.getLiveNodes());
+      }
+    }, 500, 5 * 1000);
+    assertEquals("{}", metrics.getLiveNodes());
+
+    // Reset the registrations again
+    cluster.registerNamenodes();
+    cluster.waitNamenodeRegistration();
   }
 
   /**
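The metrics check above polls until the NamenodeBeanMetrics cache refreshes and then expects getLiveNodes() to return the empty JSON object "{}". The sketch below is a simplified, self-contained stand-in for the GenericTestUtils.waitFor polling used in the test; the 500 ms / 5 s values mirror the test, while the helper itself is purely illustrative.

import java.util.function.Supplier;

// Illustrative poll-until-true helper, a simplified stand-in for
// GenericTestUtils.waitFor(check, checkEveryMillis, waitForMillis).
public final class WaitForSketch {

  static boolean waitFor(Supplier<Boolean> check, long checkEveryMillis,
      long waitForMillis) throws InterruptedException {
    long deadline = System.currentTimeMillis() + waitForMillis;
    while (System.currentTimeMillis() < deadline) {
      if (Boolean.TRUE.equals(check.get())) {
        return true; // condition satisfied before the timeout
      }
      Thread.sleep(checkEveryMillis);
    }
    return false; // timed out
  }

  public static void main(String[] args) throws InterruptedException {
    long start = System.currentTimeMillis();
    // Example condition: becomes true roughly one second after start.
    boolean ok = waitFor(
        () -> System.currentTimeMillis() - start > 1000, 500, 5 * 1000);
    System.out.println("condition met: " + ok);
  }
}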