HDFS-16539. RBF: Support refreshing/changing router fairness policy controller without rebooting router (#4168)

This commit is contained in:
Felix Nguyen 2022-04-27 14:42:30 +08:00 committed by GitHub
parent f187e9bcd5
commit b4ff49a394
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 348 additions and 23 deletions

View File

@ -36,7 +36,7 @@
public class AbstractRouterRpcFairnessPolicyController
implements RouterRpcFairnessPolicyController {
private static final Logger LOG =
public static final Logger LOG =
LoggerFactory.getLogger(AbstractRouterRpcFairnessPolicyController.class);
/** Hash table to hold semaphore for each configured name service. */
@ -64,6 +64,7 @@ public void releasePermit(String nsId) {
@Override
public void shutdown() {
LOG.debug("Shutting down router fairness policy controller");
// drain all semaphores
for (Semaphore sema: this.permits.values()) {
sema.drainPermits();

View File

@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.fairness;
import org.apache.hadoop.hdfs.server.federation.router.Router;
import org.apache.hadoop.ipc.RefreshHandler;
import org.apache.hadoop.ipc.RefreshResponse;
public class RefreshFairnessPolicyControllerHandler implements RefreshHandler {
final static public String HANDLER_IDENTIFIER = "RefreshFairnessPolicyController";
private final Router router;
public RefreshFairnessPolicyControllerHandler(Router router) {
this.router = router;
}
@Override
public RefreshResponse handleRefresh(String identifier, String[] args) {
if (HANDLER_IDENTIFIER.equals(identifier)) {
return new RefreshResponse(0, router.getRpcServer().refreshFairnessPolicyController());
}
return new RefreshResponse(-1, "Failed");
}
}

View File

@ -20,6 +20,7 @@
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY;
import static org.apache.hadoop.hdfs.server.federation.fairness.RefreshFairnessPolicyControllerHandler.HANDLER_IDENTIFIER;
import java.io.IOException;
import java.net.InetSocketAddress;
@ -45,6 +46,7 @@
import org.apache.hadoop.hdfs.protocolPB.RouterAdminProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.RouterAdminProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.RouterPolicyProvider;
import org.apache.hadoop.hdfs.server.federation.fairness.RefreshFairnessPolicyControllerHandler;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
@ -211,6 +213,8 @@ public RouterAdminServer(Configuration conf, Router router)
genericRefreshService, adminServer);
DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class,
refreshCallQueueService, adminServer);
registerRefreshFairnessPolicyControllerHandler();
}
/**
@ -784,4 +788,9 @@ public void refreshCallQueue() throws IOException {
Configuration configuration = new Configuration();
router.getRpcServer().getServer().refreshCallQueue(configuration);
}
private void registerRefreshFairnessPolicyControllerHandler() {
RefreshRegistry.defaultRegistry()
.register(HANDLER_IDENTIFIER, new RefreshFairnessPolicyControllerHandler(router));
}
}

View File

@ -132,7 +132,7 @@ public class RouterRpcClient {
Pattern.compile("\\tat (.*)\\.(.*)\\((.*):(\\d*)\\)");
/** Fairness manager to control handlers assigned per NS. */
private RouterRpcFairnessPolicyController routerRpcFairnessPolicyController;
private volatile RouterRpcFairnessPolicyController routerRpcFairnessPolicyController;
private Map<String, LongAdder> rejectedPermitsPerNs = new ConcurrentHashMap<>();
private Map<String, LongAdder> acceptedPermitsPerNs = new ConcurrentHashMap<>();
@ -451,7 +451,8 @@ private RetryDecision shouldRetry(final IOException ioe, final int retryCount,
* @throws StandbyException If all Namenodes are in Standby.
* @throws IOException If it cannot invoke the method.
*/
private Object invokeMethod(
@VisibleForTesting
public Object invokeMethod(
final UserGroupInformation ugi,
final List<? extends FederationNamenodeContext> namenodes,
final Class<?> protocol, final Method method, final Object... params)
@ -828,7 +829,8 @@ public Object invokeSingleBlockPool(final String bpId, RemoteMethod method)
public Object invokeSingle(final String nsId, RemoteMethod method)
throws IOException {
UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
acquirePermit(nsId, ugi, method);
RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
acquirePermit(nsId, ugi, method, controller);
try {
List<? extends FederationNamenodeContext> nns =
getNamenodesForNameservice(nsId);
@ -838,7 +840,7 @@ public Object invokeSingle(final String nsId, RemoteMethod method)
Object[] params = method.getParams(loc);
return invokeMethod(ugi, nns, proto, m, params);
} finally {
releasePermit(nsId, ugi, method);
releasePermit(nsId, ugi, method, controller);
}
}
@ -989,6 +991,7 @@ public <R extends RemoteLocationContext, T> RemoteResult invokeSequential(
Class<T> expectedResultClass, Object expectedResultValue)
throws IOException {
RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
final UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
final Method m = remoteMethod.getMethod();
List<IOException> thrownExceptions = new ArrayList<>();
@ -996,7 +999,7 @@ public <R extends RemoteLocationContext, T> RemoteResult invokeSequential(
// Invoke in priority order
for (final RemoteLocationContext loc : locations) {
String ns = loc.getNameserviceId();
acquirePermit(ns, ugi, remoteMethod);
acquirePermit(ns, ugi, remoteMethod, controller);
List<? extends FederationNamenodeContext> namenodes =
getNamenodesForNameservice(ns);
try {
@ -1031,7 +1034,7 @@ public <R extends RemoteLocationContext, T> RemoteResult invokeSequential(
"Unexpected exception proxying API " + e.getMessage(), e);
thrownExceptions.add(ioe);
} finally {
releasePermit(ns, ugi, remoteMethod);
releasePermit(ns, ugi, remoteMethod, controller);
}
}
@ -1356,7 +1359,8 @@ public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(
// Shortcut, just one call
T location = locations.iterator().next();
String ns = location.getNameserviceId();
acquirePermit(ns, ugi, method);
RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
acquirePermit(ns, ugi, method, controller);
final List<? extends FederationNamenodeContext> namenodes =
getNamenodesForNameservice(ns);
try {
@ -1369,7 +1373,7 @@ public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(
// Localize the exception
throw processException(ioe, location);
} finally {
releasePermit(ns, ugi, method);
releasePermit(ns, ugi, method, controller);
}
}
@ -1419,7 +1423,8 @@ public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(
this.router.getRouterClientMetrics().incInvokedConcurrent(m);
}
acquirePermit(CONCURRENT_NS, ugi, method);
RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
acquirePermit(CONCURRENT_NS, ugi, method, controller);
try {
List<Future<Object>> futures = null;
if (timeOutMs > 0) {
@ -1477,7 +1482,7 @@ public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(
throw new IOException(
"Unexpected error while invoking API " + ex.getMessage(), ex);
} finally {
releasePermit(CONCURRENT_NS, ugi, method);
releasePermit(CONCURRENT_NS, ugi, method, controller);
}
}
@ -1558,13 +1563,14 @@ private String getNameserviceForBlockPoolId(final String bpId)
* @param nsId Identifier of the block pool.
* @param ugi UserGroupIdentifier associated with the user.
* @param m Remote method that needs to be invoked.
* @param controller fairness policy controller to acquire permit from
* @throws IOException If permit could not be acquired for the nsId.
*/
private void acquirePermit(
final String nsId, final UserGroupInformation ugi, final RemoteMethod m)
private void acquirePermit(final String nsId, final UserGroupInformation ugi,
final RemoteMethod m, RouterRpcFairnessPolicyController controller)
throws IOException {
if (routerRpcFairnessPolicyController != null) {
if (!routerRpcFairnessPolicyController.acquirePermit(nsId)) {
if (controller != null) {
if (!controller.acquirePermit(nsId)) {
// Throw StandByException,
// Clients could fail over and try another router.
if (rpcMonitor != null) {
@ -1585,21 +1591,20 @@ private void acquirePermit(
/**
* Release permit for specific nsId after processing against downstream
* nsId is completed.
*
* @param nsId Identifier of the block pool.
* @param ugi UserGroupIdentifier associated with the user.
* @param m Remote method that needs to be invoked.
* @param controller fairness policy controller to release permit from
*/
private void releasePermit(
final String nsId, final UserGroupInformation ugi, final RemoteMethod m) {
if (routerRpcFairnessPolicyController != null) {
routerRpcFairnessPolicyController.releasePermit(nsId);
private void releasePermit(final String nsId, final UserGroupInformation ugi,
final RemoteMethod m, RouterRpcFairnessPolicyController controller) {
if (controller != null) {
controller.releasePermit(nsId);
LOG.trace("Permit released for ugi: {} for method: {}", ugi,
m.getMethodName());
}
}
@VisibleForTesting
public RouterRpcFairnessPolicyController
getRouterRpcFairnessPolicyController() {
return routerRpcFairnessPolicyController;
@ -1622,4 +1627,35 @@ public Long getAcceptedPermitForNs(String ns) {
return acceptedPermitsPerNs.containsKey(ns) ?
acceptedPermitsPerNs.get(ns).longValue() : 0L;
}
/**
* Refreshes/changes the fairness policy controller implementation if possible
* and returns the controller class name
* @param conf Configuration
* @return New controller class name if successfully refreshed, else old controller class name
*/
public synchronized String refreshFairnessPolicyController(Configuration conf) {
RouterRpcFairnessPolicyController newController;
try {
newController = FederationUtil.newFairnessPolicyController(conf);
} catch (RuntimeException e) {
LOG.error("Failed to create router fairness policy controller", e);
return getCurrentFairnessPolicyControllerClassName();
}
if (newController != null) {
if (routerRpcFairnessPolicyController != null) {
routerRpcFairnessPolicyController.shutdown();
}
routerRpcFairnessPolicyController = newController;
}
return getCurrentFairnessPolicyControllerClassName();
}
private String getCurrentFairnessPolicyControllerClassName() {
if (routerRpcFairnessPolicyController != null) {
return routerRpcFairnessPolicyController.getClass().getCanonicalName();
}
return null;
}
}

View File

@ -537,7 +537,7 @@ public FileSubclusterResolver getSubclusterResolver() {
}
/**
* Get the active namenode resolver
* Get the active namenode resolver.
*
* @return Active namenode resolver.
*/
@ -1990,6 +1990,10 @@ public int getSchedulerJobCount() {
return fedRenameScheduler.getAllJobs().size();
}
public String refreshFairnessPolicyController() {
return rpcClient.refreshFairnessPolicyController(new Configuration());
}
/**
* Deals with loading datanode report into the cache and refresh.
*/

View File

@ -0,0 +1,234 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.fairness;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
import org.apache.hadoop.test.GenericTestUtils;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX;
import static org.junit.Assert.assertEquals;
public class TestRouterRefreshFairnessPolicyController {
private static final Logger LOG =
LoggerFactory.getLogger(TestRouterRefreshFairnessPolicyController.class);
private final GenericTestUtils.LogCapturer controllerLog =
GenericTestUtils.LogCapturer.captureLogs(AbstractRouterRpcFairnessPolicyController.LOG);
private StateStoreDFSCluster cluster;
@BeforeClass
public static void setLogLevel() {
GenericTestUtils.setLogLevel(AbstractRouterRpcFairnessPolicyController.LOG, Level.DEBUG);
}
@After
public void cleanup() {
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
}
@Before
public void setupCluster() throws Exception {
cluster = new StateStoreDFSCluster(false, 2);
Configuration conf = new RouterConfigBuilder().stateStore().rpc().build();
// Handlers concurrent:ns0 = 3:3
conf.setClass(RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS,
StaticRouterRpcFairnessPolicyController.class, RouterRpcFairnessPolicyController.class);
conf.setInt(RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY, 9);
// Allow metrics
conf.setBoolean(RBFConfigKeys.DFS_ROUTER_METRICS_ENABLE, true);
// Datanodes not needed for this test.
cluster.setNumDatanodesPerNameservice(0);
cluster.addRouterOverrides(conf);
cluster.startCluster();
cluster.startRouters();
cluster.waitClusterUp();
}
@Test
public void testRefreshNonexistentHandlerClass() {
MiniRouterDFSCluster.RouterContext routerContext = cluster.getRandomRouter();
routerContext.getConf().set(RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS,
"org.apache.hadoop.hdfs.server.federation.fairness.ThisControllerDoesNotExist");
assertEquals(StaticRouterRpcFairnessPolicyController.class.getCanonicalName(),
routerContext.getRouterRpcClient()
.refreshFairnessPolicyController(routerContext.getConf()));
}
@Test
public void testRefreshClassDoesNotImplementControllerInterface() {
MiniRouterDFSCluster.RouterContext routerContext = cluster.getRandomRouter();
routerContext.getConf()
.set(RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS, "java.lang.String");
assertEquals(StaticRouterRpcFairnessPolicyController.class.getCanonicalName(),
routerContext.getRouterRpcClient()
.refreshFairnessPolicyController(routerContext.getConf()));
}
@Test
public void testRefreshSuccessful() {
MiniRouterDFSCluster.RouterContext routerContext = cluster.getRandomRouter();
routerContext.getConf().set(RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS,
StaticRouterRpcFairnessPolicyController.class.getCanonicalName());
assertEquals(StaticRouterRpcFairnessPolicyController.class.getCanonicalName(),
routerContext.getRouterRpcClient()
.refreshFairnessPolicyController(routerContext.getConf()));
routerContext.getConf().set(RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS,
NoRouterRpcFairnessPolicyController.class.getCanonicalName());
assertEquals(NoRouterRpcFairnessPolicyController.class.getCanonicalName(),
routerContext.getRouterRpcClient()
.refreshFairnessPolicyController(routerContext.getConf()));
}
@Test
public void testConcurrentRefreshRequests() throws InterruptedException {
MiniRouterDFSCluster.RouterContext routerContext = cluster.getRandomRouter();
RouterRpcClient client = Mockito.spy(routerContext.getRouterRpcClient());
controllerLog.clearOutput();
// Spawn 100 concurrent refresh requests
Thread[] threads = new Thread[100];
for (int i = 0; i < 100; i++) {
threads[i] = new Thread(() -> {
client.refreshFairnessPolicyController(routerContext.getConf());
});
}
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
thread.join();
}
// There should be 100 controller shutdowns. All controllers created should be shut down.
assertEquals(100, StringUtils.countMatches(controllerLog.getOutput(),
"Shutting down router fairness policy controller"));
controllerLog.clearOutput();
}
@Test
public void testRefreshStaticChangeHandlers() throws Exception {
// Setup and mock
MiniRouterDFSCluster.RouterContext routerContext = cluster.getRandomRouter();
RouterRpcClient client = Mockito.spy(routerContext.getRouterRpcClient());
final long sleepTime = 3000;
Mockito.doAnswer(invocationOnMock -> {
Thread.sleep(sleepTime);
return null;
}).when(client)
.invokeMethod(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
// No calls yet
assertEquals("{}",
routerContext.getRouterRpcServer().getRPCMetrics().getProxyOpPermitAcceptedPerNs());
List<Thread> preRefreshInvocations = makeDummyInvocations(client, 4, "ns0");
Thread.sleep(2000);
// 3 permits acquired, calls will take 3s to finish and release permits
// 1 invocation rejected
assertEquals("{\"ns0\":3}",
routerContext.getRouterRpcServer().getRPCMetrics().getProxyOpPermitAcceptedPerNs());
assertEquals("{\"ns0\":1}",
routerContext.getRouterRpcServer().getRPCMetrics().getProxyOpPermitRejectedPerNs());
Configuration conf = routerContext.getConf();
final int newNs0Permits = 2;
final int newNs1Permits = 4;
conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "ns0", newNs0Permits);
conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "ns1", newNs1Permits);
Thread threadRefreshController = new Thread(() -> {
client.refreshFairnessPolicyController(routerContext.getConf());
});
threadRefreshController.start();
threadRefreshController.join();
// Wait for all dummy invocation threads to finish
for (Thread thread : preRefreshInvocations) {
thread.join();
}
// Controller should now have 2:4 handlers for ns0:ns1
// Make 4 calls to ns0 and 6 calls to ns1 so that each will fail twice
StaticRouterRpcFairnessPolicyController controller =
(StaticRouterRpcFairnessPolicyController) client.getRouterRpcFairnessPolicyController();
System.out.println(controller.getAvailableHandlerOnPerNs());
List<Thread> ns0Invocations = makeDummyInvocations(client, newNs0Permits + 2, "ns0");
List<Thread> ns1Invocations = makeDummyInvocations(client, newNs1Permits + 2, "ns1");
// Wait for these threads to finish
for (Thread thread : ns0Invocations) {
thread.join();
}
for (Thread thread : ns1Invocations) {
thread.join();
}
assertEquals("{\"ns0\":5,\"ns1\":4}",
routerContext.getRouterRpcServer().getRPCMetrics().getProxyOpPermitAcceptedPerNs());
assertEquals("{\"ns0\":3,\"ns1\":2}",
routerContext.getRouterRpcServer().getRPCMetrics().getProxyOpPermitRejectedPerNs());
}
private List<Thread> makeDummyInvocations(RouterRpcClient client, final int nThreads,
final String namespace) {
RemoteMethod dummyMethod = Mockito.mock(RemoteMethod.class);
List<Thread> threadAcquirePermits = new ArrayList<>();
for (int i = 0; i < nThreads; i++) {
Thread threadAcquirePermit = new Thread(() -> {
try {
client.invokeSingle(namespace, dummyMethod);
} catch (IOException e) {
e.printStackTrace();
}
});
threadAcquirePermits.add(threadAcquirePermit);
threadAcquirePermit.start();
}
return threadAcquirePermits;
}
}