分布式消息投递:如何做到不丢不重?

📅 2026/6/27 2:41:35
分布式消息投递:如何做到不丢不重?
分布式消息投递如何做到不丢不重在分布式系统里消息队列是标配但随之而来的“消息丢了”和“消息重复消费”也是让后端开发最头疼的两个坑。消息丢了业务数据就不一致——用户下单了库存没扣消息重复了逻辑就被执行多次——用户只买了一件账户被扣两次钱。这两个问题看似矛盾其实解决思路很明确消息丢失靠确认机制和持久化重复消费靠幂等设计。两者必须同时实现缺一不可。核心机制确认与幂等可靠投递的核心是At-Least-Once至少一次语义。消息至少会被消费一次但可能会被消费多次。想要实现 Exactly-Once精确一次必须在消费端做幂等处理。生产者确认机制的工作流很简单消息发出去后等 Broker 的 ACK超时没收到就重试。这里有两个参数很关键重试次数和重试间隔。生产环境推荐指数退避重试1s, 2s, 4s, 8s最大重试 3-5 次。超过次数后消息写入死信队列Dead Letter Queue由人工或脚本处理。消费者幂等性的实现通常依赖唯一标识 去重表。每条消息带一个全局唯一的 Message ID消费者处理前先查去重表。如果 ID 已存在直接跳过如果不存在执行业务逻辑并写入去重表。注意去重表和业务操作必须在同一个事务中否则会出现“去重表写入成功但业务失败”的不一致状态。sequenceDiagram participant P as 生产者 participant B as Broker participant C as 消费者 participant D as 去重表 P-B: 发送消息 (msg_idabc123) B--P: ACK 确认 Note over P,B: 若超时未收到ACK触发重试 B-C: 投递消息 (msg_idabc123) C-D: 查询 msg_id 是否存在 D--C: 不存在首次消费 C-C: 执行业务逻辑 C-D: 写入去重表 (msg_idabc123) C-C: 提交本地事务 C--B: ACK 确认 Note over B,C: 若消费者宕机Broker重新投递 B-C: 重新投递 (msg_idabc123) C-D: 查询 msg_id 是否存在 D--C: 已存在跳过消费 C--B: ACK 确认实战基于 Redis Stream 的实现Redis 5.0 引入的 Stream 数据结构天然支持消费组和消息确认是实现轻量级可靠消息队列的理想选择。相比 Kafka 和 RabbitMQ它的优势是部署简单、延迟极低亚毫秒级劣势是持久化能力弱于 Kafka不适合对数据丢失零容忍的场景。下面是一个基于 Redis Stream 的可靠消息队列实现包含了生产者确认、消费者幂等和死信队列处理。import asyncio import json import time import uuid import logging from dataclasses import dataclass from typing import Any, Callable, Coroutine import redis.asyncio as aioredis logger logging.getLogger(__name__) dataclass class Message: 消息结构每条消息携带全局唯一ID和业务载荷 msg_id: str payload: dict timestamp: float retry_count: int 0 class ReliableProducer: 可靠生产者发送消息并等待 Broker 确认支持指数退避重试。 为什么不用 Redis 的 XADD 天然幂等 因为 XADD 每次调用都会生成新的 entry ID 即使 payload 完全相同也会产生重复消息。 所以幂等性必须在业务层实现。 def __init__( self, redis_client: aioredis.Redis, stream: str, max_retries: int 3, retry_base_delay: float 1.0, ): self._redis redis_client self._stream stream self._max_retries max_retries self._retry_base_delay retry_base_delay async def send(self, payload: dict) - str: 发送消息返回消息ID。支持重试超时后写入死信队列。 为什么自己生成 msg_id 而不用 Redis 的自动 ID 因为业务层需要用 msg_id 做幂等去重 Redis 自动生成的 ID 是时间戳格式不适合做业务主键。 msg_id uuid.uuid4().hex message Message( msg_idmsg_id, payloadpayload, timestamptime.time(), ) for attempt in range(self._max_retries 1): try: # XADD 写入 Stream* 表示由 Redis 自动分配 entry ID entry_id await self._redis.xadd( self._stream, { msg_id: msg_id, data: json.dumps(payload, ensure_asciiFalse), timestamp: str(message.timestamp), }, ) logger.info( 消息发送成功: msg_id%s, entry_id%s, msg_id, entry_id, ) return msg_id except Exception as e: if attempt self._max_retries: # 重试耗尽写入死信队列 await self._send_to_dead_letter(message, str(e)) raise RuntimeError( f消息发送失败已写入死信队列: {e} ) from e delay self._retry_base_delay * (2 ** attempt) logger.warning( 消息发送失败%0.1fs 后重试 (第%d次): %s, delay, attempt 1, e, ) await asyncio.sleep(delay) async def _send_to_dead_letter( self, message: Message, error: str ) - None: 将发送失败的消息写入死信队列供后续人工处理 dlq_stream f{self._stream}:dead_letter await self._redis.xadd( dlq_stream, { msg_id: message.msg_id, data: json.dumps(message.payload, ensure_asciiFalse), error: error, timestamp: str(message.timestamp), failed_at: str(time.time()), }, ) class IdempotentConsumer: 幂等消费者消费前检查去重表确保消息只被处理一次。 去重表存储在 Redis 的 Hash 结构中 key 为 msg_idvalue 为处理结果摘要。 为什么用 Hash 而不是 Set 因为 Hash 可以存储处理结果方便排查问题时查看历史消费记录。 def __init__( self, redis_client: aioredis.Redis, stream: str, group: str, consumer: str, dedup_key: str msg:dedup, dedup_ttl: int 86400, # 去重记录保留24小时 batch_size: int 10, ): self._redis redis_client self._stream stream self._group group self._consumer consumer self._dedup_key dedup_key self._dedup_ttl dedup_ttl self._batch_size batch_size self._running False async def start(self, handler: Callable[[Message], Coroutine]) - None: 启动消费循环读取消息 → 幂等检查 → 业务处理 → 确认。 handler 是业务处理函数接收 Message 对象。 如果 handler 抛异常消息不会被 ACK 后续会重新投递。 self._running True # 确保消费组存在不存在则创建 try: await self._redis.xgroup_create( self._stream, self._group, id0, mkstreamTrue ) except aioredis.ResponseError as e: if BUSYGROUP not in str(e): raise while self._running: try: # XREADGROUP 阻塞读取消息超时5秒 messages await self._redis.xreadgroup( self._group, self._consumer, {self._stream: }, countself._batch_size, block5000, ) if not messages: continue for stream_name, entries in messages: for entry_id, fields in entries: await self._process_message( entry_id, fields, handler ) except asyncio.CancelledError: self._running False break except Exception as e: logger.error(消费循环异常: %s, e) await asyncio.sleep(1) async def _process_message( self, entry_id: bytes, fields: dict, handler: Callable, ) - None: 处理单条消息幂等检查 → 业务处理 → ACK msg_id fields[bmsg_id].decode() data json.loads(fields[bdata].decode()) # 幂等检查该 msg_id 是否已被处理过 is_duplicate await self._redis.hexists( self._dedup_key, msg_id ) if is_duplicate: logger.info(重复消息跳过: msg_id%s, msg_id) # 即使是重复消息也要 ACK否则会一直重投 await self._redis.xack( self._stream, self._group, entry_id ) return message Message( msg_idmsg_id, payloaddata, timestampfloat(fields[btimestamp].decode()), ) try: # 执行业务处理 result await handler(message) # 业务成功写入去重表 ACK两步操作 # 为什么不用 Lua 脚本保证原子性 # 因为 ACK 失败只会导致消息重新投递幂等检查会兜底 # 不会导致业务重复执行。引入 Lua 脚本会增加复杂度 # 收益不足以抵消维护成本。 await self._redis.hset( self._dedup_key, msg_id, json.dumps({ result: str(result)[:200], # 截断防止大value processed_at: time.time(), }), ) # 设置去重记录的过期时间防止无限增长 await self._redis.expire(self._dedup_key, self._dedup_ttl) await self._redis.xack( self._stream, self._group, entry_id ) logger.info(消息处理成功: msg_id%s, msg_id) except Exception as e: logger.error( 消息处理失败: msg_id%s, error%s, msg_id, e ) # 不 ACK让 Broker 重新投递 # 但需要限制重投次数超过阈值转入死信队列 # 这里通过 XPENDING 命令检查重投次数 pending await self._redis.xpending_range( self._stream, self._group, minentry_id, maxentry_id, count1, ) if pending and pending[0][times_delivered] 3: await self._redis.xack( self._stream, self._group, entry_id ) await self._send_to_dead_letter(message, str(e)) logger.warning( 消息重投超限转入死信队列: msg_id%s, msg_id ) async def _send_to_dead_letter( self, message: Message, error: str ) - None: 将处理失败的消息转入死信队列 dlq_stream f{self._stream}:dead_letter await self._redis.xadd( dlq_stream, { msg_id: message.msg_id, data: json.dumps(message.payload, ensure_asciiFalse), error: error, failed_at: str(time.time()), }, ) async def stop(self) - None: 优雅停止消费循环 self._running False可靠性的代价吞吐量与延迟的权衡可靠投递不是免费的。每条消息的完整生命周期——发送、确认、幂等检查、业务处理、ACK——至少需要 3 次网络往返XADD HEXISTS XACK。如果每条消息还要写入去重表就是 4 次网络往返。在单线程 Redis 上这意味着每条消息至少增加 1-2ms 的延迟。吞吐量的影响更直接。开启生产者确认后发送端必须等待 ACK 才能发下一条消息除非批量发送。单条发送的吞吐量上限约为 Redis 单节点 QPS 的 1/3约 30K msg/s批量发送可以提升到 80K msg/s但批量发送会增加端到端延迟。幂等检查的额外开销取决于去重表的命中率。如果重复率很低 1%去重检查几乎全是缓存命中开销可以忽略。但在 Broker 频繁重投的场景下去重表的读写频率会显著上升此时需要考虑给去重表加本地缓存如 LRU Cache减少 Redis 访问次数。Redis Stream 本身的持久化也是一个权衡点。如果开启 AOF 每条刷盘appendfsync always消息不会丢失但吞吐量下降到约 10K msg/s。如果使用默认的每秒刷盘最多丢失 1 秒的数据吞吐量可以恢复到 80K msg/s。生产环境通常选择每秒刷盘配合业务层的补偿机制来弥补可能的数据丢失。几点经验消息不丢不重的核心是“至少一次投递 消费端幂等”。生产者确认机制保证消息不丢幂等去重表保证消息不重死信队列兜底处理异常消息。Redis Stream 提供了轻量级的实现方案适合中等规模QPS 100K的业务场景。落地时有几点经验可以参考先上幂等再上确认幂等是消费端改造风险可控确认机制涉及生产端影响面更大。去重表 TTL 设为 24 小时覆盖绝大多数重投场景同时避免去重表无限膨胀。死信队列必须有监控死信队列里的消息意味着业务异常每条都需要人工介入。压测确认吞吐上限在目标 QPS 下验证确认机制和幂等检查的延迟是否可接受。定期清理 Stream使用 XTRIM 限制 Stream 长度避免 Redis 内存持续增长。