Flink状态后端:HashMap与RocksDB 📅 2026/6/24 8:17:43 一、前言在Flink中状态的存储、访问以及维护都是由一个可插拔的组件决定的这个组件就叫作状态后端State Backend。状态后端主要负责管理本地状态的存储方式和位置是Flink容错机制的核心支撑。理解状态后端的选择与配置对于保障Flink作业的性能、稳定性和可扩展性至关重要。本文将从原理到实战全面剖析Flink的两种状态后端。二、状态后端概述2.1 什么是状态后端状态后端State Backend是一个开箱即用的组件可以在不改变应用程序逻辑的情况下独立配置。它决定了状态数据存储在哪里内存/磁盘/远程存储状态如何被序列化和反序列化Checkpoint时状态如何被持久化如上图所示状态后端分为本地状态存储和持久化快照两部分。用户代码通过本地状态后端进行读写操作而Checkpoint时将状态持久化到分布式文件系统DFS中。2.2 状态后端的分类Flink提供了两类不同的状态后端状态后端存储位置特点HashMapStateBackendJVM堆内存内存计算读写速度快受内存限制EmbeddedRocksDBStateBackend本地磁盘RocksDB可存储海量状态支持增量Checkpoint如果没有特别配置系统默认的状态后端是HashMapStateBackend。三、HashMapStateBackend详解3.1 原理与架构HashMapStateBackend是把状态存放在内存里。具体实现上它在内部会直接把状态当作对象objects保存在TaskManager的JVM堆上。存储结构普通的状态以及窗口中收集的数据和触发器都会以键值对的形式存储起来底层是一个哈希表HashMap因此得名HashMapStateBackend状态数据以Java对象的形式直接存储在内存中3.2 特点与适用场景优点读写速度极快状态以对象形式存储在内存中无需序列化/反序列化开销低延迟内存访问速度远快于磁盘I/O实现简单直接利用JVM堆内存管理缺点受内存限制状态大小不能超过TaskManager可用内存大状态容易OOM如果状态持续增长可能导致内存溢出Checkpoint时全量快照每次Checkpoint都需要将全部状态数据写入外部存储适用场景状态量较小GB级别以下对延迟要求极高的场景状态大小相对稳定不会持续增长四、EmbeddedRocksDBStateBackend详解4.1 原理与架构RocksDB是一种内嵌的key-value存储介质可以把数据持久化到本地硬盘。配置EmbeddedRocksDBStateBackend后会将处理中的数据全部放入RocksDB数据库中。如上图所示RocksDB状态后端的工作机制TaskManagerJVM进程内运行RocksDBnative线程状态数据存储在Local Disks本地磁盘上Checkpoint时将状态异步上传到Remote Durable Storage如HDFS、S34.2 特点与适用场景优点存储容量大可根据可用磁盘空间进行扩展适合超级海量状态异步快照不会因为保存Checkpoint而阻塞数据处理增量Checkpoint只保存自上次Checkpoint以来的变化大幅提升效率状态数据序列化存储状态被存储为序列化的字节数组缺点读写性能较低每次访问需要序列化/反序列化性能比HashMap慢一个数量级key按字节比较key的比较按照字节进行而不是直接调用hashCode()和equals()依赖本地磁盘需要足够的本地磁盘空间适用场景状态量极大TB级别状态持续增长需要持久化存储对Checkpoint效率有要求需要增量备份4.3 RocksDB的存储结构RocksDB使用LSM-TreeLog-Structured Merge-Tree结构存储数据MemTable内存中的写入缓冲区SST Files磁盘上的有序字符串表文件Block Cache缓存热点数据加速读取五、两种状态后端对比5.1 核心对比表对比维度HashMapStateBackendEmbeddedRocksDBStateBackend存储位置JVM堆内存本地磁盘RocksDB存储容量受内存限制受磁盘空间限制读写性能极快无序列化开销较慢需序列化/反序列化Checkpoint方式同步/异步快照始终异步快照增量Checkpoint不支持支持状态对象存储以Java对象存储以序列化字节数组存储key比较方式调用hashCode()和equals()按字节比较适用状态大小小状态GB级以下大状态TB级延迟要求低延迟场景可接受一定延迟默认配置是否5.2 如何选择正确的状态后端HashMap和RocksDB两种状态后端最大的区别就在于本地状态存放在哪里。选择HashMapStateBackend状态大小较小可以放入内存对处理延迟要求极高状态大小相对稳定不会持续增长选择EmbeddedRocksDBStateBackend状态量极大需要磁盘存储状态持续增长需要持久化需要增量Checkpoint提升效率可以接受一定的性能损耗六、状态后端的配置方式6.1 配置文件配置全局配置在flink-conf.yaml中可以使用state.backend配置默认状态后端。# 默认状态后端state.backend:hashmap# 存放检查点的文件路径state.checkpoints.dir:hdfs://hadoop102:8020/flink/checkpoints配置项的可能值hashmap配置HashMapStateBackendrocksdb配置EmbeddedRocksDBStateBackend配置示例# 配置HashMapStateBackendstate.backend:hashmapstate.checkpoints.dir:hdfs://hadoop102:8020/flink/checkpoints# 配置EmbeddedRocksDBStateBackendstate.backend:rocksdbstate.checkpoints.dir:hdfs://hadoop102:8020/flink/checkpoints# RocksDB增量Checkpoint配置可选state.backend.incremental:true6.2 代码中配置单作业配置通过执行环境设置可以为每个作业单独配置状态后端覆盖集群配置文件的默认值。配置HashMapStateBackendStreamExecutionEnvironmentenvStreamExecutionEnvironment.getExecutionEnvironment();// 设置HashMapStateBackendenv.setStateBackend(newHashMapStateBackend());配置EmbeddedRocksDBStateBackendStreamExecutionEnvironmentenvStreamExecutionEnvironment.getExecutionEnvironment();// 设置EmbeddedRocksDBStateBackendenv.setStateBackend(newEmbeddedRocksDBStateBackend());注意事项如果想在IDE中使用EmbeddedRocksDBStateBackend需要为Flink项目添加依赖dependencygroupIdorg.apache.flink/groupIdartifactIdflink-statebackend-rocksdb/artifactIdversion${flink.version}/version/dependency由于Flink发行版中默认就包含了RocksDB服务器上解压的Flink所以只要代码中没有使用RocksDB的相关内容就不需要引入这个依赖。6.3 配置方式优先级配置方式优先级作用范围代码中配置最高当前作业提交参数-D中当前作业flink-conf.yaml最低集群所有作业七、RocksDB增量Checkpoint7.1 什么是增量Checkpoint在Flink 1.15之前只有RocksDB支持增量快照。不同于产生一个包含所有数据的全量备份增量快照中只包含自上一次快照完成之后被修改的记录因此可以显著减少快照完成的耗时。7.2 增量Checkpoint的执行过程增量Checkpoint的执行过程分为以下几个阶段带状态的算子任务将状态更改写入变更日志记录状态状态物化状态表定期保存独立于Checkpoint状态物化完成后状态变更日志可以被截断到相应的点7.3 增量Checkpoint的配置在flink-conf.yaml中配置state.backend:rocksdbstate.backend.incremental:true在代码中配置// 启用RocksDB增量CheckpointEmbeddedRocksDBStateBackendbackendnewEmbeddedRocksDBStateBackend(true);env.setStateBackend(backend);7.4 通用增量CheckpointChangelog从Flink 1.15开始不管HashMap还是RocksDB状态后端都可以通过开启changelog实现通用的增量Checkpoint。配置方式一配置文件指定state.backend.changelog.enabled:truestate.backend.changelog.storage:filesystem# 存储changelog数据dstl.dfs.base-path:hdfs://hadoop102:8020/changelogexecution.checkpointing.max-concurrent-checkpoints:1execution.savepoint-restore-mode:CLAIM配置方式二代码中设置需要引入依赖dependencygroupIdorg.apache.flink/groupIdartifactIdflink-statebackend-changelog/artifactIdversion${flink.version}/versionscoperuntime/scope/dependency开启changelogenv.enableChangelogStateBackend(true);注意事项目前标记为实验性功能Checkpoint的最大并发必须为1从Flink 1.15开始只有文件系统的存储类型实现可用不支持NO_CLAIM模式八、状态后端切换实战8.1 使用Savepoint切换状态后端使用Savepoint恢复状态时可以更换状态后端。但需要注意不要在代码中硬编码状态后端而是通过配置文件或-D参数配置。步骤一提交作业使用HashMapStateBackendbin/flink run-application-d-tyarn-application-Dstate.backendhashmap-ccom.atguigu.checkpoint.SavepointDemo FlinkTutorial-1.0-SNAPSHOT.jar步骤二停止作业时触发Savepoint# 优雅停止并触发Savepointbin/flink stop-psavepoint路径 job-id-yidapplication-id# 或立即停止并触发Savepointbin/flink cancel-ssavepoint路径 job-id-yidapplication-id步骤三从Savepoint恢复同时修改状态后端bin/flink run-application-d-tyarn-application-shdfs://hadoop102:8020/sp/savepoint-xxx-Dstate.backendrocksdb-ccom.atguigu.checkpoint.SavepointDemo FlinkTutorial-1.0-SNAPSHOT.jar8.2 从Checkpoint恢复作业bin/flink run-application-d-tyarn-application-Dstate.backendrocksdb-shdfs://hadoop102:8020/chk/xxx/chk-xxx-ccom.atguigu.checkpoint.SavepointDemo FlinkTutorial-1.0-SNAPSHOT.jar注意从Checkpoint恢复时不支持切换状态后端。总结本文详细讲解了Flink中的两种核心状态后端HashMapStateBackend将状态存储在JVM堆内存中读写速度快适合小状态、低延迟场景EmbeddedRocksDBStateBackend将状态存储在本地RocksDB中容量大、支持增量Checkpoint适合大状态场景配置方式支持配置文件全局配置和代码单作业配置代码配置优先级最高RocksDB增量Checkpoint只备份变化数据显著提升Checkpoint效率从Flink 1.15开始支持通用增量Checkpointchangelog状态后端切换可以通过Savepoint在恢复时切换状态后端但Checkpoint恢复不支持切换选择合适的状态后端是保障Flink作业性能和稳定性的关键决策。在实际项目中应根据状态大小、增长趋势、延迟要求等因素综合考量。如果本文对你有帮助欢迎点赞 收藏 ⭐ 关注 你的支持是我持续创作的动力