HDFS-13480. RBF: Separate namenodeHeartbeat and routerHeartbeat to different config key. Contributed by Ayush Saxena.
This commit is contained in:
parent
ddbe08db33
commit
6915d7e13c
@ -91,6 +91,8 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
|
||||
public static final String DFS_ROUTER_HEARTBEAT_ENABLE =
|
||||
FEDERATION_ROUTER_PREFIX + "heartbeat.enable";
|
||||
public static final boolean DFS_ROUTER_HEARTBEAT_ENABLE_DEFAULT = true;
|
||||
public static final String DFS_ROUTER_NAMENODE_HEARTBEAT_ENABLE =
|
||||
FEDERATION_ROUTER_PREFIX + "namenode.heartbeat.enable";
|
||||
public static final String DFS_ROUTER_HEARTBEAT_INTERVAL_MS =
|
||||
FEDERATION_ROUTER_PREFIX + "heartbeat.interval";
|
||||
public static final long DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT =
|
||||
|
@ -205,9 +205,13 @@ protected void serviceInit(Configuration configuration) throws Exception {
|
||||
addService(this.httpServer);
|
||||
}
|
||||
|
||||
if (conf.getBoolean(
|
||||
boolean isRouterHeartbeatEnabled = conf.getBoolean(
|
||||
RBFConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE,
|
||||
RBFConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE_DEFAULT)) {
|
||||
RBFConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE_DEFAULT);
|
||||
boolean isNamenodeHeartbeatEnable = conf.getBoolean(
|
||||
RBFConfigKeys.DFS_ROUTER_NAMENODE_HEARTBEAT_ENABLE,
|
||||
isRouterHeartbeatEnabled);
|
||||
if (isNamenodeHeartbeatEnable) {
|
||||
|
||||
// Create status updater for each monitored Namenode
|
||||
this.namenodeHeartbeatServices = createNamenodeHeartbeatServices();
|
||||
@ -219,7 +223,8 @@ protected void serviceInit(Configuration configuration) throws Exception {
|
||||
if (this.namenodeHeartbeatServices.isEmpty()) {
|
||||
LOG.error("Heartbeat is enabled but there are no namenodes to monitor");
|
||||
}
|
||||
|
||||
}
|
||||
if (isRouterHeartbeatEnabled) {
|
||||
// Periodically update the router state
|
||||
this.routerHeartbeatService = new RouterHeartbeatService(this);
|
||||
addService(this.routerHeartbeatService);
|
||||
@ -750,6 +755,14 @@ Collection<NamenodeHeartbeatService> getNamenodeHeartbeatServices() {
|
||||
return this.namenodeHeartbeatServices;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get this router heartbeat service.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
RouterHeartbeatService getRouterHeartbeatService() {
|
||||
return this.routerHeartbeatService;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the Router safe mode service.
|
||||
*/
|
||||
|
@ -371,6 +371,16 @@
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.federation.router.namenode.heartbeat.enable</name>
|
||||
<value>true</value>
|
||||
<description>
|
||||
If true, get namenode heartbeats and send into the State Store.
|
||||
If not explicitly specified takes the same value as for
|
||||
dfs.federation.router.heartbeat.enable.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.federation.router.store.router.expiration</name>
|
||||
<value>5m</value>
|
||||
|
@ -45,6 +45,7 @@ This approach has the same architecture as [YARN federation](../../hadoop-yarn/h
|
||||
|
||||
### Example flow
|
||||
The simplest configuration deploys a Router on each NameNode machine.
|
||||
The Router monitors the local NameNode and its state and heartbeats to the State Store.
|
||||
The Router monitors the local NameNode and heartbeats the state to the State Store.
|
||||
When a regular DFS client contacts any of the Routers to access a file in the federated filesystem, the Router checks the Mount Table in the State Store (i.e., the local cache) to find out which subcluster contains the file.
|
||||
Then it checks the Membership table in the State Store (i.e., the local cache) for the NameNode responsible for the subcluster.
|
||||
@ -69,6 +70,9 @@ To make sure that changes have been propagated to all Routers, each Router heart
|
||||
The communications between the Routers and the State Store are cached (with timed expiration for freshness).
|
||||
This improves the performance of the system.
|
||||
|
||||
#### Router heartbeat
|
||||
The Router periodically heartbeats its state to the State Store.
|
||||
|
||||
#### NameNode heartbeat
|
||||
For this role, the Router periodically checks the state of a NameNode (usually on the same server) and reports their high availability (HA) state and load/space status to the State Store.
|
||||
Note that this is an optional role, as a Router can be independent of any subcluster.
|
||||
@ -433,7 +437,8 @@ Monitor the namenodes in the subclusters for forwarding the client requests.
|
||||
|
||||
| Property | Default | Description|
|
||||
|:---- |:---- |:---- |
|
||||
| dfs.federation.router.heartbeat.enable | `true` | If `true`, the Router heartbeats into the State Store. |
|
||||
| dfs.federation.router.heartbeat.enable | `true` | If `true`, the Router periodically heartbeats its state to the State Store. |
|
||||
| dfs.federation.router.namenode.heartbeat.enable | | If `true`, the Router gets namenode heartbeats and send to the State Store. If not explicitly specified takes the same value as for `dfs.federation.router.heartbeat.enable`. |
|
||||
| dfs.federation.router.heartbeat.interval | 5000 | How often the Router should heartbeat into the State Store in milliseconds. |
|
||||
| dfs.federation.router.monitor.namenode | | The identifier of the namenodes to monitor and heartbeat. |
|
||||
| dfs.federation.router.monitor.localnamenode.enable | `true` | If `true`, the Router should monitor the namenode in the local machine. |
|
||||
|
@ -21,11 +21,13 @@
|
||||
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.Collection;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
@ -217,4 +219,45 @@ public void testRouterMetricsWhenDisabled() throws Exception {
|
||||
router.stop();
|
||||
router.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSwitchRouter() throws IOException {
|
||||
assertRouterHeartbeater(true, true);
|
||||
assertRouterHeartbeater(true, false);
|
||||
assertRouterHeartbeater(false, true);
|
||||
assertRouterHeartbeater(false, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute the test by specify the routerHeartbeat and nnHeartbeat switch.
|
||||
*
|
||||
* @param expectedRouterHeartbeat expect the routerHeartbeat enable state.
|
||||
* @param expectedNNHeartbeat expect the nnHeartbeat enable state.
|
||||
*/
|
||||
private void assertRouterHeartbeater(boolean expectedRouterHeartbeat,
|
||||
boolean expectedNNHeartbeat) throws IOException {
|
||||
final Router router = new Router();
|
||||
Configuration baseCfg = new RouterConfigBuilder(conf).rpc().build();
|
||||
baseCfg.setBoolean(RBFConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE,
|
||||
expectedRouterHeartbeat);
|
||||
baseCfg.setBoolean(RBFConfigKeys.DFS_ROUTER_NAMENODE_HEARTBEAT_ENABLE,
|
||||
expectedNNHeartbeat);
|
||||
router.init(baseCfg);
|
||||
RouterHeartbeatService routerHeartbeatService =
|
||||
router.getRouterHeartbeatService();
|
||||
if (expectedRouterHeartbeat) {
|
||||
assertNotNull(routerHeartbeatService);
|
||||
} else {
|
||||
assertNull(routerHeartbeatService);
|
||||
}
|
||||
Collection<NamenodeHeartbeatService> namenodeHeartbeatServices =
|
||||
router.getNamenodeHeartbeatServices();
|
||||
if (expectedNNHeartbeat) {
|
||||
assertNotNull(namenodeHeartbeatServices);
|
||||
} else {
|
||||
assertNull(namenodeHeartbeatServices);
|
||||
}
|
||||
router.close();
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user