From 7d3b6a36b8aad3b417d806edbe6586c2b14e9fd5 Mon Sep 17 00:00:00 2001 From: LiuGuH <444506464@qq.com> Date: Fri, 5 Jan 2024 06:43:11 +0800 Subject: [PATCH] HDFS-17306. RBF: Router should not return nameservices that does not enable observer nodes in RpcResponseHeaderProto (#6385) --- .../router/RouterStateIdContext.java | 10 +++++- .../router/TestObserverWithRouter.java | 34 +++++++++++++++++++ 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java index b3bab732c0..14adc16d99 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java @@ -85,7 +85,11 @@ public void setResponseHeaderState(RpcResponseHeaderProto.Builder headerBuilder) return; } RouterFederatedStateProto.Builder builder = RouterFederatedStateProto.newBuilder(); - namespaceIdMap.forEach((k, v) -> builder.putNamespaceStateIds(k, v.get())); + namespaceIdMap.forEach((k, v) -> { + if (v.get() != Long.MIN_VALUE) { + builder.putNamespaceStateIds(k, v.get()); + } + }); headerBuilder.setRouterFederatedState(builder.build().toByteString()); } @@ -97,6 +101,10 @@ public List getNamespaces() { return Collections.list(namespaceIdMap.keys()); } + public ConcurrentHashMap getNamespaceIdMap() { + return namespaceIdMap; + } + public void removeNamespaceStateId(String nsId) { namespaceIdMap.remove(nsId); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java index a81f773804..e20e3ad2a0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java @@ -30,6 +30,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.LongAccumulator; @@ -586,6 +587,39 @@ public void testClientReceiveResponseState() { Assertions.assertEquals(10L, latestFederateState.get("ns0")); } + @Test + @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP) + public void testRouterResponseHeaderState() { + RouterStateIdContext routerStateIdContext = new RouterStateIdContext(new Configuration()); + + ConcurrentHashMap namespaceIdMap = + routerStateIdContext.getNamespaceIdMap(); + namespaceIdMap.put("ns0", new LongAccumulator(Math::max, 10)); + namespaceIdMap.put("ns1", new LongAccumulator(Math::max, 100)); + namespaceIdMap.put("ns2", new LongAccumulator(Math::max, Long.MIN_VALUE)); + + Map mockMapping = new HashMap<>(); + mockMapping.put("ns0", 10L); + mockMapping.put("ns2", 100L); + mockMapping.put("ns3", Long.MIN_VALUE); + RouterFederatedStateProto.Builder builder = RouterFederatedStateProto.newBuilder(); + mockMapping.forEach(builder::putNamespaceStateIds); + + RpcHeaderProtos.RpcResponseHeaderProto.Builder responseHeaderBuilder = + RpcHeaderProtos.RpcResponseHeaderProto + .newBuilder() + .setCallId(1) + .setStatus(RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.SUCCESS) + .setRouterFederatedState(builder.build().toByteString()); + routerStateIdContext.updateResponseState(responseHeaderBuilder); + + Map latestFederateState = RouterStateIdContext.getRouterFederatedStateMap( + responseHeaderBuilder.build().getRouterFederatedState()); + Assertions.assertEquals(2, latestFederateState.size()); + Assertions.assertEquals(10L, latestFederateState.get("ns0")); + Assertions.assertEquals(100L, latestFederateState.get("ns1")); + } + @EnumSource(ConfigSetting.class) @ParameterizedTest public void testStateIdProgressionInRouter(ConfigSetting configSetting) throws Exception {