From bb49a67830cd77a5f4662823b726572a0244eb65 Mon Sep 17 00:00:00 2001 From: zeekling Date: Thu, 10 Oct 2024 00:00:21 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0Router=E8=AF=A6=E8=A7=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- hdfs/router启动详解.md | 96 +++++++++++++++++++++++++++++++++++++----- 1 file changed, 85 insertions(+), 11 deletions(-) diff --git a/hdfs/router启动详解.md b/hdfs/router启动详解.md index 00ffbad..d2f4b19 100644 --- a/hdfs/router启动详解.md +++ b/hdfs/router启动详解.md @@ -63,33 +63,107 @@ addService(this.cacheUpdater); 启动主要是Router的serviceStart函数触发,最终调用StateStoreDriver的init函数,用于初始化driver。核心函数为initDriver和initRecordStorage。 -#### StateStoreFileImpl +其中initRecordStorage针对每个record stores都需要调用,如下: +```java +for (Class cls : records) { + String recordString = StateStoreUtils.getRecordName(cls); + if (!initRecordStorage(recordString, cls)) { + LOG.error("Cannot initialize record store for {}", cls.getSimpleName()); + return false; + } +} +``` + +#### StateStoreFileImpl or StateStoreFileSystemImpl ##### initDriver +对于当前的StateStore初始化比较简单,主要是检查本地文件夹是否存在,不存在就创建。大致代码如下: + +```java +public boolean initDriver() { + String rootDir = getRootDir(); + if (rootDir == null) { + LOG.error("Invalid root directory, unable to initialize driver."); + return false; + } + + // Check root path + if (!exists(rootDir)) { + if (!mkdir(rootDir)) { + LOG.error("Cannot create State Store root directory {}", rootDir); + return false; + } + } + // ... 省略 ... + int threads = getConcurrentFilesAccessNumThreads(); + this.concurrentStoreAccessPool = + new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(), + new ThreadFactoryBuilder() + .setNameFormat("state-store-file-based-concurrent-%d") + .setDaemon(true).build()); + return true; + } +``` ##### initRecordStorage +和initDriver类似,支持针对每个State Store创建对应的目录,目录名称使用state store的className。 - -#### StateStoreFileSystemImpl - -initDriver - -initRecordStorage +```java +public boolean initRecordStorage( + String className, Class recordClass) { + String dataDirPath = getRootDir() + "/" + className; + // Create data directories for files + if (!exists(dataDirPath)) { + LOG.info("{} data directory doesn't exist, creating it", dataDirPath); + if (!mkdir(dataDirPath)) { + LOG.error("Cannot create data directory {}", dataDirPath); + return false; + } + } + return true; + } +``` #### StateStoreMySQLImpl -initDriver +##### initDriver +核心逻辑就是创建Mysql连接。Mysql连接封装为类MySQLStateStoreHikariDataSourceConnectionFactory。 +```java +MySQLStateStoreHikariDataSourceConnectionFactory(Configuration conf) { + Properties properties = new Properties(); + properties.setProperty("jdbcUrl", conf.get(StateStoreMySQLImpl.CONNECTION_URL)); + properties.setProperty("username", conf.get(StateStoreMySQLImpl.CONNECTION_USERNAME)); + properties.setProperty("password", conf.get(StateStoreMySQLImpl.CONNECTION_PASSWORD)); + properties.setProperty("driverClassName", conf.get(StateStoreMySQLImpl.CONNECTION_DRIVER)); -initRecordStorage + // Include hikari connection properties + properties.putAll(conf.getPropsWithPrefix(HIKARI_PROPS)); + + HikariConfig hikariConfig = new HikariConfig(properties); + this.dataSource = new HikariDataSource(hikariConfig); +} +``` + +##### initRecordStorage + +zai StateStoreMySQLImpl当中,每个state store 对应一张表。建表语句如下: +```sql +CREATE TABLE ( + recordKey VARCHAR (255) NOT NULL, + recordValue VARCHAR (2047) NOT NULL, + PRIMARY KEY(recordKey)) +) +``` #### StateStoreZooKeeperImpl -initDriver +##### initDriver -initRecordStorage +##### initRecordStorage ## ActiveNamenodeResolver