HDFS-17166. RBF: Throwing NoNamenodesAvailableException for a long time, when failover (#5990)

This commit is contained in:
Jian Zhang 2023-09-06 08:48:27 +08:00 committed by GitHub
parent 2831c7ce26
commit c40a6bd46a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 148 additions and 0 deletions

View File

@ -146,4 +146,14 @@ List<? extends FederationNamenodeContext> getNamenodesForNameserviceId(
* @param routerId Unique string identifier for the router.
*/
void setRouterId(String routerId);
/**
* Rotate cache, make the current namenode have the lowest priority,
* to ensure that the current namenode will not be accessed first next time.
*
* @param nsId name service id
* @param namenode namenode contexts
* @param listObserversFirst Observer read case, observer NN will be ranked first
*/
void rotateCache(String nsId, FederationNamenodeContext namenode, boolean listObserversFirst);
}

View File

@ -478,4 +478,45 @@ private List<MembershipState> getRecentRegistrationForQuery(
public void setRouterId(String router) {
this.routerId = router;
}
/**
* Rotate cache, make the current namenode have the lowest priority,
* to ensure that the current namenode will not be accessed first next time.
*
* @param nsId name service id
* @param namenode namenode contexts
* @param listObserversFirst Observer read case, observer NN will be ranked first
*/
@Override
public void rotateCache(
String nsId, FederationNamenodeContext namenode, boolean listObserversFirst) {
cacheNS.compute(Pair.of(nsId, listObserversFirst), (ns, namenodeContexts) -> {
if (namenodeContexts == null || namenodeContexts.size() <= 1) {
return namenodeContexts;
}
FederationNamenodeContext firstNamenodeContext = namenodeContexts.get(0);
/*
* If the first nn in the cache is active, the active nn priority cannot be lowered.
* This happens when other threads have already updated the cache.
*/
if (firstNamenodeContext.getState().equals(ACTIVE)) {
return namenodeContexts;
}
/*
* If the first nn in the cache at this time is not the nn
* that needs to be lowered in priority, there is no need to rotate.
* This happens when other threads have already rotated the cache.
*/
if (firstNamenodeContext.getRpcAddress().equals(namenode.getRpcAddress())) {
List<FederationNamenodeContext> rotatedNnContexts = new ArrayList<>(namenodeContexts);
Collections.rotate(rotatedNnContexts, -1);
String firstNamenodeId = namenodeContexts.get(0).getNamenodeId();
LOG.info("Rotate cache of pair <ns: {}, observer first: {}>, put namenode: {} in the " +
"first position of the cache and namenode: {} in the last position of the cache",
nsId, listObserversFirst, firstNamenodeId, namenode.getNamenodeId());
return rotatedNnContexts;
}
return namenodeContexts;
});
}
}

View File

@ -599,6 +599,8 @@ public Object invokeMethod(
}
LOG.error("Cannot get available namenode for {} {} error: {}",
nsId, rpcAddress, ioe.getMessage());
// Rotate cache so that client can retry the next namenode in the cache
this.namenodeResolver.rotateCache(nsId, namenode, shouldUseObserver);
// Throw RetriableException so that client can retry
throw new RetriableException(ioe);
} else {

View File

@ -1202,4 +1202,13 @@ public void waitClusterUp() throws IOException {
throw new IOException("Cannot wait for the namenodes", e);
}
}
/**
* Get cache flush interval in milliseconds.
*
* @return Cache flush interval in milliseconds.
*/
public long getCacheFlushInterval() {
return cacheFlushInterval;
}
}

View File

@ -397,6 +397,11 @@ public List<String> getMountPoints(String path) throws IOException {
public void setRouterId(String router) {
}
@Override
public void rotateCache(
String nsId, FederationNamenodeContext namenode, boolean listObserversFirst) {
}
/**
* Mocks the availability of default namespace.
* @param b if true default namespace is unset.

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.federation.router;
import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState.ACTIVE;
import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState.STANDBY;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.simulateSlowNamenode;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.simulateThrowExceptionRouterRpcServer;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.transitionClusterNSToStandby;
@ -42,16 +44,19 @@
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.test.GenericTestUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.util.Time;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
@ -357,6 +362,82 @@ public void testNoNamenodesAvailable() throws Exception{
assertEquals(originalRouter0Failures, rpcMetrics0.getProxyOpNoNamenodes());
}
/**
* When failover occurs, the router may record that the ns has no active namenode.
* Only when the router updates the cache next time can the memory status be updated,
* causing the router to report NoNamenodesAvailableException for a long time.
*/
@Test
public void testNoNamenodesAvailableLongTimeWhenNsFailover() throws Exception {
setupCluster(false, true);
transitionClusterNSToStandby(cluster);
for (RouterContext routerContext : cluster.getRouters()) {
// Manually trigger the heartbeat
Collection<NamenodeHeartbeatService> heartbeatServices = routerContext
.getRouter().getNamenodeHeartbeatServices();
for (NamenodeHeartbeatService service : heartbeatServices) {
service.periodicInvoke();
}
// Update service cache
routerContext.getRouter().getStateStore().refreshCaches(true);
}
// Record the time after the router first updated the cache
long firstLoadTime = Time.now();
List<MiniRouterDFSCluster.NamenodeContext> namenodes = cluster.getNamenodes();
// Make sure all namenodes are in standby state
for (MiniRouterDFSCluster.NamenodeContext namenodeContext : namenodes) {
assertEquals(STANDBY.ordinal(), namenodeContext.getNamenode().getNameNodeState());
}
Configuration conf = cluster.getRouterClientConf();
// Set dfs.client.failover.random.order false, to pick 1st router at first
conf.setBoolean("dfs.client.failover.random.order", false);
DFSClient routerClient = new DFSClient(new URI("hdfs://fed"), conf);
for (RouterContext routerContext : cluster.getRouters()) {
// Get the second namenode in the router cache and make it active
List<? extends FederationNamenodeContext> ns0 = routerContext.getRouter()
.getNamenodeResolver()
.getNamenodesForNameserviceId("ns0", false);
String nsId = ns0.get(1).getNamenodeId();
cluster.switchToActive("ns0", nsId);
// Manually trigger the heartbeat, but the router does not manually load the cache
Collection<NamenodeHeartbeatService> heartbeatServices = routerContext
.getRouter().getNamenodeHeartbeatServices();
for (NamenodeHeartbeatService service : heartbeatServices) {
service.periodicInvoke();
}
assertEquals(ACTIVE.ordinal(),
cluster.getNamenode("ns0", nsId).getNamenode().getNameNodeState());
}
// Get router0 metrics
FederationRPCMetrics rpcMetrics0 = cluster.getRouters().get(0)
.getRouter().getRpcServer().getRPCMetrics();
// Original failures
long originalRouter0Failures = rpcMetrics0.getProxyOpNoNamenodes();
/*
* At this time, the router has recorded 2 standby namenodes in memory,
* and the first accessed namenode is indeed standby,
* then an NoNamenodesAvailableException will be reported for the first access,
* and the next access will be successful.
*/
routerClient.getFileInfo("/");
long successReadTime = Time.now();
assertEquals(originalRouter0Failures + 1, rpcMetrics0.getProxyOpNoNamenodes());
/*
* access the active namenode without waiting for the router to update the cache,
* even if there are 2 standby states recorded in the router memory.
*/
assertTrue(successReadTime - firstLoadTime < cluster.getCacheFlushInterval());
}
@Test
public void testAsyncCallerPoolMetrics() throws Exception {
setupCluster(true, false);