diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java index 5c9ceb865a..e239e5e905 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java @@ -104,7 +104,9 @@ public void setResponseHeaderState(RpcResponseHeaderProto.Builder headerBuilder) builder.putNamespaceStateIds(k, v.get()); } }); - headerBuilder.setRouterFederatedState(builder.build().toByteString()); + if (builder.getNamespaceStateIdsCount() <= maxSizeOfFederatedStateToPropagate) { + headerBuilder.setRouterFederatedState(builder.build().toByteString()); + } } public LongAccumulator getNamespaceStateId(String nsId) { @@ -155,9 +157,7 @@ public static long getClientStateIdFromCurrentCall(String nsId) { @Override public void updateResponseState(RpcResponseHeaderProto.Builder header) { - if (namespaceIdMap.size() <= maxSizeOfFederatedStateToPropagate) { - setResponseHeaderState(header); - } + setResponseHeaderState(header); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java index eaee5b8b14..3f773efd63 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java @@ -641,6 +641,46 @@ public void testRouterResponseHeaderState() { Assertions.assertEquals(10L, latestFederateState.get("ns0")); } + @Test + @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP) + public void testRouterResponseHeaderStateMaxSizeLimit() { + Configuration conf = new Configuration(); + conf.setBoolean(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY, true); + conf.setInt(RBFConfigKeys.DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE, 1); + + RouterStateIdContext routerStateIdContext = new RouterStateIdContext(conf); + + ConcurrentHashMap namespaceIdMap = + routerStateIdContext.getNamespaceIdMap(); + namespaceIdMap.put("ns0", new LongAccumulator(Math::max, 10)); + namespaceIdMap.put("ns1", new LongAccumulator(Math::max, Long.MIN_VALUE)); + + RpcHeaderProtos.RpcResponseHeaderProto.Builder responseHeaderBuilder = + RpcHeaderProtos.RpcResponseHeaderProto + .newBuilder() + .setCallId(1) + .setStatus(RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.SUCCESS); + routerStateIdContext.updateResponseState(responseHeaderBuilder); + + Map latestFederateState = RouterStateIdContext.getRouterFederatedStateMap( + responseHeaderBuilder.build().getRouterFederatedState()); + // Validate that ns0 is still part of the header + Assertions.assertEquals(1, latestFederateState.size()); + + namespaceIdMap.put("ns2", new LongAccumulator(Math::max, 20)); + // Rebuild header + responseHeaderBuilder = + RpcHeaderProtos.RpcResponseHeaderProto + .newBuilder() + .setCallId(1) + .setStatus(RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.SUCCESS); + routerStateIdContext.updateResponseState(responseHeaderBuilder); + latestFederateState = RouterStateIdContext.getRouterFederatedStateMap( + responseHeaderBuilder.build().getRouterFederatedState()); + // Validate that ns0 is still part of the header + Assertions.assertEquals(0, latestFederateState.size()); + } + @EnumSource(ConfigSetting.class) @ParameterizedTest public void testStateIdProgressionInRouter(ConfigSetting configSetting) throws Exception {