添加RocksDB状态后端相关

This commit is contained in:
LingZhaoHui 2023-09-17 15:36:14 +08:00
parent 294854f726
commit 045454f380
Signed by: zeekling
GPG Key ID: D96E4E75267CA2CC

View File

@ -136,7 +136,6 @@ Flink 为算子状态提供三种基本数据结构:
| state.backend.rocksdb.checkpoint.transfer.thread.num | 4 | 用于上传和下载文件的线程数目 |
| state.backend.rocksdb.write-batch-size | 2mb | Rocksdb写入时消耗的最大内存 |
| state.backend.rocksdb.predefined-options | DEFAULT | `DEFAULT`所有的RocksDb配置都是默认值。 <br>`SPINNING_DISK_OPTIMIZED`:在写硬盘的时候优化RocksDb参数 <br>`SPINNING_DISK_OPTIMIZED_HIGH_MEM`: 在写入常规硬盘时优化参数,需要消耗更多的内存<br> `FLASH_SSD_OPTIMIZED`:在写入ssd闪盘时进行优化。 |
| | | |
@ -182,9 +181,79 @@ private final HashMap<String, InternalKvState<K, ?, ?>> keyValueStatesByName;
## EmbeddedRocksDBStateBackend
将正在于行的作业的状态保存到RocksDb里面。
## 创建KeyedStateBackend
1. 加载`RocksDB JNI library`相关Jar包。
2. 申请RocksDB所需要的内存。核心代码在SharedResources类当中的getOrAllocateSharedResource函数。在申请资源之前会先加锁在加锁成功会申请所需要的资源。加锁代码如下
```java
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new MemoryAllocationException("Interrupted while acquiring memory");
}
```
在申请资源之前需要根据类型判断是否已经申请了资源,如果已经申请了资源就不会重新申请,没有则需要申请。申请的代码如下所示:
````java
private static <T extends AutoCloseable> LeasedResource<T> createResource(
LongFunctionWithException<T, Exception> initializer, long size) throws Exception {
final T resource = initializer.apply(size);
return new LeasedResource<>(resource, size);
}
````
3. 创建resourceContainer,包含预先定义好的RocksDB优化选项等。
````java
private RocksDBResourceContainer createOptionsAndResourceContainer(
@Nullable OpaqueMemoryResource<RocksDBSharedResources> sharedResources,
@Nullable File instanceBasePath,
boolean enableStatistics) {
return new RocksDBResourceContainer(
configurableOptions != null ? configurableOptions : new Configuration(),
predefinedOptions != null ? predefinedOptions : PredefinedOptions.DEFAULT,
rocksDbOptionsFactory,
sharedResources,
instanceBasePath,
enableStatistics);
````
4. 初始化RocksDBKeyedStateBackend会从目录里面加载数据到RocksDB里面。
````java
restoreOperation =
getRocksDBRestoreOperation(
keyGroupPrefixBytes,
cancelStreamRegistry,
kvStateInformation,
registeredPQStates,
ttlCompactFiltersManager);
RocksDBRestoreResult restoreResult = restoreOperation.restore();
db = restoreResult.getDb();
defaultColumnFamilyHandle = restoreResult.getDefaultColumnFamilyHandle();
````
restoreOperation实现类图如下所示主要包含如下的实现类。
![pic](https://pan.zeekling.cn//flink/basic/state/StateBackend_0004.png)