HDFS-16865. The source path is always / after RBF proxied the complete, addBlock and getAdditionalDatanode RPC. (#5200). Contributed by ZanderXu.

Reviewed-by: Inigo Goiri <inigoiri@apache.org>
Signed-off-by: Ayush Saxena <ayushsaxena@apache.org>
This commit is contained in:
ZanderXu 2023-05-06 11:20:21 +08:00 committed by GitHub
parent cda9863d54
commit 4ee92efb73
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 51 additions and 16 deletions

View File

@ -483,12 +483,10 @@ public LocatedBlock addBlock(String src, String clientName,
new RemoteParam(), clientName, previous, excludedNodes, fileId,
favoredNodes, addBlockFlags);
final List<RemoteLocation> locations = rpcServer.getLocationsForPath(src, true);
if (previous != null) {
return rpcClient.invokeSingle(previous, method, LocatedBlock.class);
return rpcClient.invokeSingle(previous, method, locations, LocatedBlock.class);
}
final List<RemoteLocation> locations =
rpcServer.getLocationsForPath(src, true);
// TODO verify the excludedNodes and favoredNodes are acceptable to this NN
return rpcClient.invokeSequential(
locations, method, LocatedBlock.class, null);
@ -513,12 +511,11 @@ public LocatedBlock getAdditionalDatanode(final String src, final long fileId,
new RemoteParam(), fileId, blk, existings, existingStorageIDs, excludes,
numAdditionalNodes, clientName);
final List<RemoteLocation> locations = rpcServer.getLocationsForPath(src, false);
if (blk != null) {
return rpcClient.invokeSingle(blk, method, LocatedBlock.class);
return rpcClient.invokeSingle(blk, method, locations, LocatedBlock.class);
}
final List<RemoteLocation> locations =
rpcServer.getLocationsForPath(src, false);
return rpcClient.invokeSequential(
locations, method, LocatedBlock.class, null);
}
@ -532,7 +529,9 @@ public void abandonBlock(ExtendedBlock b, long fileId, String src,
new Class<?>[] {ExtendedBlock.class, long.class, String.class,
String.class},
b, fileId, new RemoteParam(), holder);
rpcClient.invokeSingle(b, method);
final List<RemoteLocation> locations = rpcServer.getLocationsForPath(src, false);
rpcClient.invokeSingle(b, method, locations, Void.class);
}
@Override
@ -545,12 +544,11 @@ public boolean complete(String src, String clientName, ExtendedBlock last,
long.class},
new RemoteParam(), clientName, last, fileId);
final List<RemoteLocation> locations = rpcServer.getLocationsForPath(src, true);
if (last != null) {
return rpcClient.invokeSingle(last, method, Boolean.class);
return rpcClient.invokeSingle(last, method, locations, Boolean.class);
}
final List<RemoteLocation> locations =
rpcServer.getLocationsForPath(src, true);
// Complete can return true/false, so don't expect a result
return rpcClient.invokeSequential(locations, method, Boolean.class, null);
}
@ -580,7 +578,7 @@ public void updatePipeline(String clientName, ExtendedBlock oldBlock,
new Class<?>[] {String.class, ExtendedBlock.class, ExtendedBlock.class,
DatanodeID[].class, String[].class},
clientName, oldBlock, newBlock, newNodes, newStorageIDs);
rpcClient.invokeSingle(oldBlock, method);
rpcClient.invokeSingleBlockPool(oldBlock.getBlockPoolId(), method);
}
@Override

View File

