两阶段提交与补偿事务:分布式事务的两种路径与工程取舍

📅 2026/6/16 0:29:05
两阶段提交与补偿事务:分布式事务的两种路径与工程取舍
两阶段提交与补偿事务分布式事务的两种路径与工程取舍一、分布式事务的现实困境一致性不是免费的单体应用中事务由数据库的 ACID 机制保证。微服务架构下一个业务操作可能跨多个服务——如订单创建需要同时扣减库存、冻结支付额度、创建物流单。这些操作分布在不同服务中各自有独立的数据库无法使用本地事务。分布式事务的核心问题是如何保证跨服务操作的原子性——要么全部成功要么全部回滚。两阶段提交2PC是经典的分布式事务协议但它在性能和可用性上有严重缺陷——协调者单点故障、阻塞等待、吞吐量低。补偿事务Saga是另一种方案它放弃强一致性通过正向操作和补偿操作的配对实现最终一致性。二、2PC 与 Saga 的机制对比强一致 vs 最终一致2PC 分为准备阶段和提交阶段。协调者向所有参与者发送 Prepare 请求参与者执行操作但不提交等待协调者的最终决定。如果所有参与者都准备好了协调者发送 Commit否则发送 Rollback。Saga 将长事务拆分为多个本地事务每个本地事务有对应的补偿事务。如果某个步骤失败按反向顺序执行已完成步骤的补偿事务。flowchart TB subgraph 两阶段提交 A1[协调者: Prepare] -- B1[参与者1: 准备就绪] A1 -- C1[参与者2: 准备就绪] A1 -- D1[参与者3: 准备就绪] B1 -- E1[协调者: 全部就绪] C1 -- E1 D1 -- E1 E1 -- F1[协调者: Commit] F1 -- G1[参与者1: 提交] F1 -- H1[参与者2: 提交] F1 -- I1[参与者3: 提交] end subgraph Saga补偿事务 A2[步骤1: 创建订单] -- B2[步骤2: 扣减库存] B2 -- C2[步骤3: 冻结支付] C2 -- D2{步骤4: 创建物流} D2 --|成功| E2[事务完成] D2 --|失败| F2[补偿3: 解冻支付] F2 -- G2[补偿2: 恢复库存] G2 -- H2[补偿1: 取消订单] end2PC 的致命问题是阻塞——参与者在 Prepare 后必须等待协调者的决定期间持有锁资源。如果协调者崩溃参与者将无限期阻塞。Saga 没有阻塞问题但补偿事务的语义可能不完美——解冻支付不等于从未冻结。三、生产级代码实现2PC 与 Saga3.1 两阶段提交实现import uuid from enum import Enum from typing import List class TransactionState(Enum): PREPARING preparing PREPARED prepared COMMITTING committing COMMITTED committed ROLLING_BACK rolling_back ROLLED_BACK rolled_back class TwoPhaseCoordinator: 2PC 协调者 def __init__(self, participants: List[str]): self.participants participants self.tx_state {} def execute(self, operations: List[dict]) - dict: 执行两阶段提交 tx_id str(uuid.uuid4()) self.tx_state[tx_id] TransactionState.PREPARING # Phase 1: Prepare prepared [] for i, op in enumerate(operations): participant self.participants[i] try: result self._send_prepare(participant, tx_id, op) if result[prepared]: prepared.append(participant) else: # 有参与者无法准备回滚 self._rollback(tx_id, prepared) return {status: rolled_back, reason: result.get(reason)} except Exception as e: # 参与者不可达回滚已准备的参与者 # 为什么不等待超时2PC 中参与者 # Prepare 后持有锁等待超时会导致 # 锁长时间不释放影响其他事务 self._rollback(tx_id, prepared) return {status: rolled_back, reason: f参与者 {participant} 不可达: {e}} # Phase 2: Commit self.tx_state[tx_id] TransactionState.COMMITTING for participant in prepared: try: self._send_commit(participant, tx_id) except Exception as e: # Commit 失败需要重试直到成功 # 为什么必须重试Prepare 阶段已经确认 # 所有参与者可以提交Commit 阶段失败 # 是临时故障重试最终会成功 # 如果放弃会导致数据不一致 self._schedule_retry_commit(participant, tx_id) self.tx_state[tx_id] TransactionState.COMMITTED return {status: committed, tx_id: tx_id} def _rollback(self, tx_id: str, prepared: List[str]): 回滚已准备的参与者 self.tx_state[tx_id] TransactionState.ROLLING_BACK for participant in prepared: try: self._send_rollback(participant, tx_id) except Exception: # 回滚失败也需要重试 self._schedule_retry_rollback(participant, tx_id) self.tx_state[tx_id] TransactionState.ROLLED_BACK3.2 Saga 补偿事务实现from dataclasses import dataclass from typing import Callable, List, Optional dataclass class SagaStep: Saga 步骤定义 name: str action: Callable # 正向操作 compensate: Callable # 补偿操作 class SagaOrchestrator: Saga 编排器 def __init__(self, steps: List[SagaStep]): self.steps steps def execute(self, context: dict) - dict: 执行 Saga 事务 completed_steps [] try: for step in self.steps: # 执行正向操作 # 为什么每步都捕获异常Saga 的核心是 # 任何一步失败都触发补偿不能让异常 # 直接传播导致补偿逻辑无法执行 try: result step.action(context) context.update(result) completed_steps.append(step) except Exception as e: # 正向操作失败触发补偿 self._compensate(completed_steps, context) return { status: compensated, failed_step: step.name, reason: str(e), } return {status: completed, context: context} except Exception as e: # 补偿过程中出现异常 self._compensate(completed_steps, context) return { status: compensated_with_errors, reason: str(e), } def _compensate(self, completed: List[SagaStep], context: dict): 按反向顺序执行补偿 # 为什么反向顺序正向操作有依赖关系 # 后执行的步骤依赖先执行的步骤的结果 # 补偿必须先撤销后执行的步骤 for step in reversed(completed): try: step.compensate(context) except Exception as e: # 补偿失败记录日志人工介入 # 为什么不抛出异常补偿失败不应阻止 # 其他步骤的补偿记录后继续执行 # 剩余补偿最大限度恢复一致性 self._log_compensation_failure(step.name, e) def _log_compensation_failure(self, step_name: str, error: Exception): 记录补偿失败等待人工处理 print(f补偿失败: step{step_name}, error{error}) # 写入补偿失败表运维人员后续处理3.3 订单创建 Saga 示例def create_order_saga(): 订单创建的 Saga 定义 def create_order(ctx): order_id str(uuid.uuid4()) # 调用订单服务创建订单 order_service.create(order_id, ctx[items]) return {order_id: order_id} def cancel_order(ctx): # 补偿取消订单 order_service.cancel(ctx[order_id]) def deduct_inventory(ctx): # 调用库存服务扣减 result inventory_service.deduct( ctx[items], ctx[order_id]) if not result[success]: raise Exception(f库存不足: {result[message]}) return {deduction_id: result[deduction_id]} def restore_inventory(ctx): # 补偿恢复库存 inventory_service.restore(ctx[deduction_id]) def freeze_payment(ctx): # 调用支付服务冻结 result payment_service.freeze( ctx[order_id], ctx[amount]) if not result[success]: raise Exception(f支付冻结失败: {result[message]}) return {freeze_id: result[freeze_id]} def unfreeze_payment(ctx): # 补偿解冻支付 payment_service.unfreeze(ctx[freeze_id]) saga SagaOrchestrator([ SagaStep(create_order, create_order, cancel_order), SagaStep(deduct_inventory, deduct_inventory, restore_inventory), SagaStep(freeze_payment, freeze_payment, unfreeze_payment), ]) return saga四、分布式事务的架构权衡一致性、可用性与性能2PC 的适用场景2PC 适合对一致性要求极高的场景如金融转账且参与者数量少通常 2-3 个、事务时间短秒级。参与者越多、事务越长2PC 的阻塞问题越严重。Saga 的补偿语义局限补偿事务不是撤销而是反向操作。扣减库存的补偿是恢复库存但如果库存已经被其他订单占用恢复可能导致超卖。解决方案是在补偿操作中增加幂等性检查和业务规则校验。混合方案核心链路用 2PC 保证强一致性非核心操作用 Saga 保证最终一致性。如订单创建中支付冻结用 2PC金额不能错物流创建用 Saga晚一点创建可接受。事务日志的持久化无论是 2PC 还是 Saga事务状态必须持久化到稳定存储。协调者崩溃后恢复逻辑需要读取事务日志决定下一步操作。事务日志是分布式事务的黑匣子必须保证不丢失。五、总结分布式事务的选择应基于业务对一致性的真实需求。2PC 提供强一致性但有阻塞风险适合短事务、少参与者的场景。Saga 提供最终一致性且无阻塞适合长事务、多参与者的场景。落地时建议优先尝试 Saga只有在对一致性有硬性要求时才使用 2PC。补偿操作的幂等性是 Saga 的硬性要求必须在设计阶段就考虑。事务日志的持久化是两种方案的共同基础。