HDFS-17111. RBF: Optimize msync to only call nameservices that have observer reads enabled. (#5860). Contributed by Simbarashe Dzinamarira.
This commit is contained in:
parent
c04a17f116
commit
ad001c93cf
@ -1928,9 +1928,17 @@ public class RouterClientProtocol implements ClientProtocol {
|
|||||||
@Override
|
@Override
|
||||||
public void msync() throws IOException {
|
public void msync() throws IOException {
|
||||||
rpcServer.checkOperation(NameNode.OperationCategory.READ, true);
|
rpcServer.checkOperation(NameNode.OperationCategory.READ, true);
|
||||||
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
|
// Only msync to nameservices with observer reads enabled.
|
||||||
|
Set<FederationNamespaceInfo> allNamespaces = namenodeResolver.getNamespaces();
|
||||||
RemoteMethod method = new RemoteMethod("msync");
|
RemoteMethod method = new RemoteMethod("msync");
|
||||||
rpcClient.invokeConcurrent(nss, method);
|
Set<FederationNamespaceInfo> namespacesEligibleForObserverReads = allNamespaces
|
||||||
|
.stream()
|
||||||
|
.filter(ns -> rpcClient.isNamespaceObserverReadEligible(ns.getNameserviceId()))
|
||||||
|
.collect(Collectors.toSet());
|
||||||
|
if (namespacesEligibleForObserverReads.isEmpty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
rpcClient.invokeConcurrent(namespacesEligibleForObserverReads, method);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -1783,9 +1783,16 @@ public class RouterRpcClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private boolean isObserverReadEligible(String nsId, Method method) {
|
private boolean isObserverReadEligible(String nsId, Method method) {
|
||||||
boolean isReadEnabledForNamespace =
|
return isReadCall(method) && isNamespaceObserverReadEligible(nsId);
|
||||||
observerReadEnabledDefault != observerReadEnabledOverrides.contains(nsId);
|
}
|
||||||
return isReadEnabledForNamespace && isReadCall(method);
|
|
||||||
|
/**
|
||||||
|
* Check if a namespace is eligible for observer reads.
|
||||||
|
* @param nsId namespaceID
|
||||||
|
* @return whether the 'namespace' has observer reads enabled.
|
||||||
|
*/
|
||||||
|
boolean isNamespaceObserverReadEligible(String nsId) {
|
||||||
|
return observerReadEnabledDefault != observerReadEnabledOverrides.contains(nsId);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -1793,6 +1800,9 @@ public class RouterRpcClient {
|
|||||||
* @return whether the 'method' is a read-only operation.
|
* @return whether the 'method' is a read-only operation.
|
||||||
*/
|
*/
|
||||||
private static boolean isReadCall(Method method) {
|
private static boolean isReadCall(Method method) {
|
||||||
|
if (method == null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
if (!method.isAnnotationPresent(ReadOnly.class)) {
|
if (!method.isAnnotationPresent(ReadOnly.class)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -871,4 +871,45 @@ public class TestObserverWithRouter {
|
|||||||
Assertions.fail("Unknown config setting: " + configSetting);
|
Assertions.fail("Unknown config setting: " + configSetting);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@EnumSource(ConfigSetting.class)
|
||||||
|
@ParameterizedTest
|
||||||
|
@Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
|
||||||
|
public void testMsyncOnlyToNamespaceWithObserver(ConfigSetting configSetting) throws Exception {
|
||||||
|
Configuration confOverride = new Configuration(false);
|
||||||
|
String namespaceWithObserverReadsDisabled = "ns0";
|
||||||
|
// Disable observer reads for ns0
|
||||||
|
confOverride.set(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_OVERRIDES,
|
||||||
|
namespaceWithObserverReadsDisabled);
|
||||||
|
startUpCluster(1, confOverride);
|
||||||
|
fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
|
||||||
|
|
||||||
|
// Send msync request
|
||||||
|
fileSystem.msync();
|
||||||
|
|
||||||
|
long rpcCountForActive = routerContext.getRouter().getRpcServer()
|
||||||
|
.getRPCMetrics().getActiveProxyOps();
|
||||||
|
// There should only be one call to the namespace that has an observer.
|
||||||
|
assertEquals("Only one call to the namespace with an observer", 1, rpcCountForActive);
|
||||||
|
}
|
||||||
|
|
||||||
|
@EnumSource(ConfigSetting.class)
|
||||||
|
@ParameterizedTest
|
||||||
|
@Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
|
||||||
|
public void testMsyncWithNoNamespacesEligibleForCRS(ConfigSetting configSetting)
|
||||||
|
throws Exception {
|
||||||
|
Configuration confOverride = new Configuration(false);
|
||||||
|
// Disable observer reads for all namespaces.
|
||||||
|
confOverride.setBoolean(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY, false);
|
||||||
|
startUpCluster(1, confOverride);
|
||||||
|
fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
|
||||||
|
|
||||||
|
// Send msync request.
|
||||||
|
fileSystem.msync();
|
||||||
|
|
||||||
|
long rpcCountForActive = routerContext.getRouter().getRpcServer()
|
||||||
|
.getRPCMetrics().getActiveProxyOps();
|
||||||
|
// There should no calls to any namespace.
|
||||||
|
assertEquals("No calls to any namespace", 0, rpcCountForActive);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user