diff --git a/basic/checkpoint.md b/basic/checkpoint.md index 2904e02..f24867e 100644 --- a/basic/checkpoint.md +++ b/basic/checkpoint.md @@ -136,7 +136,6 @@ Flink 为算子状态提供三种基本数据结构: | state.backend.rocksdb.checkpoint.transfer.thread.num | 4 | 用于上传和下载文件的线程数目 | | state.backend.rocksdb.write-batch-size | 2mb | Rocksdb写入时消耗的最大内存 | | state.backend.rocksdb.predefined-options | DEFAULT | `DEFAULT`:所有的RocksDb配置都是默认值。
`SPINNING_DISK_OPTIMIZED`:在写硬盘的时候优化RocksDb参数
`SPINNING_DISK_OPTIMIZED_HIGH_MEM`: 在写入常规硬盘时优化参数,需要消耗更多的内存
`FLASH_SSD_OPTIMIZED`:在写入ssd闪盘时进行优化。 | -| | | | @@ -182,9 +181,79 @@ private final HashMap> keyValueStatesByName; ## EmbeddedRocksDBStateBackend +将正在于行的作业的状态保存到RocksDb里面。 +## 创建KeyedStateBackend + + + +1. 加载`RocksDB JNI library`相关Jar包。 + +2. 申请RocksDB所需要的内存。核心代码在SharedResources类当中的getOrAllocateSharedResource函数。在申请资源之前会先加锁,在加锁成功会申请所需要的资源。加锁代码如下: + + + +```java +try { + lock.lockInterruptibly(); +} catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new MemoryAllocationException("Interrupted while acquiring memory"); +} +``` + + + +在申请资源之前需要根据类型判断是否已经申请了资源,如果已经申请了资源就不会重新申请,没有则需要申请。申请的代码如下所示: + +````java +private static LeasedResource createResource( + LongFunctionWithException initializer, long size) throws Exception { + + final T resource = initializer.apply(size); + return new LeasedResource<>(resource, size); +} +```` + +3. 创建resourceContainer,包含预先定义好的RocksDB优化选项等。 + +````java +private RocksDBResourceContainer createOptionsAndResourceContainer( + @Nullable OpaqueMemoryResource sharedResources, + @Nullable File instanceBasePath, + boolean enableStatistics) { + +return new RocksDBResourceContainer( + configurableOptions != null ? configurableOptions : new Configuration(), + predefinedOptions != null ? predefinedOptions : PredefinedOptions.DEFAULT, + rocksDbOptionsFactory, + sharedResources, + instanceBasePath, + enableStatistics); +```` + + + +4. 初始化RocksDBKeyedStateBackend,会从目录里面加载数据到RocksDB里面。 + +````java +restoreOperation = + getRocksDBRestoreOperation( + keyGroupPrefixBytes, + cancelStreamRegistry, + kvStateInformation, + registeredPQStates, + ttlCompactFiltersManager); +RocksDBRestoreResult restoreResult = restoreOperation.restore(); +db = restoreResult.getDb(); +defaultColumnFamilyHandle = restoreResult.getDefaultColumnFamilyHandle(); +```` + +restoreOperation实现类图如下所示,主要包含如下的实现类。 + +![pic](https://pan.zeekling.cn//flink/basic/state/StateBackend_0004.png)