HDFS-13044. RBF: Add a safe mode for the Router. Contributed by Inigo Goiri.

This commit is contained in:
Yiqun Lin 2018-01-30 12:12:08 +08:00
parent fde95d463c
commit dbb9dded33
10 changed files with 515 additions and 12 deletions

View File

@ -1291,6 +1291,19 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final long FEDERATION_STORE_ROUTER_EXPIRATION_MS_DEFAULT =
TimeUnit.MINUTES.toMillis(5);
// HDFS Router safe mode
public static final String DFS_ROUTER_SAFEMODE_ENABLE =
FEDERATION_ROUTER_PREFIX + "safemode.enable";
public static final boolean DFS_ROUTER_SAFEMODE_ENABLE_DEFAULT = true;
public static final String DFS_ROUTER_SAFEMODE_EXTENSION =
FEDERATION_ROUTER_PREFIX + "safemode.extension";
public static final long DFS_ROUTER_SAFEMODE_EXTENSION_DEFAULT =
TimeUnit.SECONDS.toMillis(30);
public static final String DFS_ROUTER_SAFEMODE_EXPIRATION =
FEDERATION_ROUTER_PREFIX + "safemode.expiration";
public static final long DFS_ROUTER_SAFEMODE_EXPIRATION_DEFAULT =
3 * DFS_ROUTER_CACHE_TIME_TO_LIVE_MS_DEFAULT;
// HDFS Router-based federation mount table entries
/** Maximum number of cache entries to have. */
public static final String FEDERATION_MOUNT_TABLE_MAX_CACHE_SIZE =

View File

