HDFS-17558. RBF: Make maxSizeOfFederatedStateToPropagate work on setResponseHeaderState. (#6902)

Co-authored-by: fuchaohong <fuchaohong@chinatelecom.cn>
This commit is contained in:
fuchaohong 2024-07-18 00:43:00 +08:00 committed by GitHub
parent 1360c7574a
commit ebbe9628d3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 44 additions and 4 deletions

View File

@ -104,8 +104,10 @@ public void setResponseHeaderState(RpcResponseHeaderProto.Builder headerBuilder)
builder.putNamespaceStateIds(k, v.get()); builder.putNamespaceStateIds(k, v.get());
} }
}); });
if (builder.getNamespaceStateIdsCount() <= maxSizeOfFederatedStateToPropagate) {
headerBuilder.setRouterFederatedState(builder.build().toByteString()); headerBuilder.setRouterFederatedState(builder.build().toByteString());
} }
}
public LongAccumulator getNamespaceStateId(String nsId) { public LongAccumulator getNamespaceStateId(String nsId) {
return namespaceIdMap.computeIfAbsent(nsId, key -> new LongAccumulator(Math::max, Long.MIN_VALUE)); return namespaceIdMap.computeIfAbsent(nsId, key -> new LongAccumulator(Math::max, Long.MIN_VALUE));
@ -155,10 +157,8 @@ public static long getClientStateIdFromCurrentCall(String nsId) {
@Override @Override
public void updateResponseState(RpcResponseHeaderProto.Builder header) { public void updateResponseState(RpcResponseHeaderProto.Builder header) {
if (namespaceIdMap.size() <= maxSizeOfFederatedStateToPropagate) {
setResponseHeaderState(header); setResponseHeaderState(header);
} }
}
@Override @Override
public void receiveResponseState(RpcResponseHeaderProto header) { public void receiveResponseState(RpcResponseHeaderProto header) {

View File

@ -641,6 +641,46 @@ public void testRouterResponseHeaderState() {
Assertions.assertEquals(10L, latestFederateState.get("ns0")); Assertions.assertEquals(10L, latestFederateState.get("ns0"));
} }
@Test
@Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
public void testRouterResponseHeaderStateMaxSizeLimit() {
Configuration conf = new Configuration();
conf.setBoolean(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY, true);
conf.setInt(RBFConfigKeys.DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE, 1);
RouterStateIdContext routerStateIdContext = new RouterStateIdContext(conf);
ConcurrentHashMap<String, LongAccumulator> namespaceIdMap =
routerStateIdContext.getNamespaceIdMap();
namespaceIdMap.put("ns0", new LongAccumulator(Math::max, 10));
namespaceIdMap.put("ns1", new LongAccumulator(Math::max, Long.MIN_VALUE));
RpcHeaderProtos.RpcResponseHeaderProto.Builder responseHeaderBuilder =
RpcHeaderProtos.RpcResponseHeaderProto
.newBuilder()
.setCallId(1)
.setStatus(RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.SUCCESS);
routerStateIdContext.updateResponseState(responseHeaderBuilder);
Map<String, Long> latestFederateState = RouterStateIdContext.getRouterFederatedStateMap(
responseHeaderBuilder.build().getRouterFederatedState());
// Validate that ns0 is still part of the header
Assertions.assertEquals(1, latestFederateState.size());
namespaceIdMap.put("ns2", new LongAccumulator(Math::max, 20));
// Rebuild header
responseHeaderBuilder =
RpcHeaderProtos.RpcResponseHeaderProto
.newBuilder()
.setCallId(1)
.setStatus(RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.SUCCESS);
routerStateIdContext.updateResponseState(responseHeaderBuilder);
latestFederateState = RouterStateIdContext.getRouterFederatedStateMap(
responseHeaderBuilder.build().getRouterFederatedState());
// Validate that ns0 is still part of the header
Assertions.assertEquals(0, latestFederateState.size());
}
@EnumSource(ConfigSetting.class) @EnumSource(ConfigSetting.class)
@ParameterizedTest @ParameterizedTest
public void testStateIdProgressionInRouter(ConfigSetting configSetting) throws Exception { public void testStateIdProgressionInRouter(ConfigSetting configSetting) throws Exception {