别再裸奔了:先搞懂事务这层保护膜

📅 2026/6/25 18:30:10
别再裸奔了:先搞懂事务这层保护膜
咱们写业务代码最怕的就是“半拉子工程”。扣了钱库存没减或者减了库存钱没扣着。这时候必须得把ACID刻在烟上吸进肺里尤其是原子性。简单打个比方转账就像你左手倒右手你不能接受左手已经空了右手还没接到吧事务就是那个begin、commit和rollback的保镖。用大白话翻译一下-begin搬个小板凳坐下我要开始搞事情了闲人勿扰。-commit好了活儿干完了完美收工落子无悔。-rollback我去写了个Bug刚才那几步全当没发生桌子擦干净重来。在FastAPI里用SQLAlchemy异步操作千万别再用老一套的 session.commit() 了容易忘关。官方文档虽然那么写但根据我以往的经验一定要用 async with session.begin()这才是亲妈生的写法。async def create_order(db: AsyncSession, order_data): # 只要缩进在这个代码块里sqlalchemy自动帮你开了事务 async with db.begin(): new_order Order(**order_data) db.add(new_order) # 这里可能还要调别的扣库存方法放心一荣俱荣一损俱损 await flush_and_lock_stock(db, order_data.items) # 出了这个缩进还没抛异常自动 commit # 中间哪行代码崩了自动 rollback省心到想哭。 return new_order这里千万别学我当初偷懒在循环里写 commit 不仅慢得像蜗牛一旦中间挂了数据就是“脏”的找都找不回来。 嵌套事务与保存点给自己留个后悔药你可能会问我就想让大事务里的小步骤失败了不影响大局行不行这就得请出savepoint了。这就好比打游戏存档你明知道前面Boss难打先在门口存个档万一死了不用从第一关重来直接读档再战。啥时候用比如下单成功了你要发个积分。积分系统挂了比如网络抖动你总不能因为送积分失败就把整个订单回滚吧那老板得把你头拧下来。这时候在扣完库存后设个保存点发积分挂了就滚回到保存点订单照样生成积分人工补发。async with db.begin() as conn: # 扣库存这步绝不能挂 await conn.execute(update(Stock).where(...).values(countStock.count-1)) # 埋个复活点 savepoint await conn.begin_nested() try: # 这步挂了无所谓 await add_points(conn, user_id, 100) await savepoint.commit() except Exception: await savepoint.rollback() logger.warning(积分加不上先欠着吧)️ 分层架构下的事务协调别把裤子穿反了稍微正规点的项目都得分 Service 层和 Repository 层对吧这时候事务放哪听句劝事务边界一定要放在 Service 层业务逻辑层。Repository 层就老老实实做单表增删改查别自作主张 commit 。看到不少刚入行的小伙伴在 Repo 层每个方法最后都加个 db.commit() 最后业务逻辑得拼好几个 Repo 方法结果第一个 Repo 提交了第二个崩了神仙难救。最好的做法是把 db session 对象像接力棒一样从 Service 传到 Repo在 Service 的入口统一开启 async with db.begin(): 。 分布式事务 Saga 模式当微服务把数据库拆得稀碎上面聊的都是单库操作。现实很骨感订单库在 MySQL用户余额在 Redis 或者别的什么独立服务里。扣钱成功但下单失败怎么办这时候单机事务已经罩不住了。这就是 Saga 模式的场子了。别被名字唬住其实就是一个“正向大管家 反向补偿”的策略。拿电商订单举个栗子1️⃣ 调用库存服务预扣库存成功2️⃣ 调用支付服务扣余额失败余额不足3️⃣关键来了Saga协调器发现支付失败立马扭头去执行“反向补偿”——调用库存服务的“回滚接口”把那件预扣的衣服加回去。在FastAPI里实现Saga可以用状态机模式或者简单点用后台任务队列记录每一步的执行与回滚。最后啰嗦一句补偿接口一定要保证幂等性别因为网络重试给人家库存加了两次那就又成羊毛党了。⚔️ 实战库存扣减防超卖的终极锁方案说完理论来点立马能用的。单纯在代码里 if stock 0 然后 update 是不行的并发一来准挂。咱们得让数据库自己锁住那行数据。# 悲观锁SELECT ... FOR UPDATE # 翻译我要改这行了你们都起开等我改完你们再看。 async def deduct_stock_safe(db: AsyncSession, product_id: int, quantity: int): async with db.begin(): # 用 for_update() 锁住查询结果 stmt select(Product).where(Product.id product_id).with_for_update() result await db.execute(stmt) product result.scalar_one() if product.stock quantity: product.stock - quantity # 不需要手动 add被 session 追踪着呢 else: raise ValueError(库存不足别抢了)是不是以为这样就完了再说个容易翻车的点MySQL默认隔离级别下就算加了行锁如果查询条件没走索引它会锁表直接把整个表给锁了那你的并发量瞬间归零。所以 product_id 必须是索引或者主键。