From 4ee92efb73a90ae7f909e96de242d216ad6878b2 Mon Sep 17 00:00:00 2001 From: ZanderXu Date: Sat, 6 May 2023 11:20:21 +0800 Subject: [PATCH] HDFS-16865. The source path is always / after RBF proxied the complete, addBlock and getAdditionalDatanode RPC. (#5200). Contributed by ZanderXu. Reviewed-by: Inigo Goiri Signed-off-by: Ayush Saxena --- .../router/RouterClientProtocol.java | 22 +++++++------- .../federation/router/RouterRpcClient.java | 30 ++++++++++++++++--- .../router/TestRouterRpcMultiDestination.java | 15 ++++++++++ 3 files changed, 51 insertions(+), 16 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java index 34e3666a94..9d987bc7f0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java @@ -483,12 +483,10 @@ public LocatedBlock addBlock(String src, String clientName, new RemoteParam(), clientName, previous, excludedNodes, fileId, favoredNodes, addBlockFlags); + final List locations = rpcServer.getLocationsForPath(src, true); if (previous != null) { - return rpcClient.invokeSingle(previous, method, LocatedBlock.class); + return rpcClient.invokeSingle(previous, method, locations, LocatedBlock.class); } - - final List locations = - rpcServer.getLocationsForPath(src, true); // TODO verify the excludedNodes and favoredNodes are acceptable to this NN return rpcClient.invokeSequential( locations, method, LocatedBlock.class, null); @@ -513,12 +511,11 @@ public LocatedBlock getAdditionalDatanode(final String src, final long fileId, new RemoteParam(), fileId, blk, existings, existingStorageIDs, excludes, numAdditionalNodes, clientName); + 
final List locations = rpcServer.getLocationsForPath(src, false); if (blk != null) { - return rpcClient.invokeSingle(blk, method, LocatedBlock.class); + return rpcClient.invokeSingle(blk, method, locations, LocatedBlock.class); } - final List locations = - rpcServer.getLocationsForPath(src, false); return rpcClient.invokeSequential( locations, method, LocatedBlock.class, null); } @@ -532,7 +529,9 @@ public void abandonBlock(ExtendedBlock b, long fileId, String src, new Class[] {ExtendedBlock.class, long.class, String.class, String.class}, b, fileId, new RemoteParam(), holder); - rpcClient.invokeSingle(b, method); + + final List locations = rpcServer.getLocationsForPath(src, false); + rpcClient.invokeSingle(b, method, locations, Void.class); } @Override @@ -545,12 +544,11 @@ public boolean complete(String src, String clientName, ExtendedBlock last, long.class}, new RemoteParam(), clientName, last, fileId); + final List locations = rpcServer.getLocationsForPath(src, true); if (last != null) { - return rpcClient.invokeSingle(last, method, Boolean.class); + return rpcClient.invokeSingle(last, method, locations, Boolean.class); } - final List locations = - rpcServer.getLocationsForPath(src, true); // Complete can return true/false, so don't expect a result return rpcClient.invokeSequential(locations, method, Boolean.class, null); } @@ -580,7 +578,7 @@ public void updatePipeline(String clientName, ExtendedBlock oldBlock, new Class[] {String.class, ExtendedBlock.class, ExtendedBlock.class, DatanodeID[].class, String[].class}, clientName, oldBlock, newBlock, newNodes, newStorageIDs); - rpcClient.invokeSingle(oldBlock, method); + rpcClient.invokeSingleBlockPool(oldBlock.getBlockPoolId(), method); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index 
ece34abf07..9ac0aa17c0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -848,6 +848,26 @@ private static IOException getCleanException(IOException ioe) { return ret; } + /** + * Try to get the remote location whose bpId is same with the input bpId from the input locations. + * @param locations the input RemoteLocations. + * @param bpId the input bpId. + * @return the remote location whose bpId is same with the input. + * @throws IOException + */ + private RemoteLocation getLocationWithBPID(List locations, String bpId) + throws IOException { + String nsId = getNameserviceForBlockPoolId(bpId); + for (RemoteLocation l : locations) { + if (l.getNameserviceId().equals(nsId)) { + return l; + } + } + + LOG.debug("Can't find remote location for the {} from {}", bpId, locations); + return new RemoteLocation(nsId, "/", "/"); + } + /** * Invokes a ClientProtocol method. Determines the target nameservice via a * provided block. @@ -857,13 +877,15 @@ private static IOException getCleanException(IOException ioe) { * * @param block Block used to determine appropriate nameservice. * @param method The remote method and parameters to invoke. + * @param locations The remote locations will be used. + * @param clazz Class for the return type. * @return The result of invoking the method. * @throws IOException If the invoke generated an error. 
*/ - public Object invokeSingle(final ExtendedBlock block, RemoteMethod method) - throws IOException { - String bpId = block.getBlockPoolId(); - return invokeSingleBlockPool(bpId, method); + public T invokeSingle(final ExtendedBlock block, RemoteMethod method, + final List locations, Class clazz) throws IOException { + RemoteLocation location = getLocationWithBPID(locations, block.getBlockPoolId()); + return invokeSingle(location, method, clazz); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java index 30a2bc1102..336ea39138 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java @@ -73,6 +73,7 @@ import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.test.GenericTestUtils; import org.junit.Test; +import org.slf4j.event.Level; /** * The RPC interface of the {@link getRouter()} implemented by @@ -275,6 +276,14 @@ public void testProxyRenameFiles() throws IOException, InterruptedException { @Test public void testPreviousBlockNotNull() throws IOException, URISyntaxException { + final GenericTestUtils.LogCapturer stateChangeLog = + GenericTestUtils.LogCapturer.captureLogs(NameNode.stateChangeLog); + GenericTestUtils.setLogLevel(NameNode.stateChangeLog, Level.DEBUG); + + final GenericTestUtils.LogCapturer nameNodeLog = + GenericTestUtils.LogCapturer.captureLogs(NameNode.LOG); + GenericTestUtils.setLogLevel(NameNode.LOG, Level.DEBUG); + final FederationRPCMetrics metrics = getRouterContext(). 
getRouter().getRpcServer().getRPCMetrics(); final ClientProtocol clientProtocol = getRouterProtocol(); @@ -305,6 +314,7 @@ testPath, new FsPermission("777"), clientName, long proxyNumAddBlock = metrics.getProcessingOps(); assertEquals(2, proxyNumAddBlock - proxyNumCreate); + stateChangeLog.clearOutput(); // Add a block via router and previous block is not null. LocatedBlock blockTwo = clientProtocol.addBlock( testPath, clientName, blockOne.getBlock(), null, @@ -312,7 +322,9 @@ testPath, new FsPermission("777"), clientName, assertNotNull(blockTwo); long proxyNumAddBlock2 = metrics.getProcessingOps(); assertEquals(1, proxyNumAddBlock2 - proxyNumAddBlock); + assertTrue(stateChangeLog.getOutput().contains("BLOCK* getAdditionalBlock: " + testPath)); + nameNodeLog.clearOutput(); // Get additionalDatanode via router and block is not null. DatanodeInfo[] exclusions = DatanodeInfo.EMPTY_ARRAY; LocatedBlock newBlock = clientProtocol.getAdditionalDatanode( @@ -322,12 +334,15 @@ testPath, new FsPermission("777"), clientName, assertNotNull(newBlock); long proxyNumAdditionalDatanode = metrics.getProcessingOps(); assertEquals(1, proxyNumAdditionalDatanode - proxyNumAddBlock2); + assertTrue(nameNodeLog.getOutput().contains("getAdditionalDatanode: src=" + testPath)); + stateChangeLog.clearOutput(); // Complete the file via router and last block is not null. clientProtocol.complete(testPath, clientName, newBlock.getBlock(), status.getFileId()); long proxyNumComplete = metrics.getProcessingOps(); assertEquals(1, proxyNumComplete - proxyNumAdditionalDatanode); + assertTrue(stateChangeLog.getOutput().contains("DIR* NameSystem.completeFile: " + testPath)); } finally { clientProtocol.delete(testPath, true); }