状态管理之状态后端
1. 状态后端(State Backends)
在Flink中,状态的存储、访问以及维护通过状态后端(state backend)进行管理。状态后端主要负责管理本地状态的存储方式和位置。
1.1 状态后端的分类
状态后端是一个"开箱即用"的组件,可以在不改变应用程序逻辑的情况下独立配置。Flink中提供了两类不同的状态后端,一种是"哈希表状态后端"(HashMapStateBackend),另一种是"内嵌RocksDB状态后端"(EmbeddedRocksDBStateBackend)。如果没有特别配置,系统默认的状态后端是HashMapStateBackend。
1.2 哈希表状态后端(HashMapStateBackend)
HashMapStateBackend是把状态存放在内存里。具体实现上,哈希表状态后端在内部会直接把状态当作对象(objects),保存在Taskmanager的JVM堆上。普通的状态,以及窗口中收集的数据和触发器,都会以键值对的形式存储起来。
1.3 内嵌RocksDB状态后端(EmbeddedRocksDBStateBackend)
RocksDB是一种嵌入型的key-value数据库,可以把数据持久化到本地硬盘。配置EmbeddedRocksDBStateBackend后,会将处理中的数据全部放入RocksDB数据库中,RocksDB默认存储在TaskManager的本地数据目录里。RocksDB的状态数据被存储为序列化的字节数组,读写操作需要序列化/反序列化,因此状态的访问性能要差一些。
EmbeddedRocksDBStateBackend始终执行的是异步快照,所以不会因为保存检查点而阻塞数据的处理;而且它还提供了增量式保存检查点的机制,这在很多情况下可以大大提升保存效率。
1.4 如何选择正确的状态后端
HashMap和RocksDB两种状态后端最大的区别,就在于本地状态存放在哪里。
HashMapStateBackend是内存计算,读写速度非常快;但是,状态的大小会受到集群可用内存的限制,如果应用的状态随着时间不停地增长,就会耗尽内存资源。而RocksDB是硬盘存储,所以可以根据可用的磁盘空间进行扩展,所以它非常适合于超级海量状态的存储。不过由于每个状态的读写都需要做序列化/反序列化,而且可能需要直接从磁盘读取数据,这就会导致性能的降低,平均读写性能要比HashMapStateBackend慢一个数量级。
2. 状态后端的配置
flink-conf.yaml中指定的,配置的属性名称为state.backend的配置项:
- 配置项的值为hashmap---启用HashMapStateBackend。
- 配置项的值为rocksdb---启用EmbeddedRocksDBStateBackend。
# 配置HashMapStateBackend的例子
# 默认状态后端
state.backend: hashmap
# 存放检查点的文件路径
state.checkpoints.dir: hdfs://hadoop102:8020/flink/checkpoints
也可以在代码中为当前作业单独配置状态后端,这个配置会覆盖掉集群配置文件对应的属性值。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置状态后端
env.setStateBackend(new HashMapStateBackend());
// env.setStateBackend(new EmbeddedRocksDBStateBackend());
需要注意的是Flink在生产环境已经包含了RocksDB,如果在使用rocksdb,需要为Flink项目添加依赖并设置provided:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
另外还可以在提交flink作业的命令中指定-Dstate.backend=hashmap
参数来指定状态后端。