为什么有了 RocketMQ 事务消息,我们还要自研本地消息表方案?

📅 2026/6/20 14:41:17
为什么有了 RocketMQ 事务消息,我们还要自研本地消息表方案?
为什么有了 RocketMQ 事务消息,我们还要自研本地消息表方案?前言最近在 Code Review 一个项目时,发现团队自研了一套完整的事务消息框架 4 张数据库表、定时补偿任务、分布式锁、衰减重试机制…一应俱全。但项目依赖里明明引入了rocketmq-spring-boot-starter:2.3.1,RocketMQ 原生支持事务消息,为什么不直接用?答案很有意思:RocketMQ 只被用作简单的实时消息推送通道,事务消息功能完全没用。这引发了一个值得深思的问题:RocketMQ 事务消息已经很成熟了,为什么很多团队仍然选择自研本地消息表方案?一、先搞清楚:我们要解决的是什么问题?在分布式系统中,经常遇到这样的场景:用户支付成功后,需要修改订单状态(写 MySQL),同时发消息给积分系统加积分(发 MQ)。核心问题:数据库操作和 MQ 投递是两个独立的操作,无法放在同一个原子事务中。强一致性:需要 2PC(XA 协议)或 Raft,性能极差,不适合高并发场景最终一致性:BASE 理论,允许中间状态不一致,但保证最终一致RocketMQ 事务消息和本地消息表,本质上都是实现最终一致性的方案。二、RocketMQ 事务消息的理想与现实理想的实现方式RocketMQ 事务消息的设计非常优雅:半消息 → 执行本地事务 → Commit/Rollback → 回查兜底很多人会这样实现:RocketMQTransactionListenerpublicclassOrderTransactionListenerimplementsRocketMQLocalTransactionListener{OverridepublicRocketMQLocalTransactionStateexecuteLocalTransaction(Messagemsg,Objectarg){// 执行本地事务orderService.updateOrderStatus((OrderParam)arg);returnRocketMQLocalTransactionState.COMMIT;}OverridepublicRocketMQLocalTransactionStatecheckLocalTransaction(Messagemsg){StringorderIdmsg.getHeaders().get(orderId).toString();// 直接查业务表OrderorderorderMapper.selectById(orderId);returnorder!null?RocketMQLocalTransactionState.COMMIT:RocketMQLocalTransactionState.ROLLBACK;}}看起来很完美,但生产环境会遇到三个关键问题。问题一:回查时的竞态条件场景复现:发半消息成功执行本地事务:数据库死锁,事务卡了 5 秒还没提交MQ 回查:Broker 发现消息未确认,发起回查误判:查订单表 → 查不到(事务还没 Commit)结果:返回 ROLLBACK,MQ 撤销消息悲剧:下一秒本地事务提交成功,订单入库了,但消息没了核心问题:回查时无法区分事务还在执行中和事务执行失败。问题二:业务表旧数据的干扰更隐蔽的问题是:如果业务表里已经有旧数据(比如同一个订单 ID 之前被取消过),回查时会误判。时间线: T1: 用户下单 ORDER_123,事务成功 T2: 用户取消订单,业务表状态改为已取消 T3: 用户重新下单 ORDER_123(同一订单ID) T4: 本次事务失败(回滚) T5: MQ 回查 → 查业务表 → 查到 ORDER_123(旧数据)→ COMMIT(误判!)核心问题:业务表存储的是数据的最终状态,可能被多次事务修改,回查时无法判断这是本次事务写入的,还是之前遗留的旧数据。问题三:消费端的幂等陷阱消费端的幂等检查和业务执行必须在同一个事务里,否则会出现部分成功的情况:TransactionalpublicvoidonMessage(Stringmessage){OrderMsgmsgJSON.parseObject(message,OrderMsg.class);// 错误幂等检查和业务执行分离if(idempotentMapper.exists(msg.getOrderId())){return;// 已消费}// 执行业务inventoryService.addPoints(msg.getUserId(),msg.getPoints());// 如果这里失败业务执行了但幂等记录没插进去idempotentMapper.insert(msg.getOrderId());}三、重要澄清不一定需要事务表上面的分析可能会让人误解“RocketMQ 事务消息必须要有事务表才能可靠工作。”实际上不是这样。要区分三个层次方案1RocketMQ 官方推荐事务消息最常见很多项目根本不会单独建事务表。OverridepublicRocketMQLocalTransactionStatecheckLocalTransaction(Messagemsg){StringorderIdmsg.getHeaders().get(orderId).toString();OrderorderorderMapper.selectById(orderId);if(order!null){returnCOMMIT;}returnROLLBACK;}直接查业务表。例如订单创建成功 → 订单表有数据 → COMMIT 订单创建失败 → 订单表没数据 → ROLLBACK这种写法非常普遍。为什么很多项目敢这么干因为业务主键通常不会复用。例如订单ID: 1001 1002 1003 ...永远唯一。不会出现文章举的ORDER_123 删除 重新创建 还是ORDER_123这种情况。所以查订单表 查事务状态成立。什么时候需要事务表当你发现业务表不能准确反映事务状态时。例如场景1业务记录可能被删除创建订单 ↓ 消息未确认 ↓ 订单被删除 ↓ Broker回查 ↓ 查不到订单这时就有问题。场景2一个业务对象会被反复修改例如用户积分用户表id1 score100你无法通过select*fromuserwhereid1判断本次加10分事务到底成功没有此时必须有t_transaction_log场景3审计要求极高金融系统支付 退款 结算通常都会留transaction_log用于追踪。大厂实际情况一般分两类简单业务订单 商品 用户注册直接查业务表。不建事务表。核心交易链路支付 资金 库存扣减会建transaction_log或者outbox_message表。你可以这么理解RocketMQ 事务消息最核心要求只有一个Broker回查时必须能知道本地事务最终状态至于状态存哪里可以是业务表也可以是事务表RocketMQ 根本不关心。所以事务消息 ≠ 必须有事务表而是事务消息 必须有一个可靠的事务状态来源这个来源可以是业务表也可以是事务日志表。很多互联网业务订单、注册、发帖直接查业务表就够了只有复杂交易系统才会额外维护事务表。四、RocketMQ 事务消息的修复方案复杂场景如果你遇到了上面提到的复杂场景业务记录可删除、反复修改、审计要求高就需要引入事务日志表和时间窗口判断。修复后的回查逻辑OverridepublicRocketMQLocalTransactionStatecheckLocalTransaction(Messagemsg){StringtxIdmsg.getHeaders().get(txId).toString();// 本次事务唯一ID// 1. 查事务日志表(用事务ID,不是业务ID)intcountlogMapper.countByTxId(txId);if(count0){// 这条日志是本次事务写入的确认本次事务成功returnRocketMQLocalTransactionState.COMMIT;}// 2. 查不到时,需要时间窗口判断// 可能是事务还在执行中(卡住了),不能直接 RollbacklongbornTimeLong.parseLong(msg.getProperty(BORN_TIMESTAMP));if(System.currentTimeMillis()-bornTime5*60*1000){// 返回 UNKNOWN,告诉 MQ:我不确定,你过一会再来问returnRocketMQLocalTransactionState.UNKNOWN;}// 超过时间窗口,才判定为真正失败returnRocketMQLocalTransactionState.ROLLBACK;}事务日志表的设计CREATETABLEt_transaction_log(tx_idVARCHAR(64)PRIMARYKEY,-- 本次事务唯一IDbiz_idVARCHAR(64),-- 业务ID(订单号等)statusVARCHAR(16),-- SUCCESS / FAILcreated_atTIMESTAMP,INDEXidx_biz_id(biz_id));关键点:tx_id是本次事务的唯一标识,每次事务都不同biz_id是业务标识,可能重复(同一订单多次操作)回查时用tx_id查询,确保查到的是本次事务的记录为什么事务日志表能解决问题查询方式查询条件查到什么代表什么查业务表biz_id业务ID业务数据的最终状态“业务表里有数据”,但可能是旧数据查事务日志表tx_id事务ID本次事务的执行记录“本次事务成功写入日志”,确定性本质区别业务表存储业务数据的最终状态可能被多次事务修改事务日志表存储每次事务的执行记录每次事务一条互不干扰重要补充事务日志表并不能解决所有问题关键点不管是查事务日志表还是查业务表查不到时都面临同样的困境。场景查业务表查事务日志表本次事务成功查到 → COMMIT查到 → COMMIT本次事务失败但业务表有旧数据查到 → COMMIT误判没查到 → 需时间窗口本次事务还在执行中卡住没查到 → 需时间窗口没查到 → 需时间窗口本次事务失败回滚没查到 → 需时间窗口没查到 → 需时间窗口事务日志表解决的问题消除查到时的误判——避免业务表旧数据的干扰确保查到 本次事务成功成为确定性判断事务日志表无法解决的问题“查不到时怎么判断”——仍然需要时间窗口机制结论事务日志表解决查到时可能误判的问题应对重复下单用同一ID等情况时间窗口判断解决查不到时的歧义问题区分执行中 vs 失败两者配合才能彻底解决竞态条件问题。单靠事务日志表或单靠时间窗口都不够。关键结论修复后的复杂度并不比本地消息表低看到上面的方案你会发现一个关键问题为了解决 RocketMQ 事务消息的竞态条件问题需要引入事务日志表记录每次事务的执行状态时间窗口判断逻辑区分执行中和失败回查代码查表 时间窗口判断Service 层改造业务数据和事务日志在同一事务中写入这个复杂度……并不比本地消息表方案低。方案需要的组件复杂度RocketMQ 事务消息裸奔版回查代码查业务表低但不可靠RocketMQ 事务消息修复版事务日志表 时间窗口判断 回查逻辑 Service改造中本地消息表消息表 定时任务 幂等控制高本质上修复版的 RocketMQ 事务消息和本地消息表方案复杂度相当。两者都需要额外的数据库表事务日志表 / 消息表和业务数据在同一事务中写入额外的机制确保最终投递回查 / 定时任务轮询唯一的区别RocketMQ实时投递事务提交后立即 Commit 消息本地消息表轮询投递定时任务扫描后投递所以如果 RocketMQ 事务消息需要这么多额外代码才能可靠工作那选择本地消息表也未尝不可——至少它天生就包含了这些机制。修复后的完整代码ComponentRocketMQTransactionListenerpublicclassOrderTransactionListenerimplementsRocketMQLocalTransactionListener{AutowiredprivateOrderServiceorderService;AutowiredprivateTransactionLogMapperlogMapper;/** * 执行本地事务 * 核心原则:业务数据和事务日志,必须在同一个 Transactional 下提交 */OverridepublicRocketMQLocalTransactionStateexecuteLocalTransaction(Messagemsg,Objectarg){try{// 开启本地事务(包含:写业务表 写日志表)orderService.createOrderWithLog((OrderParam)arg);returnRocketMQLocalTransactionState.COMMIT;}catch(Exceptione){returnRocketMQLocalTransactionState.ROLLBACK;}}/** * 回查接口 * 解决竞态条件和旧数据干扰问题 */OverridepublicRocketMQLocalTransactionStatecheckLocalTransaction(Messagemsg){StringtxIdmsg.getHeaders().get(txId).toString();// 1. 查事务日志表intcountlogMapper.countByTxId(txId);if(count0){returnRocketMQLocalTransactionState.COMMIT;}// 2. 时间窗口判断longbornTimeLong.parseLong(msg.getProperty(BORN_TIMESTAMP));if(System.currentTimeMillis()-bornTime5*60*1000){returnRocketMQLocalTransactionState.UNKNOWN;}returnRocketMQLocalTransactionState.ROLLBACK;}}// Service 层TransactionalpublicvoidcreateOrderWithLog(OrderParamparam){StringtxIdparam.getTxId();// 写业务表orderMapper.insert(order);// 写事务日志表(同一事务)logMapper.insert(newTransactionLog(txId:txId,bizId:order.getId(),status:SUCCESS));// 事务提交:要么两个都成功,要么两个都失败}五、看到这里你会发现一个问题为了解决 RocketMQ 事务消息的竞态问题,我们引入了:事务日志表:记录每次事务的执行状态时间窗口判断:区分执行中和失败这个复杂度…并不比本地消息表低。六、本地消息表经典但可靠的方案核心原理本地消息表的思路更直接:把发消息这个动作本身变成数据库操作,和业务数据放在同一个事务中。Transactionalpublicvoidregister(Useruser){// 1. 写入用户表(业务操作)userMapper.insert(user);// 2. 写入本地消息表(和业务数据在同一事务中)msgMapper.insert(newMsg(exchange:user.welcome,routingKey:user.user.getId(),body:JSON.toJSONString(user),status:INIT));// 事务提交后,两个操作都已持久化}然后有一个后台任务轮询:Scheduled(fixedDelay10000)// 每 10 秒扫描一次publicvoidretryFailedMessages(){ListMsgmsgsmsgMapper.selectByStatus(INIT);for(Msgmsg:msgs){try{mqProducer.send(msg.getExchange(),msg.getRoutingKey(),msg.getBody());msg.setStatus(SUCCESS);}catch(Exceptione){msg.setStatus(FAIL);msg.setFailCount(msg.getFailCount()1);}msgMapper.update(msg);}}为什么能解决问题?场景结果一致性步骤 1 成功步骤 2 失败整个事务回滚用户和消息都没写入一致步骤 1、2 都成功但事务提交前宕机事务回滚数据都没写入一致事务成功MQ 投递失败后台任务不断重试直到成功最终一致项目中的实际实现我分析的项目中,自研框架的核心实现:// DefaultMsgSender.javaif(hasTransaction){// 1. 消息先写入 t_msg 表(与业务数据在同一事务中)msgService.batchInsert(...);// 2. 注册事务同步回调 → 事务提交后才投递TransactionSynchronizationManager.registerSynchronization(newTransactionSynchronization(){OverridepublicvoidafterCompletion(intstatus){if(statusSTATUS_COMMITTED){// 事务提交后异步投递消息mqExecutor.execute(()-deliverMsg(msgPOList));}}});}关键设计:消息和业务数据在同一事务中 → 原子性保证SpringTransactionSynchronization→ 事务提交后才投递补偿 Job 扫表重试 → 兜底机制七、两种方案的对比维度RocketMQ 事务消息(裸奔版)RocketMQ 事务消息(修复版)本地消息表实现原理半消息 → 本地事务 → Commit/Rollback → 回查半消息 事务日志表 时间窗口回查业务数据 消息记录同事务 → 定时任务轮询投递竞态条件无法处理通过 UNKNOWN 状态解决不存在消息先入库实时性高高低(取决于轮询频率)通用性仅 RocketMQ仅 RocketMQ任何 MQ(RabbitMQ、Kafka 等)复杂度低中(需要事务日志表)高(需要定时任务、幂等控制)性能开销低中(多写一条日志)高(定时轮询数据库)可控性低低高(自定义重试策略)可观测性中中高(状态都在数据库)八、本质分析看到对比表,你会发现一个有趣的结论:RocketMQ 事务消息的修复版,本质上就是把事务日志表换成了本地消息表。两者都是:在本地事务中写入一条记录(业务数据 事务日志/消息记录)通过数据库事务保证原子性通过额外的机制确保最终投递(回查 vs 轮询)唯一的区别:RocketMQ:实时投递(事务提交后立即 Commit 消息)本地消息表:轮询投递(定时任务扫描后投递)九、什么时候选择本地消息表1. 已有其他 MQ(RabbitMQ、Kafka 等)RabbitMQ原生不支持事务消息,只能通过本地消息表实现等价功能。RocketMQ 是后期引入的?此时自研框架已经稳定运行,没必要迁移。2. 需要精细控制重试策略项目需要处理多个业务模块的异步事件:点赞、收藏、签到、红包、文章审核…每个场景对重试策略的要求不同。本地消息表支持:衰减重试:1s、5s、30s、1min、5min、10min…自定义幂等键:每个消费者可以自定义手动补偿:直接操作数据库,灵活干预3. 需要高度可观测性所有消息状态都在数据库里:运维人员可以直接查询消息投递状态监控系统可以基于数据库表做告警出问题时可以手动修复或重试十、消费端的原子幂等通用无论用哪种方案,消费端都必须实现原子幂等。错误做法TransactionalpublicvoidonMessage(Stringmessage){OrderMsgmsgJSON.parseObject(message,OrderMsg.class);// 幂等检查和业务执行分离if(idempotentMapper.exists(msg.getOrderId())){return;}inventoryService.addPoints(msg.getUserId(),msg.getPoints());// 失败时业务执行了但幂等记录没插进去idempotentMapper.insert(msg.getOrderId());}正确做法Transactional(rollbackForException.class)publicvoidonMessage(Stringmessage){OrderMsgmsgJSON.parseObject(message,OrderMsg.class);// 利用数据库唯一索引做原子幂等挡板try{idempotentMapper.insertBarrier(msg.getOrderId());}catch(DuplicateKeyExceptione){// 插入报错,说明消费过了,直接返回return;}// 执行业务// 如果报错,事务会回滚,幂等记录也撤销,下次重试能进来inventoryService.addPoints(msg.getUserId(),msg.getPoints());}十一、生产级兜底方案无论选择哪种方案,都必须承认系统的不确定性,建立兜底机制。1. 日志表/消息表爆炸了怎么办?每天几百万条记录,一个月表就废了。错误DELETE FROM log WHERE time ...间隙锁阻塞写入正确MySQL 分区表按天分区清理时直接DROP PARTITION2. 死信队列怎么处理?告警:DLQ 进消息,立刻推送到开发群重投平台:一键原样重投或修改参数后重投最终底线:人工数据库修复3. 消息积压怎么办?监控告警:设置积压阈值临时扩容:增加消费者实例降级策略:非核心业务暂停投递十二、总结RocketMQ 事务消息 vs 本地消息表选择 RocketMQ 事务消息选择本地消息表项目从零开始,没有历史包袱已有 RabbitMQ 等其他 MQ团队熟悉 RocketMQ 生态需要精细控制重试策略追求实时性需要高度可观测性希望减少代码量需要跨 MQ 通用方案技术决策的关键点定性:明确是最终一致性,不是强一致性严谨:RocketMQ 需要事务日志表 时间窗口;本地消息表本身已包含这些机制落地:消费端实现原子幂等兜底:建立死信告警 批量重投机制结论没有最好的架构,只有最适合场景的方案。RocketMQ 事务消息和本地消息表,本质上是两种不同的实现路径,最终效果等价–都是确保业务操作和消息投递要么都成功,要么都不发生。如果 RocketMQ 事务消息需要引入事务日志表才能可靠工作,那复杂度已经和本地消息表相当了。这时候,选择的关键在于:你的项目用什么 MQ?你需要什么样的可控性和可观测性?你的团队有什么技术积累?选择适合自己场景的方案,比追求最新最酷的技术更重要。