HDFS-17324. RBF: Router should not return nameservices that not enable observer r… (#6412)
This commit is contained in:
parent
b2fac14828
commit
8193a84020
@ -19,6 +19,7 @@
|
|||||||
package org.apache.hadoop.hdfs.server.federation.router;
|
package org.apache.hadoop.hdfs.server.federation.router;
|
||||||
|
|
||||||
import java.lang.reflect.Method;
|
import java.lang.reflect.Method;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
|
||||||
@ -58,6 +59,10 @@ class RouterStateIdContext implements AlignmentContext {
|
|||||||
private final ConcurrentHashMap<String, LongAccumulator> namespaceIdMap;
|
private final ConcurrentHashMap<String, LongAccumulator> namespaceIdMap;
|
||||||
// Size limit for the map of state Ids to send to clients.
|
// Size limit for the map of state Ids to send to clients.
|
||||||
private final int maxSizeOfFederatedStateToPropagate;
|
private final int maxSizeOfFederatedStateToPropagate;
|
||||||
|
/** Observer read enabled. Default for all nameservices. */
|
||||||
|
private final boolean observerReadEnabledDefault;
|
||||||
|
/** Nameservice specific overrides of the default setting for enabling observer reads. */
|
||||||
|
private HashSet<String> observerReadEnabledOverrides = new HashSet<>();
|
||||||
|
|
||||||
RouterStateIdContext(Configuration conf) {
|
RouterStateIdContext(Configuration conf) {
|
||||||
this.coordinatedMethods = new HashSet<>();
|
this.coordinatedMethods = new HashSet<>();
|
||||||
@ -75,6 +80,15 @@ class RouterStateIdContext implements AlignmentContext {
|
|||||||
maxSizeOfFederatedStateToPropagate =
|
maxSizeOfFederatedStateToPropagate =
|
||||||
conf.getInt(RBFConfigKeys.DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE,
|
conf.getInt(RBFConfigKeys.DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE,
|
||||||
RBFConfigKeys.DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE_DEFAULT);
|
RBFConfigKeys.DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE_DEFAULT);
|
||||||
|
|
||||||
|
this.observerReadEnabledDefault = conf.getBoolean(
|
||||||
|
RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY,
|
||||||
|
RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_VALUE);
|
||||||
|
String[] observerReadOverrides =
|
||||||
|
conf.getStrings(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_OVERRIDES);
|
||||||
|
if (observerReadOverrides != null) {
|
||||||
|
observerReadEnabledOverrides.addAll(Arrays.asList(observerReadOverrides));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -86,7 +100,7 @@ public void setResponseHeaderState(RpcResponseHeaderProto.Builder headerBuilder)
|
|||||||
}
|
}
|
||||||
RouterFederatedStateProto.Builder builder = RouterFederatedStateProto.newBuilder();
|
RouterFederatedStateProto.Builder builder = RouterFederatedStateProto.newBuilder();
|
||||||
namespaceIdMap.forEach((k, v) -> {
|
namespaceIdMap.forEach((k, v) -> {
|
||||||
if (v.get() != Long.MIN_VALUE) {
|
if ((v.get() != Long.MIN_VALUE) && isNamespaceObserverReadEligible(k)) {
|
||||||
builder.putNamespaceStateIds(k, v.get());
|
builder.putNamespaceStateIds(k, v.get());
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@ -177,4 +191,13 @@ public boolean isCoordinatedCall(String protocolName, String methodName) {
|
|||||||
return protocolName.equals(ClientProtocol.class.getCanonicalName())
|
return protocolName.equals(ClientProtocol.class.getCanonicalName())
|
||||||
&& coordinatedMethods.contains(methodName);
|
&& coordinatedMethods.contains(methodName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if a namespace is eligible for observer reads.
|
||||||
|
* @param nsId namespaceID
|
||||||
|
* @return whether the 'namespace' has observer reads enabled.
|
||||||
|
*/
|
||||||
|
boolean isNamespaceObserverReadEligible(String nsId) {
|
||||||
|
return observerReadEnabledDefault != observerReadEnabledOverrides.contains(nsId);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -590,7 +590,12 @@ public void testClientReceiveResponseState() {
|
|||||||
@Test
|
@Test
|
||||||
@Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
|
@Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
|
||||||
public void testRouterResponseHeaderState() {
|
public void testRouterResponseHeaderState() {
|
||||||
RouterStateIdContext routerStateIdContext = new RouterStateIdContext(new Configuration());
|
// This conf makes ns1 that is not eligible for observer reads.
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.setBoolean(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY, true);
|
||||||
|
conf.set(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_OVERRIDES, "ns1");
|
||||||
|
|
||||||
|
RouterStateIdContext routerStateIdContext = new RouterStateIdContext(conf);
|
||||||
|
|
||||||
ConcurrentHashMap<String, LongAccumulator> namespaceIdMap =
|
ConcurrentHashMap<String, LongAccumulator> namespaceIdMap =
|
||||||
routerStateIdContext.getNamespaceIdMap();
|
routerStateIdContext.getNamespaceIdMap();
|
||||||
@ -598,26 +603,18 @@ public void testRouterResponseHeaderState() {
|
|||||||
namespaceIdMap.put("ns1", new LongAccumulator(Math::max, 100));
|
namespaceIdMap.put("ns1", new LongAccumulator(Math::max, 100));
|
||||||
namespaceIdMap.put("ns2", new LongAccumulator(Math::max, Long.MIN_VALUE));
|
namespaceIdMap.put("ns2", new LongAccumulator(Math::max, Long.MIN_VALUE));
|
||||||
|
|
||||||
Map<String, Long> mockMapping = new HashMap<>();
|
|
||||||
mockMapping.put("ns0", 10L);
|
|
||||||
mockMapping.put("ns2", 100L);
|
|
||||||
mockMapping.put("ns3", Long.MIN_VALUE);
|
|
||||||
RouterFederatedStateProto.Builder builder = RouterFederatedStateProto.newBuilder();
|
|
||||||
mockMapping.forEach(builder::putNamespaceStateIds);
|
|
||||||
|
|
||||||
RpcHeaderProtos.RpcResponseHeaderProto.Builder responseHeaderBuilder =
|
RpcHeaderProtos.RpcResponseHeaderProto.Builder responseHeaderBuilder =
|
||||||
RpcHeaderProtos.RpcResponseHeaderProto
|
RpcHeaderProtos.RpcResponseHeaderProto
|
||||||
.newBuilder()
|
.newBuilder()
|
||||||
.setCallId(1)
|
.setCallId(1)
|
||||||
.setStatus(RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.SUCCESS)
|
.setStatus(RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.SUCCESS);
|
||||||
.setRouterFederatedState(builder.build().toByteString());
|
|
||||||
routerStateIdContext.updateResponseState(responseHeaderBuilder);
|
routerStateIdContext.updateResponseState(responseHeaderBuilder);
|
||||||
|
|
||||||
Map<String, Long> latestFederateState = RouterStateIdContext.getRouterFederatedStateMap(
|
Map<String, Long> latestFederateState = RouterStateIdContext.getRouterFederatedStateMap(
|
||||||
responseHeaderBuilder.build().getRouterFederatedState());
|
responseHeaderBuilder.build().getRouterFederatedState());
|
||||||
Assertions.assertEquals(2, latestFederateState.size());
|
// Only ns0 will be in latestFederateState
|
||||||
|
Assertions.assertEquals(1, latestFederateState.size());
|
||||||
Assertions.assertEquals(10L, latestFederateState.get("ns0"));
|
Assertions.assertEquals(10L, latestFederateState.get("ns0"));
|
||||||
Assertions.assertEquals(100L, latestFederateState.get("ns1"));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@EnumSource(ConfigSetting.class)
|
@EnumSource(ConfigSetting.class)
|
||||||
|
Loading…
Reference in New Issue
Block a user