HDFS-16890: RBF: Ensures router periodically refreshes its record of a namespace's state. (#5298)

This commit is contained in:
Simbarashe Dzinamarira 2023-02-27 09:56:24 -08:00 committed by GitHub
parent 8798b94ee1
commit 61f369c43e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 110 additions and 7 deletions

View File

@ -201,6 +201,10 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
FEDERATION_ROUTER_PREFIX + "observer.federated.state.propagation.maxsize"; FEDERATION_ROUTER_PREFIX + "observer.federated.state.propagation.maxsize";
public static final int DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE_DEFAULT = 5; public static final int DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE_DEFAULT = 5;
/**
 * Configuration key for the period after which the router stops trusting its
 * record of a namespace's state ID and sends the call to the active namenode
 * instead. A negative value disables this periodic freshness check.
 */
public static final String DFS_ROUTER_OBSERVER_STATE_ID_REFRESH_PERIOD_KEY =
FEDERATION_ROUTER_PREFIX + "observer.state.id.refresh.period";
/** Default refresh period; a duration string read as a time duration by RouterRpcClient. */
public static final String DFS_ROUTER_OBSERVER_STATE_ID_REFRESH_PERIOD_DEFAULT = "15s";
public static final String FEDERATION_STORE_SERIALIZER_CLASS = public static final String FEDERATION_STORE_SERIALIZER_CLASS =
FEDERATION_STORE_PREFIX + "serializer"; FEDERATION_STORE_PREFIX + "serializer";
public static final Class<StateStoreSerializerPBImpl> public static final Class<StateStoreSerializerPBImpl>

View File

