From 29b0a6c28d4529ffb582eb0614aa2836d416cb55 Mon Sep 17 00:00:00 2001 From: Jian Zhang <1361320460@qq.com> Date: Sat, 9 Dec 2023 02:18:22 +0800 Subject: [PATCH] HDFS-17265. RBF: Throwing an exception prevents the permit from being released when using FairnessPolicyController (#6298) --- ...ractRouterRpcFairnessPolicyController.java | 3 +- .../NoRouterRpcFairnessPolicyController.java | 5 ++ .../RouterRpcFairnessPolicyController.java | 8 +++ .../federation/router/RouterRpcClient.java | 6 +- .../fairness/TestRouterHandlersFairness.java | 56 +++++++++++++++++++ 5 files changed, 74 insertions(+), 4 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractRouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractRouterRpcFairnessPolicyController.java index db917be712..570480d49e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractRouterRpcFairnessPolicyController.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractRouterRpcFairnessPolicyController.java @@ -89,7 +89,8 @@ protected void insertNameServiceWithPermits(String nsId, int maxPermits) { this.permits.put(nsId, new Semaphore(maxPermits)); } - protected int getAvailablePermits(String nsId) { + @Override + public int getAvailablePermits(String nsId) { return this.permits.get(nsId).availablePermits(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/NoRouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/NoRouterRpcFairnessPolicyController.java index 3b85da59e1..e0a5c31a2d 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/NoRouterRpcFairnessPolicyController.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/NoRouterRpcFairnessPolicyController.java @@ -51,4 +51,9 @@ public void shutdown() { public String getAvailableHandlerOnPerNs(){ return "N/A"; } + + @Override + public int getAvailablePermits(String nsId) { + return 0; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterRpcFairnessPolicyController.java index ce1844f1a4..90d8f7dd47 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterRpcFairnessPolicyController.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterRpcFairnessPolicyController.java @@ -69,4 +69,12 @@ public interface RouterRpcFairnessPolicyController { * @return the JSON string of the available handler for each name service. */ String getAvailableHandlerOnPerNs(); + + /** + * Returns the number of permits currently available for the given name service. + * + * @param nsId name service id. + * @return the number of available permits for the given name service. 
+ */ + int getAvailablePermits(String nsId); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index ad95e9532a..d25e5ae4d3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -1119,10 +1119,10 @@ public RemoteResult invokeSequential( // Invoke in priority order for (final RemoteLocationContext loc : locations) { String ns = loc.getNameserviceId(); - acquirePermit(ns, ugi, remoteMethod, controller); boolean isObserverRead = isObserverReadEligible(ns, m); List namenodes = getOrderedNamenodes(ns, isObserverRead); + acquirePermit(ns, ugi, remoteMethod, controller); try { Class proto = remoteMethod.getProtocol(); Object[] params = remoteMethod.getParams(loc); @@ -1482,11 +1482,11 @@ public Map invokeConcurrent( // Shortcut, just one call T location = locations.iterator().next(); String ns = location.getNameserviceId(); - RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController(); - acquirePermit(ns, ugi, method, controller); boolean isObserverRead = isObserverReadEligible(ns, m); final List namenodes = getOrderedNamenodes(ns, isObserverRead); + RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController(); + acquirePermit(ns, ugi, method, controller); try { Class proto = method.getProtocol(); Object[] paramList = method.getParams(location); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterHandlersFairness.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterHandlersFairness.java 
index 8fc9de0cb2..9b58c82bce 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterHandlersFairness.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterHandlersFairness.java @@ -20,9 +20,13 @@ import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; import java.io.IOException; +import java.lang.reflect.Field; import java.net.URI; +import java.util.ArrayList; +import java.util.Collection; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; @@ -32,9 +36,14 @@ import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext; import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster; +import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; +import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod; +import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.StandbyException; +import org.apache.hadoop.test.LambdaTestUtils; import org.junit.After; import org.junit.Test; import org.slf4j.Logger; @@ -100,6 +109,53 @@ public void testFairnessControlOn() throws Exception { startLoadTest(true); } + /** + * Ensure that the semaphore is not acquired, + * when invokeSequential or invokeConcurrent throws any exception. 
+ */ + @Test + public void testReleasedWhenExceptionOccurs() throws Exception{ + setupCluster(true, false); + RouterContext routerContext = cluster.getRandomRouter(); + RouterRpcClient rpcClient = + routerContext.getRouter().getRpcServer().getRPCClient(); + // Mock an ActiveNamenodeResolver and inject it into RouterRpcClient, + // so RouterRpcClient's getOrderedNamenodes method will report an exception. + ActiveNamenodeResolver mockNamenodeResolver = mock(ActiveNamenodeResolver.class); + Field field = rpcClient.getClass().getDeclaredField("namenodeResolver"); + field.setAccessible(true); + field.set(rpcClient, mockNamenodeResolver); + + // Use getFileInfo test invokeSequential. + DFSClient client = routerContext.getClient(); + int availablePermits = + rpcClient.getRouterRpcFairnessPolicyController().getAvailablePermits("ns0"); + LambdaTestUtils.intercept(IOException.class, () -> { + LOG.info("Use getFileInfo test invokeSequential."); + client.getFileInfo("/test.txt"); + }); + // Ensure that the semaphore is not acquired. + assertEquals(availablePermits, + rpcClient.getRouterRpcFairnessPolicyController().getAvailablePermits("ns0")); + + // Use renewLease test invokeConcurrent. + Collection locations = new ArrayList<>(); + locations.add(new RemoteLocation("ns0", "/", "/")); + RemoteMethod renewLease = new RemoteMethod( + "renewLease", + new Class[]{java.lang.String.class, java.util.List.class}, + new Object[]{null, null}); + availablePermits = + rpcClient.getRouterRpcFairnessPolicyController().getAvailablePermits("ns0"); + LambdaTestUtils.intercept(IOException.class, () -> { + LOG.info("Use renewLease test invokeConcurrent."); + rpcClient.invokeConcurrent(locations, renewLease); + }); + // Ensure that the semaphore is not acquired. + assertEquals(availablePermits, + rpcClient.getRouterRpcFairnessPolicyController().getAvailablePermits("ns0")); + } + /** * Start a generic load test as a client against a cluster which has either * fairness configured or not configured. 
Test will spawn a set of 100