From 8193a8402030dceac3c6ab4f4b599349eb114744 Mon Sep 17 00:00:00 2001 From: LiuGuH <444506464@qq.com> Date: Wed, 24 Jan 2024 04:30:08 +0800 Subject: [PATCH] =?UTF-8?q?HDFS-17324.=20RBF:=20Router=20should=20not=20re?= =?UTF-8?q?turn=20nameservices=20that=20not=20enable=20observer=20r?= =?UTF-8?q?=E2=80=A6=20(#6412)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../router/RouterStateIdContext.java | 25 ++++++++++++++++++- .../router/TestObserverWithRouter.java | 21 +++++++--------- 2 files changed, 33 insertions(+), 13 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java index 14adc16d99..5c9ceb865a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.federation.router; import java.lang.reflect.Method; +import java.util.Arrays; import java.util.Collections; import java.util.HashSet; @@ -58,6 +59,10 @@ class RouterStateIdContext implements AlignmentContext { private final ConcurrentHashMap namespaceIdMap; // Size limit for the map of state Ids to send to clients. private final int maxSizeOfFederatedStateToPropagate; + /** Observer read enabled. Default for all nameservices. */ + private final boolean observerReadEnabledDefault; + /** Nameservice specific overrides of the default setting for enabling observer reads. */ + private HashSet observerReadEnabledOverrides = new HashSet<>(); RouterStateIdContext(Configuration conf) { this.coordinatedMethods = new HashSet<>(); @@ -75,6 +80,15 @@ class RouterStateIdContext implements AlignmentContext { maxSizeOfFederatedStateToPropagate = conf.getInt(RBFConfigKeys.DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE, RBFConfigKeys.DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE_DEFAULT); + + this.observerReadEnabledDefault = conf.getBoolean( + RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY, + RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_VALUE); + String[] observerReadOverrides = + conf.getStrings(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_OVERRIDES); + if (observerReadOverrides != null) { + observerReadEnabledOverrides.addAll(Arrays.asList(observerReadOverrides)); + } } /** @@ -86,7 +100,7 @@ public void setResponseHeaderState(RpcResponseHeaderProto.Builder headerBuilder) } RouterFederatedStateProto.Builder builder = RouterFederatedStateProto.newBuilder(); namespaceIdMap.forEach((k, v) -> { - if (v.get() != Long.MIN_VALUE) { + if ((v.get() != Long.MIN_VALUE) && isNamespaceObserverReadEligible(k)) { builder.putNamespaceStateIds(k, v.get()); } }); @@ -177,4 +191,13 @@ public boolean isCoordinatedCall(String protocolName, String methodName) { return protocolName.equals(ClientProtocol.class.getCanonicalName()) && coordinatedMethods.contains(methodName); } + + /** + * Check if a namespace is eligible for observer reads. + * @param nsId namespaceID + * @return whether the 'namespace' has observer reads enabled. + */ + boolean isNamespaceObserverReadEligible(String nsId) { + return observerReadEnabledDefault != observerReadEnabledOverrides.contains(nsId); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java index e20e3ad2a0..0ff166e0c3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java @@ -590,7 +590,12 @@ public void testClientReceiveResponseState() { @Test @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP) public void testRouterResponseHeaderState() { - RouterStateIdContext routerStateIdContext = new RouterStateIdContext(new Configuration()); + // This conf makes ns1 that is not eligible for observer reads. + Configuration conf = new Configuration(); + conf.setBoolean(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY, true); + conf.set(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_OVERRIDES, "ns1"); + + RouterStateIdContext routerStateIdContext = new RouterStateIdContext(conf); ConcurrentHashMap namespaceIdMap = routerStateIdContext.getNamespaceIdMap(); @@ -598,26 +603,18 @@ public void testRouterResponseHeaderState() { namespaceIdMap.put("ns1", new LongAccumulator(Math::max, 100)); namespaceIdMap.put("ns2", new LongAccumulator(Math::max, Long.MIN_VALUE)); - Map mockMapping = new HashMap<>(); - mockMapping.put("ns0", 10L); - mockMapping.put("ns2", 100L); - mockMapping.put("ns3", Long.MIN_VALUE); - RouterFederatedStateProto.Builder builder = RouterFederatedStateProto.newBuilder(); - mockMapping.forEach(builder::putNamespaceStateIds); - RpcHeaderProtos.RpcResponseHeaderProto.Builder responseHeaderBuilder = RpcHeaderProtos.RpcResponseHeaderProto .newBuilder() .setCallId(1) - .setStatus(RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.SUCCESS) - .setRouterFederatedState(builder.build().toByteString()); + .setStatus(RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.SUCCESS); routerStateIdContext.updateResponseState(responseHeaderBuilder); Map latestFederateState = RouterStateIdContext.getRouterFederatedStateMap( responseHeaderBuilder.build().getRouterFederatedState()); - Assertions.assertEquals(2, latestFederateState.size()); + // Only ns0 will be in latestFederateState + Assertions.assertEquals(1, latestFederateState.size()); Assertions.assertEquals(10L, latestFederateState.get("ns0")); - Assertions.assertEquals(100L, latestFederateState.get("ns1")); } @EnumSource(ConfigSetting.class)