HDFS-17306. RBF: Router should not return nameservices that does not enable observer nodes in RpcResponseHeaderProto (#6385)
This commit is contained in:
parent
8c26d4e9e0
commit
7d3b6a36b8
@ -85,7 +85,11 @@ public void setResponseHeaderState(RpcResponseHeaderProto.Builder headerBuilder)
|
||||
return;
|
||||
}
|
||||
RouterFederatedStateProto.Builder builder = RouterFederatedStateProto.newBuilder();
|
||||
namespaceIdMap.forEach((k, v) -> builder.putNamespaceStateIds(k, v.get()));
|
||||
namespaceIdMap.forEach((k, v) -> {
|
||||
if (v.get() != Long.MIN_VALUE) {
|
||||
builder.putNamespaceStateIds(k, v.get());
|
||||
}
|
||||
});
|
||||
headerBuilder.setRouterFederatedState(builder.build().toByteString());
|
||||
}
|
||||
|
||||
@ -97,6 +101,10 @@ public List<String> getNamespaces() {
|
||||
return Collections.list(namespaceIdMap.keys());
|
||||
}
|
||||
|
||||
public ConcurrentHashMap<String, LongAccumulator> getNamespaceIdMap() {
|
||||
return namespaceIdMap;
|
||||
}
|
||||
|
||||
public void removeNamespaceStateId(String nsId) {
|
||||
namespaceIdMap.remove(nsId);
|
||||
}
|
||||
|
@ -30,6 +30,7 @@
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.LongAccumulator;
|
||||
|
||||
@ -586,6 +587,39 @@ public void testClientReceiveResponseState() {
|
||||
Assertions.assertEquals(10L, latestFederateState.get("ns0"));
|
||||
}
|
||||
|
||||
@Test
|
||||
@Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
|
||||
public void testRouterResponseHeaderState() {
|
||||
RouterStateIdContext routerStateIdContext = new RouterStateIdContext(new Configuration());
|
||||
|
||||
ConcurrentHashMap<String, LongAccumulator> namespaceIdMap =
|
||||
routerStateIdContext.getNamespaceIdMap();
|
||||
namespaceIdMap.put("ns0", new LongAccumulator(Math::max, 10));
|
||||
namespaceIdMap.put("ns1", new LongAccumulator(Math::max, 100));
|
||||
namespaceIdMap.put("ns2", new LongAccumulator(Math::max, Long.MIN_VALUE));
|
||||
|
||||
Map<String, Long> mockMapping = new HashMap<>();
|
||||
mockMapping.put("ns0", 10L);
|
||||
mockMapping.put("ns2", 100L);
|
||||
mockMapping.put("ns3", Long.MIN_VALUE);
|
||||
RouterFederatedStateProto.Builder builder = RouterFederatedStateProto.newBuilder();
|
||||
mockMapping.forEach(builder::putNamespaceStateIds);
|
||||
|
||||
RpcHeaderProtos.RpcResponseHeaderProto.Builder responseHeaderBuilder =
|
||||
RpcHeaderProtos.RpcResponseHeaderProto
|
||||
.newBuilder()
|
||||
.setCallId(1)
|
||||
.setStatus(RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.SUCCESS)
|
||||
.setRouterFederatedState(builder.build().toByteString());
|
||||
routerStateIdContext.updateResponseState(responseHeaderBuilder);
|
||||
|
||||
Map<String, Long> latestFederateState = RouterStateIdContext.getRouterFederatedStateMap(
|
||||
responseHeaderBuilder.build().getRouterFederatedState());
|
||||
Assertions.assertEquals(2, latestFederateState.size());
|
||||
Assertions.assertEquals(10L, latestFederateState.get("ns0"));
|
||||
Assertions.assertEquals(100L, latestFederateState.get("ns1"));
|
||||
}
|
||||
|
||||
@EnumSource(ConfigSetting.class)
|
||||
@ParameterizedTest
|
||||
public void testStateIdProgressionInRouter(ConfigSetting configSetting) throws Exception {
|
||||
|
Loading…
Reference in New Issue
Block a user