@ -57,6 +57,7 @@
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.atomic.LongAdder;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -86,6 +87,7 @@
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.eclipse.jetty.util.ajax.JSON; import org.eclipse.jetty.util.ajax.JSON;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -136,6 +138,14 @@ public class RouterRpcClient {
private final boolean observerReadEnabledDefault; private final boolean observerReadEnabledDefault;
/** Nameservice specific overrides of the default setting for enabling observer reads. */ /** Nameservice specific overrides of the default setting for enabling observer reads. */
private HashSet<String> observerReadEnabledOverrides = new HashSet<>(); private HashSet<String> observerReadEnabledOverrides = new HashSet<>();
/**
 * Period, in milliseconds, to refresh namespace stateID using active namenode.
 * This ensures the namespace stateID is fresh even when an
 * observer is trailing behind.
 * A negative value disables the periodic freshness check.
 */
private long activeNNStateIdRefreshPeriodMs;
/**
 * Per-namespace monotonic timestamp (ms) of the most recent call for which
 * observer namenodes were not listed first. Each entry is a max-accumulator
 * that starts at 0, so a namespace with no recorded call reads as time 0.
 */
private final ConcurrentHashMap<String, LongAccumulator> lastActiveNNRefreshTimes;
/** Pattern to parse a stack trace line. */ /** Pattern to parse a stack trace line. */
private static final Pattern STACK_TRACE_PATTERN = private static final Pattern STACK_TRACE_PATTERN =
@ -211,13 +221,25 @@ public RouterRpcClient(Configuration conf, Router router,
this.observerReadEnabledDefault = conf.getBoolean( this.observerReadEnabledDefault = conf.getBoolean(
RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY, RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY,
RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_VALUE); RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_VALUE);
String[] observerReadOverrides = conf.getStrings(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_OVERRIDES); String[] observerReadOverrides =
conf.getStrings(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_OVERRIDES);
if (observerReadOverrides != null) { if (observerReadOverrides != null) {
observerReadEnabledOverrides.addAll(Arrays.asList(observerReadOverrides)); observerReadEnabledOverrides.addAll(Arrays.asList(observerReadOverrides));
} }
if (this.observerReadEnabledDefault) { if (this.observerReadEnabledDefault) {
LOG.info("Observer read is enabled for router."); LOG.info("Observer read is enabled for router.");
} }
this.activeNNStateIdRefreshPeriodMs = conf.getTimeDuration(
RBFConfigKeys.DFS_ROUTER_OBSERVER_STATE_ID_REFRESH_PERIOD_KEY,
RBFConfigKeys.DFS_ROUTER_OBSERVER_STATE_ID_REFRESH_PERIOD_DEFAULT,
TimeUnit.SECONDS, TimeUnit.MILLISECONDS);
if (activeNNStateIdRefreshPeriodMs < 0) {
LOG.info("Periodic stateId freshness check is disabled"
+ " since '{}' is {}ms, which is less than 0.",
RBFConfigKeys.DFS_ROUTER_OBSERVER_STATE_ID_REFRESH_PERIOD_KEY,
activeNNStateIdRefreshPeriodMs);
}
this.lastActiveNNRefreshTimes = new ConcurrentHashMap<>();
} }
/** /**
@ -1707,10 +1729,13 @@ private List<? extends FederationNamenodeContext> getOrderedNamenodes(String nsI
boolean isObserverRead) throws IOException { boolean isObserverRead) throws IOException {
final List<? extends FederationNamenodeContext> namenodes; final List<? extends FederationNamenodeContext> namenodes;
if (RouterStateIdContext.getClientStateIdFromCurrentCall(nsId) > Long.MIN_VALUE) { boolean listObserverNamenodesFirst = isObserverRead
namenodes = namenodeResolver.getNamenodesForNameserviceId(nsId, isObserverRead); && isNamespaceStateIdFresh(nsId)
} else { && (RouterStateIdContext.getClientStateIdFromCurrentCall(nsId) > Long.MIN_VALUE);
namenodes = namenodeResolver.getNamenodesForNameserviceId(nsId, false); namenodes = namenodeResolver.getNamenodesForNameserviceId(nsId, listObserverNamenodesFirst);
if (!listObserverNamenodesFirst) {
// Refresh time of last call to active NameNode.
getTimeOfLastCallToActive(nsId).accumulate(Time.monotonicNow());
} }
if (namenodes == null || namenodes.isEmpty()) { if (namenodes == null || namenodes.isEmpty()) {
@ -1721,7 +1746,8 @@ private List<? extends FederationNamenodeContext> getOrderedNamenodes(String nsI
} }
private boolean isObserverReadEligible(String nsId, Method method) { private boolean isObserverReadEligible(String nsId, Method method) {
boolean isReadEnabledForNamespace = observerReadEnabledDefault != observerReadEnabledOverrides.contains(nsId); boolean isReadEnabledForNamespace =
observerReadEnabledDefault != observerReadEnabledOverrides.contains(nsId);
return isReadEnabledForNamespace && isReadCall(method); return isReadEnabledForNamespace && isReadCall(method);
} }
@ -1735,4 +1761,24 @@ private static boolean isReadCall(Method method) {
} }
return !method.getAnnotationsByType(ReadOnly.class)[0].activeOnly(); return !method.getAnnotationsByType(ReadOnly.class)[0].activeOnly();
} }
/**
 * Tells whether the router's view of a namespace's stateId is recent enough
 * to keep serving reads from observers.
 * The time elapsed since the last recorded call to the active namenode is
 * compared against the configured refresh period. When the periodic check
 * is disabled (negative period) the state is always reported as fresh.
 * This method only reads the recorded time; it does not update it.
 *
 * @param nsId namespaceID
 * @return true if the check is disabled or the last recorded call to the
 *         active namenode is within the refresh period; false if the call
 *         should be handled by the active namenode.
 */
@VisibleForTesting
boolean isNamespaceStateIdFresh(String nsId) {
  boolean freshnessCheckEnabled = activeNNStateIdRefreshPeriodMs >= 0;
  if (freshnessCheckEnabled) {
    // Read the clock before the recorded time, in the same order as before.
    long nowMs = Time.monotonicNow();
    long lastCallToActiveMs = getTimeOfLastCallToActive(nsId).get();
    long elapsedMs = nowMs - lastCallToActiveMs;
    return elapsedMs <= activeNNStateIdRefreshPeriodMs;
  }
  return true;
}
/**
 * Looks up the accumulator holding the time of the last call to the active
 * namenode for a namespace, creating it on first access.
 * A new accumulator keeps the maximum of the values it is given and
 * starts at 0.
 *
 * @param namespaceId namespace whose accumulator is requested.
 * @return the accumulator registered for the namespace.
 */