@ -848,6 +848,26 @@ private static IOException getCleanException(IOException ioe) {
return ret;
}
/**
* Try to get the remote location whose bpId is same with the input bpId from the input locations.
* @param locations the input RemoteLocations.
* @param bpId the input bpId.
* @return the remote location whose bpId is same with the input.
* @throws IOException
*/
private RemoteLocation getLocationWithBPID(List<RemoteLocation> locations, String bpId)
throws IOException {
String nsId = getNameserviceForBlockPoolId(bpId);
for (RemoteLocation l : locations) {
if (l.getNameserviceId().equals(nsId)) {
return l;
}
}
LOG.debug("Can't find remote location for the {} from {}", bpId, locations);
return new RemoteLocation(nsId, "/", "/");
}
/**
* Invokes a ClientProtocol method. Determines the target nameservice via a
* provided block.
@ -857,13 +877,15 @@ private static IOException getCleanException(IOException ioe) {
*
* @param block Block used to determine appropriate nameservice.
* @param method The remote method and parameters to invoke.
* @param locations The remote locations will be used.
* @param clazz Class for the return type.
* @return The result of invoking the method.
* @throws IOException If the invoke generated an error.
*/
public Object invokeSingle(final ExtendedBlock block, RemoteMethod method)
throws IOException {
String bpId = block.getBlockPoolId();
return invokeSingleBlockPool(bpId, method);
public <T> T invokeSingle(final ExtendedBlock block, RemoteMethod method,
final List<RemoteLocation> locations, Class<T> clazz) throws IOException {
RemoteLocation location = getLocationWithBPID(locations, block.getBlockPoolId());
return invokeSingle(location, method, clazz);
}
/**

View File

@ -73,6 +73,7 @@
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Test;
import org.slf4j.event.Level;
/**
* The RPC interface of the {@link getRouter()} implemented by
@ -275,6 +276,14 @@ public void testProxyRenameFiles() throws IOException, InterruptedException {
@Test
public void testPreviousBlockNotNull()
throws IOException, URISyntaxException {
final GenericTestUtils.LogCapturer stateChangeLog =
GenericTestUtils.LogCapturer.captureLogs(NameNode.stateChangeLog);
GenericTestUtils.setLogLevel(NameNode.stateChangeLog, Level.DEBUG);
final GenericTestUtils.LogCapturer nameNodeLog =
GenericTestUtils.LogCapturer.captureLogs(NameNode.LOG);
GenericTestUtils.setLogLevel(NameNode.LOG, Level.DEBUG);
final FederationRPCMetrics metrics = getRouterContext().
getRouter().getRpcServer().getRPCMetrics();
final ClientProtocol clientProtocol = getRouterProtocol();
@ -305,6 +314,7 @@ testPath, new FsPermission("777"), clientName,
long proxyNumAddBlock = metrics.getProcessingOps();
assertEquals(2, proxyNumAddBlock - proxyNumCreate);
stateChangeLog.clearOutput();
// Add a block via router and previous block is not null.
LocatedBlock blockTwo = clientProtocol.addBlock(
testPath, clientName, blockOne.getBlock(), null,
@ -312,7 +322,9 @@ testPath, new FsPermission("777"), clientName,
assertNotNull(blockTwo);
long proxyNumAddBlock2 = metrics.getProcessingOps();
assertEquals(1, proxyNumAddBlock2 - proxyNumAddBlock);
assertTrue(stateChangeLog.getOutput().contains("BLOCK* getAdditionalBlock: " + testPath));
nameNodeLog.clearOutput();
// Get additionalDatanode via router and block is not null.
DatanodeInfo[] exclusions = DatanodeInfo.EMPTY_ARRAY;
LocatedBlock newBlock = clientProtocol.getAdditionalDatanode(
@ -322,12 +334,15 @@ testPath, new FsPermission("777"), clientName,
assertNotNull(newBlock);
long proxyNumAdditionalDatanode = metrics.getProcessingOps();
assertEquals(1, proxyNumAdditionalDatanode - proxyNumAddBlock2);
assertTrue(nameNodeLog.getOutput().contains("getAdditionalDatanode: src=" + testPath));
stateChangeLog.clearOutput();
// Complete the file via router and last block is not null.
clientProtocol.complete(testPath, clientName,
newBlock.getBlock(), status.getFileId());
long proxyNumComplete = metrics.getProcessingOps();
assertEquals(1, proxyNumComplete - proxyNumAdditionalDatanode);
assertTrue(stateChangeLog.getOutput().contains("DIR* NameSystem.completeFile: " + testPath));
} finally {
clientProtocol.delete(testPath, true);
}