From 2f70b52a5bc6d057232a07916c1cc9c0af4ade47 Mon Sep 17 00:00:00 2001 From: Ayush Saxena Date: Thu, 5 Sep 2019 08:29:56 +0530 Subject: [PATCH] HDFS-14812. RBF: MountTableRefresherService should load cache when refresh. Contributed by xuzq. --- .../router/MountTableRefresherService.java | 36 ++++++++++++------- .../hdfs/server/federation/router/Router.java | 2 +- 2 files changed, 24 insertions(+), 14 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/MountTableRefresherService.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/MountTableRefresherService.java index fafcef475a..e3ecd266ab 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/MountTableRefresherService.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/MountTableRefresherService.java @@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.server.federation.store.MountTableStore; +import org.apache.hadoop.hdfs.server.federation.store.RouterStore; import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException; import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils; import org.apache.hadoop.hdfs.server.federation.store.records.RouterState; @@ -57,7 +58,7 @@ */ public class MountTableRefresherService extends AbstractService { private static final String ROUTER_CONNECT_ERROR_MSG = - "Router {} connection failed. Mount table cache will not refesh."; + "Router {} connection failed. Mount table cache will not refresh."; private static final Logger LOG = LoggerFactory.getLogger(MountTableRefresherService.class); @@ -66,7 +67,7 @@ public class MountTableRefresherService extends AbstractService { /** Mount table store. */ private MountTableStore mountTableStore; /** Local router admin address in the form of host:port. */ - private String localAdminAdress; + private String localAdminAddress; /** Timeout in ms to update mount table cache on all the routers. */ private long cacheUpdateTimeout; @@ -97,9 +98,9 @@ public MountTableRefresherService(Router router) { protected void serviceInit(Configuration conf) throws Exception { super.serviceInit(conf); this.mountTableStore = getMountTableStore(); - // attach this service to mount table store. + // Attach this service to mount table store. this.mountTableStore.setRefreshService(this); - this.localAdminAdress = + this.localAdminAddress = StateStoreUtils.getHostPortString(router.getAdminServerAddress()); this.cacheUpdateTimeout = conf.getTimeDuration( RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE_TIMEOUT, @@ -198,19 +199,27 @@ private MountTableStore getMountTableStore() throws IOException { * Refresh mount table cache of this router as well as all other routers. */ public void refresh() throws StateStoreUnavailableException { - List cachedRecords = - router.getRouterStateManager().getCachedRecords(); + RouterStore routerStore = router.getRouterStateManager(); + + try { + routerStore.loadCache(true); + } catch (IOException e) { + LOG.warn("RouterStore load cache failed,", e); + } + + List cachedRecords = routerStore.getCachedRecords(); List refreshThreads = new ArrayList<>(); for (RouterState routerState : cachedRecords) { String adminAddress = routerState.getAdminAddress(); if (adminAddress == null || adminAddress.length() == 0) { - // this router has not enabled router admin + // this router has not enabled router admin. continue; } // No use of calling refresh on router which is not running state if (routerState.getStatus() != RouterServiceState.RUNNING) { LOG.info( - "Router {} is not running. Mount table cache will not refesh."); + "Router {} is not running. Mount table cache will not refresh.", + routerState.getAddress()); // remove if RouterClient is cached. removeFromCache(adminAddress); } else if (isLocalAdmin(adminAddress)) { @@ -268,22 +277,23 @@ private void invokeRefresh(List refreshThreads) { } private boolean isLocalAdmin(String adminAddress) { - return adminAddress.contentEquals(localAdminAdress); + return adminAddress.contentEquals(localAdminAddress); } private void logResult(List refreshThreads) { - int succesCount = 0; + int successCount = 0; int failureCount = 0; for (MountTableRefresherThread mountTableRefreshThread : refreshThreads) { if (mountTableRefreshThread.isSuccess()) { - succesCount++; + successCount++; } else { failureCount++; // remove RouterClient from cache so that new client is created removeFromCache(mountTableRefreshThread.getAdminAddress()); } } - LOG.info("Mount table entries cache refresh succesCount={},failureCount={}", - succesCount, failureCount); + LOG.info( + "Mount table entries cache refresh successCount={},failureCount={}", + successCount, failureCount); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java index b6d188d21c..a03d8d4960 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java @@ -285,7 +285,7 @@ protected void serviceInit(Configuration configuration) throws Exception { MountTableRefresherService.class.getSimpleName()); } else { LOG.warn( - "Service {} not enabled: depenendent service(s) {} not enabled.", + "Service {} not enabled: dependent service(s) {} not enabled.", MountTableRefresherService.class.getSimpleName(), disabledDependentServices); }