private LongAccumulator getTimeOfLastCallToActive(String namespaceId) {
  return lastActiveNNRefreshTimes.computeIfAbsent(
      namespaceId,
      unusedKey -> new LongAccumulator((left, right) -> Math.max(left, right), 0L));
}
} }

View File

@ -884,4 +884,14 @@
of namespaces in use and the latency of the msync requests. of namespaces in use and the latency of the msync requests.
</description> </description>
</property> </property>
<property>
<name>dfs.federation.router.observer.state.id.refresh.period</name>
<value>15s</value>
<description>
Period to refresh namespace stateID using active namenode. This ensures the
namespace stateID is fresh even when an observer is trailing behind.
If this is below 0, the auto-refresh is disabled.
</description>
</property>
</configuration> </configuration>

View File

@ -34,9 +34,11 @@
import java.util.concurrent.atomic.LongAccumulator; import java.util.concurrent.atomic.LongAccumulator;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.ClientGSIContext; import org.apache.hadoop.hdfs.ClientGSIContext;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RouterFederatedStateProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RouterFederatedStateProto;
@ -50,6 +52,7 @@
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver; import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
@ -95,7 +98,9 @@ public void startUpCluster(int numberOfObserver, Configuration confOverrides) th
conf.set(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, "0ms"); conf.set(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, "0ms");
conf.setBoolean(DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY, true); conf.setBoolean(DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY, true);
if (confOverrides != null) { if (confOverrides != null) {
conf.addResource(confOverrides); confOverrides
.iterator()
.forEachRemaining(entry -> conf.set(entry.getKey(), entry.getValue()));
} }
cluster = new MiniRouterDFSCluster(true, 2, numberOfNamenode); cluster = new MiniRouterDFSCluster(true, 2, numberOfNamenode);
cluster.addNamenodeOverrides(conf); cluster.addNamenodeOverrides(conf);
@ -639,4 +644,42 @@ public void testRouterStateIdContextCleanup() throws Exception {
assertEquals("ns0", namespace1.get(0)); assertEquals("ns0", namespace1.get(0));
assertTrue(namespace2.isEmpty()); assertTrue(namespace2.isEmpty());
} }
/**
 * Verifies that once the router considers a namespace's state stale, a
 * listing through the router reflects directories that were created
 * directly on the active namenode.
 * The cluster is started with one observer, a 500ms state-id refresh period
 * and a 3s edit-tailing period.
 */
@Test
@Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
public void testPeriodicStateRefreshUsingActiveNamenode() throws Exception {
  Path rootPath = new Path("/");

  Configuration confOverride = new Configuration(false);
  confOverride.set(RBFConfigKeys.DFS_ROUTER_OBSERVER_STATE_ID_REFRESH_PERIOD_KEY, "500ms");
  confOverride.set(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, "3s");
  startUpCluster(1, confOverride);

  fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads());
  // NOTE(review): the first listing's result is discarded; presumably it
  // primes the client/router state before the baseline is taken - confirm.
  fileSystem.listStatus(rootPath);
  int initialLengthOfRootListing = fileSystem.listStatus(rootPath).length;

  // Create the directories directly on the active namenode, bypassing the
  // router. try-with-resources closes the client even if a mkdirs call fails.
  try (DFSClient activeClient = cluster.getNamenodes("ns0")
      .stream()
      .filter(nnContext -> nnContext.getNamenode().isActiveState())
      .findFirst().orElseThrow(() -> new IllegalStateException("No active namenode."))
      .getClient()) {
    for (int i = 0; i < 10; i++) {
      activeClient.mkdirs("/dir" + i, null, false);
    }
  }

  // Wait long enough for state in router to be considered stale.
  GenericTestUtils.waitFor(
      () -> !routerContext
          .getRouterRpcClient()
          .isNamespaceStateIdFresh("ns0"),
      100,
      10000,
      "Timeout: Namespace state was never considered stale.");

  FileStatus[] rootFolderAfterMkdir = fileSystem.listStatus(rootPath);
  assertEquals("List-status should show newly created directories.",
      initialLengthOfRootListing + 10, rootFolderAfterMkdir.length);
}
} }