@ -118,6 +118,8 @@ public class Router extends CompositeService {
private RouterStore routerStateManager;
/** Heartbeat our run status to the router state manager. */
private RouterHeartbeatService routerHeartbeatService;
/** Enter/exit safemode. */
private RouterSafemodeService safemodeService;
/** The start time of the namesystem. */
private final long startTime = Time.now();
@ -232,13 +234,25 @@ protected void serviceInit(Configuration configuration) throws Exception {
addService(this.quotaUpdateService);
}
// Safemode service to refuse RPC calls when the router is out of sync
if (conf.getBoolean(
DFSConfigKeys.DFS_ROUTER_SAFEMODE_ENABLE,
DFSConfigKeys.DFS_ROUTER_SAFEMODE_ENABLE_DEFAULT)) {
// Create safemode monitoring service
this.safemodeService = new RouterSafemodeService(this);
addService(this.safemodeService);
}
super.serviceInit(conf);
}
@Override
protected void serviceStart() throws Exception {
updateRouterState(RouterServiceState.RUNNING);
if (this.safemodeService == null) {
// Router is running now
updateRouterState(RouterServiceState.RUNNING);
}
if (this.pauseMonitor != null) {
this.pauseMonitor.start();

View File

@ -179,6 +179,8 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
/** Interface to map global name space to HDFS subcluster name spaces. */
private final FileSubclusterResolver subclusterResolver;
/** If we are in safe mode, fail requests as if a standby NN. */
private volatile boolean safeMode;
/** Category of the operation that a thread is executing. */
private final ThreadLocal<OperationCategory> opCategory = new ThreadLocal<>();
@ -370,12 +372,12 @@ public InetSocketAddress getRpcAddress() {
* @param op Category of the operation to check.
* @param supported If the operation is supported or not. If not, it will
* throw an UnsupportedOperationException.
* @throws StandbyException If the Router is in safe mode and cannot serve
* client requests.
* @throws SafeModeException If the Router is in safe mode and cannot serve
* client requests.
* @throws UnsupportedOperationException If the operation is not supported.
*/
protected void checkOperation(OperationCategory op, boolean supported)
throws StandbyException, UnsupportedOperationException {
throws RouterSafeModeException, UnsupportedOperationException {
checkOperation(op);
if (!supported) {
@ -393,10 +395,11 @@ protected void checkOperation(OperationCategory op, boolean supported)
* UNCHECKED. This function should be called by all ClientProtocol functions.
*
* @param op Category of the operation to check.
* @throws StandbyException If the Router is in safe mode and cannot serve
* client requests.
* @throws SafeModeException If the Router is in safe mode and cannot serve
* client requests.
*/
protected void checkOperation(OperationCategory op) throws StandbyException {
protected void checkOperation(OperationCategory op)
throws RouterSafeModeException {
// Log the function we are currently calling.
if (rpcMonitor != null) {
rpcMonitor.startOp();
@ -415,7 +418,33 @@ protected void checkOperation(OperationCategory op) throws StandbyException {
return;
}
// TODO check Router safe mode and return Standby exception
if (safeMode) {
// Throw standby exception, router is not available
if (rpcMonitor != null) {
rpcMonitor.routerFailureSafemode();
}
throw new RouterSafeModeException(router.getRouterId(), op);
}
}
/**
* In safe mode all RPC requests will fail and return a standby exception.
* The client will try another Router, similar to the client retry logic for
* HA.
*
* @param mode True if enabled, False if disabled.
*/
public void setSafeMode(boolean mode) {
this.safeMode = mode;
}
/**
* Check if the Router is in safe mode and cannot serve RPC calls.
*
* @return If the Router is in safe mode.
*/
public boolean isInSafeMode() {
return this.safeMode;
}
@Override // ClientProtocol

View File

@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.ipc.StandbyException;
/**
* Exception that the Router throws when it is in safe mode. This extends
* {@link StandbyException} for the client to try another Router when it gets
* this exception.
*/
public class RouterSafeModeException extends StandbyException {
private static final long serialVersionUID = 453568188334993493L;
/** Identifier of the Router that generated this exception. */
private final String routerId;
/**
* Build a new Router safe mode exception.
* @param router Identifier of the Router.
* @param op Category of the operation (READ/WRITE).
*/
public RouterSafeModeException(String router, OperationCategory op) {
super("Router " + router + " is in safe mode and cannot handle " + op
+ " requests.");
this.routerId = router;
}
/**
* Get the id of the Router that generated this exception.
* @return Id of the Router that generated this exception.
*/
public String getRouterId() {
return this.routerId;
}
}

View File

@ -0,0 +1,150 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import static org.apache.hadoop.util.Time.now;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Service to periodically check if the {@link org.apache.hadoop.hdfs.server.
* federation.store.StateStoreService StateStoreService} cached information in
* the {@link Router} is up to date. This is for performance and removes the
* {@link org.apache.hadoop.hdfs.server.federation.store.StateStoreService
* StateStoreService} from the critical path in common operations.
*/
public class RouterSafemodeService extends PeriodicService {
private static final Logger LOG =
LoggerFactory.getLogger(RouterSafemodeService.class);
/** Router to manage safe mode. */
private final Router router;
/** Interval in ms to wait post startup before allowing RPC requests. */
private long startupInterval;
/** Interval in ms after which the State Store cache is too stale. */
private long staleInterval;
/** Start time in ms of this service. */
private long startupTime;
/** The time the Router enters safe mode in milliseconds. */
private long enterSafeModeTime = now();
/**
* Create a new Cache update service.
*
* @param router Router containing the cache.
*/
public RouterSafemodeService(Router router) {
super(RouterSafemodeService.class.getSimpleName());
this.router = router;
}
/**
* Enter safe mode.
*/
private void enter() {
LOG.info("Entering safe mode");
enterSafeModeTime = now();
RouterRpcServer rpcServer = router.getRpcServer();
rpcServer.setSafeMode(true);
router.updateRouterState(RouterServiceState.SAFEMODE);
}
/**
* Leave safe mode.
*/
private void leave() {
// Cache recently updated, leave safemode
long timeInSafemode = now() - enterSafeModeTime;
LOG.info("Leaving safe mode after {} milliseconds", timeInSafemode);
RouterMetrics routerMetrics = router.getRouterMetrics();
if (routerMetrics == null) {
LOG.error("The Router metrics are not enabled");
} else {
routerMetrics.setSafeModeTime(timeInSafemode);
}
RouterRpcServer rpcServer = router.getRpcServer();
rpcServer.setSafeMode(false);
router.updateRouterState(RouterServiceState.RUNNING);
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
// Use same interval as cache update service
this.setIntervalMs(conf.getTimeDuration(
DFSConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS,
DFSConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS_DEFAULT,
TimeUnit.MILLISECONDS));
this.startupInterval = conf.getTimeDuration(
DFSConfigKeys.DFS_ROUTER_SAFEMODE_EXTENSION,
DFSConfigKeys.DFS_ROUTER_SAFEMODE_EXTENSION_DEFAULT,
TimeUnit.MILLISECONDS);
LOG.info("Leave startup safe mode after {} ms", this.startupInterval);
this.staleInterval = conf.getTimeDuration(
DFSConfigKeys.DFS_ROUTER_SAFEMODE_EXPIRATION,
DFSConfigKeys.DFS_ROUTER_SAFEMODE_EXPIRATION_DEFAULT,
TimeUnit.MILLISECONDS);
LOG.info("Enter safe mode after {} ms without reaching the State Store",
this.staleInterval);
this.startupTime = Time.now();
// Initializing the RPC server in safe mode, it will disable it later
enter();
super.serviceInit(conf);
}
@Override
public void periodicInvoke() {
long now = Time.now();
long delta = now - startupTime;
if (delta < startupInterval) {
LOG.info("Delaying safemode exit for {} milliseconds...",
this.startupInterval - delta);
return;
}
RouterRpcServer rpcServer = router.getRpcServer();
StateStoreService stateStore = router.getStateStore();
long cacheUpdateTime = stateStore.getCacheUpdateTime();
boolean isCacheStale = (now - cacheUpdateTime) > this.staleInterval;
// Always update to indicate our cache was updated
if (isCacheStale) {
if (!rpcServer.isInSafeMode()) {
enter();
}
} else if (rpcServer.isInSafeMode()) {
// Cache recently updated, leave safe mode
leave();
}
}
}

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.federation.store;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.federation.router.PeriodicService;
@ -52,9 +54,10 @@ public StateStoreCacheUpdateService(StateStoreService stateStore) {
@Override
protected void serviceInit(Configuration conf) throws Exception {
this.setIntervalMs(conf.getLong(
this.setIntervalMs(conf.getTimeDuration(
DFSConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS,
DFSConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS_DEFAULT));
DFSConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS_DEFAULT,
TimeUnit.MILLISECONDS));
super.serviceInit(conf);
}

View File

@ -5080,9 +5080,12 @@
<property>
<name>dfs.federation.router.cache.ttl</name>
<value>60000</value>
<value>1m</value>
<description>
How often to refresh the State Store caches in milliseconds.
How often to refresh the State Store caches in milliseconds. This setting
supports multiple time unit suffixes as described in
dfs.heartbeat.interval. If no suffix is specified then milliseconds is
assumed.
</description>
</property>
@ -5130,6 +5133,35 @@
</description>
</property>
<property>
<name>dfs.federation.router.safemode.enable</name>
<value>true</value>
<description>
</description>
</property>
<property>
<name>dfs.federation.router.safemode.extension</name>
<value>30s</value>
<description>
Time after startup that the Router is in safe mode. This setting
supports multiple time unit suffixes as described in
dfs.heartbeat.interval. If no suffix is specified then milliseconds is
assumed.
</description>
</property>
<property>
<name>dfs.federation.router.safemode.expiration</name>
<value>3m</value>
<description>
Time without being able to reach the State Store to enter safe mode. This
setting supports multiple time unit suffixes as described in
dfs.heartbeat.interval. If no suffix is specified then milliseconds is
assumed.
</description>
</property>
<property>
<name>dfs.federation.router.monitor.namenode</name>
<value></value>

View File

@ -81,6 +81,10 @@ The Routers are stateless and metadata operations are atomic at the NameNodes.
If a Router becomes unavailable, any Router can take over for it.
The clients configure their DFS HA client (e.g., ConfiguredFailoverProvider or RequestHedgingProxyProvider) with all the Routers in the federation as endpoints.
* **Unavailable State Store:**
If a Router cannot contact the State Store, it will enter into a Safe Mode state which disallows it from serving requests.
Clients will treat Routers in Safe Mode as it was an Standby NameNode and try another Router.
* **NameNode heartbeat HA:**
For high availability and flexibility, multiple Routers can monitor the same NameNode and heartbeat the information to the State Store.
This increases clients' resiliency to stale information, should a Router fail.

View File

@ -35,6 +35,7 @@ public class RouterConfigBuilder {
private boolean enableStateStore = false;
private boolean enableMetrics = false;
private boolean enableQuota = false;
private boolean enableSafemode = false;
public RouterConfigBuilder(Configuration configuration) {
this.conf = configuration;
@ -52,6 +53,7 @@ public RouterConfigBuilder all() {
this.enableLocalHeartbeat = true;
this.enableStateStore = true;
this.enableMetrics = true;
this.enableSafemode = true;
return this;
}
@ -95,6 +97,11 @@ public RouterConfigBuilder quota(boolean enable) {
return this;
}
public RouterConfigBuilder safemode(boolean enable) {
this.enableSafemode = enable;
return this;
}
public RouterConfigBuilder rpc() {
return this.rpc(true);
}
@ -123,6 +130,10 @@ public RouterConfigBuilder quota() {
return this.quota(true);
}
public RouterConfigBuilder safemode() {
return this.safemode(true);
}
public Configuration build() {
conf.setBoolean(DFSConfigKeys.DFS_ROUTER_STORE_ENABLE,
this.enableStateStore);
@ -139,6 +150,8 @@ public Configuration build() {
this.enableMetrics);
conf.setBoolean(DFSConfigKeys.DFS_ROUTER_QUOTA_ENABLE,
this.enableQuota);
conf.setBoolean(DFSConfigKeys.DFS_ROUTER_SAFEMODE_ENABLE,
this.enableSafemode);
return conf;
}
}

View File

@ -0,0 +1,192 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_SAFEMODE_EXPIRATION;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_SAFEMODE_EXTENSION;
import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.deleteStateStore;
import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.util.Time;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* Test the safe mode for the {@link Router} controlled by
* {@link RouterSafemodeService}.
*/
public class TestRouterSafemode {
private Router router;
private static Configuration conf;
@BeforeClass
public static void create() throws IOException {
// Wipe state store
deleteStateStore();
// Configuration that supports the state store
conf = getStateStoreConfiguration();
// 2 sec startup standby
conf.setTimeDuration(DFS_ROUTER_SAFEMODE_EXTENSION,
TimeUnit.SECONDS.toMillis(2), TimeUnit.MILLISECONDS);
// 1 sec cache refresh
conf.setTimeDuration(DFS_ROUTER_CACHE_TIME_TO_LIVE_MS,
TimeUnit.SECONDS.toMillis(1), TimeUnit.MILLISECONDS);
// 2 sec post cache update before entering safemode (2 intervals)
conf.setTimeDuration(DFS_ROUTER_SAFEMODE_EXPIRATION,
TimeUnit.SECONDS.toMillis(2), TimeUnit.MILLISECONDS);
// RPC + State Store + Safe Mode only
conf = new RouterConfigBuilder(conf)
.rpc()
.safemode()
.stateStore()
.metrics()
.build();
}
@AfterClass
public static void destroy() {
}
@Before
public void setup() throws IOException, URISyntaxException {
router = new Router();
router.init(conf);
router.start();
}
@After
public void cleanup() throws IOException {
if (router != null) {
router.stop();
router = null;
}
}
@Test
public void testSafemodeService() throws IOException {
RouterSafemodeService server = new RouterSafemodeService(router);
server.init(conf);
assertEquals(STATE.INITED, server.getServiceState());
server.start();
assertEquals(STATE.STARTED, server.getServiceState());
server.stop();
assertEquals(STATE.STOPPED, server.getServiceState());
server.close();
}
@Test
public void testRouterExitSafemode()
throws InterruptedException, IllegalStateException, IOException {
assertTrue(router.getRpcServer().isInSafeMode());
verifyRouter(RouterServiceState.SAFEMODE);
// Wait for initial time in milliseconds
long interval =
conf.getTimeDuration(DFS_ROUTER_SAFEMODE_EXTENSION,
TimeUnit.SECONDS.toMillis(2), TimeUnit.MILLISECONDS) +
conf.getTimeDuration(DFS_ROUTER_CACHE_TIME_TO_LIVE_MS,
TimeUnit.SECONDS.toMillis(1), TimeUnit.MILLISECONDS);
Thread.sleep(interval);
assertFalse(router.getRpcServer().isInSafeMode());
verifyRouter(RouterServiceState.RUNNING);
}
@Test
public void testRouterEnterSafemode()
throws IllegalStateException, IOException, InterruptedException {
// Verify starting state
assertTrue(router.getRpcServer().isInSafeMode());
verifyRouter(RouterServiceState.SAFEMODE);
// We should be in safe mode for DFS_ROUTER_SAFEMODE_EXTENSION time
long interval0 = conf.getTimeDuration(DFS_ROUTER_SAFEMODE_EXTENSION,
TimeUnit.SECONDS.toMillis(2), TimeUnit.MILLISECONDS) - 1000;
long t0 = Time.now();
while (Time.now() - t0 < interval0) {
verifyRouter(RouterServiceState.SAFEMODE);
Thread.sleep(100);
}
// We wait some time for the state to propagate
long interval1 = 1000 + 2 * conf.getTimeDuration(
DFS_ROUTER_CACHE_TIME_TO_LIVE_MS, TimeUnit.SECONDS.toMillis(1),
TimeUnit.MILLISECONDS);
Thread.sleep(interval1);
// Running
assertFalse(router.getRpcServer().isInSafeMode());
verifyRouter(RouterServiceState.RUNNING);
// Disable cache
router.getStateStore().stopCacheUpdateService();
// Wait until the State Store cache is stale in milliseconds
long interval2 =
conf.getTimeDuration(DFS_ROUTER_SAFEMODE_EXPIRATION,
TimeUnit.SECONDS.toMillis(2), TimeUnit.MILLISECONDS) +
conf.getTimeDuration(DFS_ROUTER_CACHE_TIME_TO_LIVE_MS,
TimeUnit.SECONDS.toMillis(1), TimeUnit.MILLISECONDS);
Thread.sleep(interval2);
// Safemode
assertTrue(router.getRpcServer().isInSafeMode());
verifyRouter(RouterServiceState.SAFEMODE);
}
@Test
public void testRouterRpcSafeMode()
throws IllegalStateException, IOException {
assertTrue(router.getRpcServer().isInSafeMode());
verifyRouter(RouterServiceState.SAFEMODE);
// If the Router is in Safe Mode, we should get a SafeModeException
boolean exception = false;
try {
router.getRpcServer().delete("/testfile.txt", true);
fail("We should have thrown a safe mode exception");
} catch (RouterSafeModeException sme) {
exception = true;
}
assertTrue("We should have thrown a safe mode exception", exception);
}
private void verifyRouter(RouterServiceState status)
throws IllegalStateException, IOException {
assertEquals(status, router.getRouterState());
}
}