HDFS-13475. RBF: Admin cannot enforce Router enter SafeMode. Contributed by Chao Sun.
This commit is contained in:
parent
937ef39b3f
commit
359ea4e181
@ -665,4 +665,11 @@ public class Router extends CompositeService {
|
||||
Collection<NamenodeHeartbeatService> getNamenodeHearbeatServices() {
|
||||
return this.namenodeHeartbeatServices;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the Router safe mode service
|
||||
*/
|
||||
RouterSafemodeService getSafemodeService() {
|
||||
return this.safemodeService;
|
||||
}
|
||||
}
|
||||
|
@ -24,6 +24,7 @@ import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.Set;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
@ -272,23 +273,37 @@ public class RouterAdminServer extends AbstractService
|
||||
@Override
|
||||
public EnterSafeModeResponse enterSafeMode(EnterSafeModeRequest request)
|
||||
throws IOException {
|
||||
this.router.updateRouterState(RouterServiceState.SAFEMODE);
|
||||
this.router.getRpcServer().setSafeMode(true);
|
||||
return EnterSafeModeResponse.newInstance(verifySafeMode(true));
|
||||
boolean success = false;
|
||||
RouterSafemodeService safeModeService = this.router.getSafemodeService();
|
||||
if (safeModeService != null) {
|
||||
this.router.updateRouterState(RouterServiceState.SAFEMODE);
|
||||
safeModeService.setManualSafeMode(true);
|
||||
success = verifySafeMode(true);
|
||||
}
|
||||
return EnterSafeModeResponse.newInstance(success);
|
||||
}
|
||||
|
||||
@Override
|
||||
public LeaveSafeModeResponse leaveSafeMode(LeaveSafeModeRequest request)
|
||||
throws IOException {
|
||||
this.router.updateRouterState(RouterServiceState.RUNNING);
|
||||
this.router.getRpcServer().setSafeMode(false);
|
||||
return LeaveSafeModeResponse.newInstance(verifySafeMode(false));
|
||||
boolean success = false;
|
||||
RouterSafemodeService safeModeService = this.router.getSafemodeService();
|
||||
if (safeModeService != null) {
|
||||
this.router.updateRouterState(RouterServiceState.RUNNING);
|
||||
safeModeService.setManualSafeMode(false);
|
||||
success = verifySafeMode(false);
|
||||
}
|
||||
return LeaveSafeModeResponse.newInstance(success);
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetSafeModeResponse getSafeMode(GetSafeModeRequest request)
|
||||
throws IOException {
|
||||
boolean isInSafeMode = this.router.getRpcServer().isInSafeMode();
|
||||
boolean isInSafeMode = false;
|
||||
RouterSafemodeService safeModeService = this.router.getSafemodeService();
|
||||
if (safeModeService != null) {
|
||||
isInSafeMode = safeModeService.isInSafeMode();
|
||||
}
|
||||
return GetSafeModeResponse.newInstance(isInSafeMode);
|
||||
}
|
||||
|
||||
@ -298,7 +313,8 @@ public class RouterAdminServer extends AbstractService
|
||||
* @return
|
||||
*/
|
||||
private boolean verifySafeMode(boolean isInSafeMode) {
|
||||
boolean serverInSafeMode = this.router.getRpcServer().isInSafeMode();
|
||||
Preconditions.checkNotNull(this.router.getSafemodeService());
|
||||
boolean serverInSafeMode = this.router.getSafemodeService().isInSafeMode();
|
||||
RouterServiceState currentState = this.router.getRouterState();
|
||||
|
||||
return (isInSafeMode && currentState == RouterServiceState.SAFEMODE
|
||||
|
@ -193,9 +193,6 @@ public class RouterRpcServer extends AbstractService
|
||||
/** Interface to map global name space to HDFS subcluster name spaces. */
|
||||
private final FileSubclusterResolver subclusterResolver;
|
||||
|
||||
/** If we are in safe mode, fail requests as if a standby NN. */
|
||||
private volatile boolean safeMode;
|
||||
|
||||
/** Category of the operation that a thread is executing. */
|
||||
private final ThreadLocal<OperationCategory> opCategory = new ThreadLocal<>();
|
||||
|
||||
@ -456,7 +453,8 @@ public class RouterRpcServer extends AbstractService
|
||||
return;
|
||||
}
|
||||
|
||||
if (safeMode) {
|
||||
RouterSafemodeService safemodeService = router.getSafemodeService();
|
||||
if (safemodeService != null && safemodeService.isInSafeMode()) {
|
||||
// Throw standby exception, router is not available
|
||||
if (rpcMonitor != null) {
|
||||
rpcMonitor.routerFailureSafemode();
|
||||
@ -466,26 +464,6 @@ public class RouterRpcServer extends AbstractService
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* In safe mode all RPC requests will fail and return a standby exception.
|
||||
* The client will try another Router, similar to the client retry logic for
|
||||
* HA.
|
||||
*
|
||||
* @param mode True if enabled, False if disabled.
|
||||
*/
|
||||
public void setSafeMode(boolean mode) {
|
||||
this.safeMode = mode;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the Router is in safe mode and cannot serve RPC calls.
|
||||
*
|
||||
* @return If the Router is in safe mode.
|
||||
*/
|
||||
public boolean isInSafeMode() {
|
||||
return this.safeMode;
|
||||
}
|
||||
|
||||
@Override // ClientProtocol
|
||||
public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
|
||||
throws IOException {
|
||||
|
@ -42,6 +42,23 @@ public class RouterSafemodeService extends PeriodicService {
|
||||
/** Router to manage safe mode. */
|
||||
private final Router router;
|
||||
|
||||
/**
|
||||
* If we are in safe mode, fail requests as if a standby NN.
|
||||
* Router can enter safe mode in two different ways:
|
||||
* 1. upon start up: router enters this mode after service start, and will
|
||||
* exit after certain time threshold;
|
||||
* 2. via admin command: router enters this mode via admin command:
|
||||
* dfsrouteradmin -safemode enter
|
||||
* and exit after admin command:
|
||||
* dfsrouteradmin -safemode leave
|
||||
*/
|
||||
|
||||
/** Whether Router is in safe mode */
|
||||
private volatile boolean safeMode;
|
||||
|
||||
/** Whether the Router safe mode is set manually (i.e., via Router admin) */
|
||||
private volatile boolean isSafeModeSetManually;
|
||||
|
||||
/** Interval in ms to wait post startup before allowing RPC requests. */
|
||||
private long startupInterval;
|
||||
/** Interval in ms after which the State Store cache is too stale. */
|
||||
@ -63,14 +80,29 @@ public class RouterSafemodeService extends PeriodicService {
|
||||
this.router = router;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return whether the current Router is in safe mode.
|
||||
*/
|
||||
boolean isInSafeMode() {
|
||||
return this.safeMode;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the flag to indicate that the safe mode for this Router is set manually
|
||||
* via the Router admin command.
|
||||
*/
|
||||
void setManualSafeMode(boolean mode) {
|
||||
this.safeMode = mode;
|
||||
this.isSafeModeSetManually = mode;
|
||||
}
|
||||
|
||||
/**
|
||||
* Enter safe mode.
|
||||
*/
|
||||
private void enter() {
|
||||
LOG.info("Entering safe mode");
|
||||
enterSafeModeTime = now();
|
||||
RouterRpcServer rpcServer = router.getRpcServer();
|
||||
rpcServer.setSafeMode(true);
|
||||
safeMode = true;
|
||||
router.updateRouterState(RouterServiceState.SAFEMODE);
|
||||
}
|
||||
|
||||
@ -87,8 +119,7 @@ public class RouterSafemodeService extends PeriodicService {
|
||||
} else {
|
||||
routerMetrics.setSafeModeTime(timeInSafemode);
|
||||
}
|
||||
RouterRpcServer rpcServer = router.getRpcServer();
|
||||
rpcServer.setSafeMode(false);
|
||||
safeMode = false;
|
||||
router.updateRouterState(RouterServiceState.RUNNING);
|
||||
}
|
||||
|
||||
@ -131,17 +162,16 @@ public class RouterSafemodeService extends PeriodicService {
|
||||
this.startupInterval - delta);
|
||||
return;
|
||||
}
|
||||
RouterRpcServer rpcServer = router.getRpcServer();
|
||||
StateStoreService stateStore = router.getStateStore();
|
||||
long cacheUpdateTime = stateStore.getCacheUpdateTime();
|
||||
boolean isCacheStale = (now - cacheUpdateTime) > this.staleInterval;
|
||||
|
||||
// Always update to indicate our cache was updated
|
||||
if (isCacheStale) {
|
||||
if (!rpcServer.isInSafeMode()) {
|
||||
if (!safeMode) {
|
||||
enter();
|
||||
}
|
||||
} else if (rpcServer.isInSafeMode()) {
|
||||
} else if (safeMode && !isSafeModeSetManually) {
|
||||
// Cache recently updated, leave safe mode
|
||||
leave();
|
||||
}
|
||||
|
@ -82,6 +82,7 @@ public class TestRouterAdminCLI {
|
||||
.stateStore()
|
||||
.admin()
|
||||
.rpc()
|
||||
.safemode()
|
||||
.build();
|
||||
cluster.addRouterOverrides(conf);
|
||||
|
||||
@ -501,13 +502,13 @@ public class TestRouterAdminCLI {
|
||||
public void testManageSafeMode() throws Exception {
|
||||
// ensure the Router become RUNNING state
|
||||
waitState(RouterServiceState.RUNNING);
|
||||
assertFalse(routerContext.getRouter().getRpcServer().isInSafeMode());
|
||||
assertFalse(routerContext.getRouter().getSafemodeService().isInSafeMode());
|
||||
assertEquals(0, ToolRunner.run(admin,
|
||||
new String[] {"-safemode", "enter"}));
|
||||
// verify state
|
||||
assertEquals(RouterServiceState.SAFEMODE,
|
||||
routerContext.getRouter().getRouterState());
|
||||
assertTrue(routerContext.getRouter().getRpcServer().isInSafeMode());
|
||||
assertTrue(routerContext.getRouter().getSafemodeService().isInSafeMode());
|
||||
|
||||
System.setOut(new PrintStream(out));
|
||||
assertEquals(0, ToolRunner.run(admin,
|
||||
@ -519,7 +520,7 @@ public class TestRouterAdminCLI {
|
||||
// verify state
|
||||
assertEquals(RouterServiceState.RUNNING,
|
||||
routerContext.getRouter().getRouterState());
|
||||
assertFalse(routerContext.getRouter().getRpcServer().isInSafeMode());
|
||||
assertFalse(routerContext.getRouter().getSafemodeService().isInSafeMode());
|
||||
|
||||
out.reset();
|
||||
assertEquals(0, ToolRunner.run(admin,
|
||||
|
@ -28,14 +28,17 @@ import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
|
||||
import org.apache.hadoop.hdfs.tools.federation.RouterAdmin;
|
||||
import org.apache.hadoop.ipc.StandbyException;
|
||||
import org.apache.hadoop.service.Service.STATE;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
@ -60,12 +63,12 @@ public class TestRouterSafemode {
|
||||
// 2 sec startup standby
|
||||
conf.setTimeDuration(DFS_ROUTER_SAFEMODE_EXTENSION,
|
||||
TimeUnit.SECONDS.toMillis(2), TimeUnit.MILLISECONDS);
|
||||
// 1 sec cache refresh
|
||||
// 200 ms cache refresh
|
||||
conf.setTimeDuration(DFS_ROUTER_CACHE_TIME_TO_LIVE_MS,
|
||||
TimeUnit.SECONDS.toMillis(1), TimeUnit.MILLISECONDS);
|
||||
// 2 sec post cache update before entering safemode (2 intervals)
|
||||
200, TimeUnit.MILLISECONDS);
|
||||
// 1 sec post cache update before entering safemode (2 intervals)
|
||||
conf.setTimeDuration(DFS_ROUTER_SAFEMODE_EXPIRATION,
|
||||
TimeUnit.SECONDS.toMillis(2), TimeUnit.MILLISECONDS);
|
||||
TimeUnit.SECONDS.toMillis(1), TimeUnit.MILLISECONDS);
|
||||
|
||||
conf.set(RBFConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY, "0.0.0.0");
|
||||
conf.set(RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, "127.0.0.1:0");
|
||||
@ -77,6 +80,7 @@ public class TestRouterSafemode {
|
||||
// RPC + State Store + Safe Mode only
|
||||
conf = new RouterConfigBuilder(conf)
|
||||
.rpc()
|
||||
.admin()
|
||||
.safemode()
|
||||
.stateStore()
|
||||
.metrics()
|
||||
@ -118,7 +122,7 @@ public class TestRouterSafemode {
|
||||
public void testRouterExitSafemode()
|
||||
throws InterruptedException, IllegalStateException, IOException {
|
||||
|
||||
assertTrue(router.getRpcServer().isInSafeMode());
|
||||
assertTrue(router.getSafemodeService().isInSafeMode());
|
||||
verifyRouter(RouterServiceState.SAFEMODE);
|
||||
|
||||
// Wait for initial time in milliseconds
|
||||
@ -129,7 +133,7 @@ public class TestRouterSafemode {
|
||||
TimeUnit.SECONDS.toMillis(1), TimeUnit.MILLISECONDS);
|
||||
Thread.sleep(interval);
|
||||
|
||||
assertFalse(router.getRpcServer().isInSafeMode());
|
||||
assertFalse(router.getSafemodeService().isInSafeMode());
|
||||
verifyRouter(RouterServiceState.RUNNING);
|
||||
}
|
||||
|
||||
@ -138,7 +142,7 @@ public class TestRouterSafemode {
|
||||
throws IllegalStateException, IOException, InterruptedException {
|
||||
|
||||
// Verify starting state
|
||||
assertTrue(router.getRpcServer().isInSafeMode());
|
||||
assertTrue(router.getSafemodeService().isInSafeMode());
|
||||
verifyRouter(RouterServiceState.SAFEMODE);
|
||||
|
||||
// We should be in safe mode for DFS_ROUTER_SAFEMODE_EXTENSION time
|
||||
@ -157,7 +161,7 @@ public class TestRouterSafemode {
|
||||
Thread.sleep(interval1);
|
||||
|
||||
// Running
|
||||
assertFalse(router.getRpcServer().isInSafeMode());
|
||||
assertFalse(router.getSafemodeService().isInSafeMode());
|
||||
verifyRouter(RouterServiceState.RUNNING);
|
||||
|
||||
// Disable cache
|
||||
@ -167,12 +171,12 @@ public class TestRouterSafemode {
|
||||
long interval2 =
|
||||
conf.getTimeDuration(DFS_ROUTER_SAFEMODE_EXPIRATION,
|
||||
TimeUnit.SECONDS.toMillis(2), TimeUnit.MILLISECONDS) +
|
||||
conf.getTimeDuration(DFS_ROUTER_CACHE_TIME_TO_LIVE_MS,
|
||||
2 * conf.getTimeDuration(DFS_ROUTER_CACHE_TIME_TO_LIVE_MS,
|
||||
TimeUnit.SECONDS.toMillis(1), TimeUnit.MILLISECONDS);
|
||||
Thread.sleep(interval2);
|
||||
|
||||
// Safemode
|
||||
assertTrue(router.getRpcServer().isInSafeMode());
|
||||
assertTrue(router.getSafemodeService().isInSafeMode());
|
||||
verifyRouter(RouterServiceState.SAFEMODE);
|
||||
}
|
||||
|
||||
@ -180,7 +184,7 @@ public class TestRouterSafemode {
|
||||
public void testRouterRpcSafeMode()
|
||||
throws IllegalStateException, IOException {
|
||||
|
||||
assertTrue(router.getRpcServer().isInSafeMode());
|
||||
assertTrue(router.getSafemodeService().isInSafeMode());
|
||||
verifyRouter(RouterServiceState.SAFEMODE);
|
||||
|
||||
// If the Router is in Safe Mode, we should get a SafeModeException
|
||||
@ -194,6 +198,38 @@ public class TestRouterSafemode {
|
||||
assertTrue("We should have thrown a safe mode exception", exception);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRouterManualSafeMode() throws Exception {
|
||||
InetSocketAddress adminAddr = router.getAdminServerAddress();
|
||||
conf.setSocketAddr(RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY, adminAddr);
|
||||
RouterAdmin admin = new RouterAdmin(conf);
|
||||
|
||||
assertTrue(router.getSafemodeService().isInSafeMode());
|
||||
verifyRouter(RouterServiceState.SAFEMODE);
|
||||
|
||||
// Wait until the Router exit start up safe mode
|
||||
long interval = conf.getTimeDuration(DFS_ROUTER_SAFEMODE_EXTENSION,
|
||||
TimeUnit.SECONDS.toMillis(2), TimeUnit.MILLISECONDS) + 300;
|
||||
Thread.sleep(interval);
|
||||
verifyRouter(RouterServiceState.RUNNING);
|
||||
|
||||
// Now enter safe mode via Router admin command - it should work
|
||||
assertEquals(0, ToolRunner.run(admin, new String[] {"-safemode", "enter"}));
|
||||
verifyRouter(RouterServiceState.SAFEMODE);
|
||||
|
||||
// Wait for update interval of the safe mode service, it should still in
|
||||
// safe mode.
|
||||
interval = 2 * conf.getTimeDuration(
|
||||
DFS_ROUTER_CACHE_TIME_TO_LIVE_MS, TimeUnit.SECONDS.toMillis(1),
|
||||
TimeUnit.MILLISECONDS);
|
||||
Thread.sleep(interval);
|
||||
verifyRouter(RouterServiceState.SAFEMODE);
|
||||
|
||||
// Exit safe mode via admin command
|
||||
assertEquals(0, ToolRunner.run(admin, new String[] {"-safemode", "leave"}));
|
||||
verifyRouter(RouterServiceState.RUNNING);
|
||||
}
|
||||
|
||||
private void verifyRouter(RouterServiceState status)
|
||||
throws IllegalStateException, IOException {
|
||||
assertEquals(status, router.getRouterState());
|
||||
|
Loading…
x
Reference in New Issue
Block a user