worker服务的可靠性增强

📅 2026/7/1 2:51:00
worker服务的可靠性增强
可靠性分析从架构图上我们可以看出worker调用大模型服务过程中会发生阻塞等待如果此时worker异常容器挂掉了那么此次任务状态会一直为processing并且因为redis关联task_id的消息已经被消费了那么这个任务就无法被识别出来重试。基于这个场景分析我们要补充巡检服务去定时重启处于超时并且状态为processing的任务此时服务可以从mysql捞任务表但考虑到性能等影响我们选择在redis构建新的processing队列存储正在执行的task_id构建processing_ts队列存储开始处理时间巡检服务访问redis的processing队列、processing_ts队列来更新状态异常的任务。适配worker服务逻辑设置原子操作保证worker取任务放入processing不会被中断。二、逻辑实现1. doc_llm_test_worker补充原子操作将task从ready移动到processing记录开始执行的时间TASK_QUEUE_READY_KEY docllm:queue:ready TASK_QUEUE_PROCESSING_KEY docllm:queue:processing TASK_PROCESSING_TS_KEY docllm:hash:processing_ts def worker_loop(): 文档检查任务 worker 主循环 logging.info(doc_llm_test_worker started, waiting for tasks...) while True: try: raw_item redis_client.brpoplpush(TASK_QUEUE_READY_KEY, TASK_QUEUE_PROCESSING_KEY, timeout10) if not raw_item: time.sleep(5) continue # 没有任务就继续下一轮 try: payload_str raw_item.decode(utf-8) data json.loads(payload_str) task_id int(data[task_id]) except Exception as e: logging.exception(finvalid processing queue item: {raw_item!r}) redis_client.lrem(TASK_QUEUE_PROCESSING_KEY, 1, raw_item) continue start_ts int(time.time()) redis_client.hset(TASK_PROCESSING_TS_KEY, task_id, start_ts) try: process_task(task_id) finally: redis_client.lrem(TASK_QUEUE_PROCESSING_KEY, 1, raw_item) redis_client.hdel(TASK_PROCESSING_TS_KEY, task_id) except Exception: logging.exception(unexpected error in worker loop, sleep 3s) time.sleep(3)2.补充巡检服务定时重启处于超时并且状态为processing的任务需要做到重新入队 状态恢复流程设置参数 PROCESSING_TIMEOUT_SECONDS 600判断逻辑now_ts - start_ts PROCESSING_TIMEOUT_SECONDS该任务视为worker 处理失败worker 崩了/卡死需要重新 pending丢回 ready 队列给新的 worker适配task_service提供给巡检服务同步改数据库任务状态def mark_task_processing(task_id: int) - bool: worker 刚拿到任务时调用pending - processing with get_session() as session: stmt ( update(TaskDocLLM).where( TaskDocLLM.task_id task_id, TaskDocLLM.status TaskStatus.pending ).values( statusTaskStatus.processing, processing_started_atfunc.now() ) ) result session.execute(stmt) session.commit() return result.rowcount 1 def reclaim_task(task_id: int, timeout_dt) - bool: 将超时的任务重新放回队列 :param timeout_dt: datetime对象代表“必须早于此时间才会被恢复” with get_session() as session: stmt ( update(TaskDocLLM).where( TaskDocLLM.task_id task_id, TaskDocLLM.status TaskStatus.processing, TaskDocLLM.processing_started_at timeout_dt ).values( statusTaskStatus.pending, retry_countTaskDocLLM.retry_count 1, processing_started_atNone, resultNone ) ) result session.execute(stmt) session.commit() return result.rowcount 1新增巡检函数reaper_loop筛选超时任务恢复状态其中要做到扫描处理中的任务在 processing 队列查看 start_ts 是否超时原子重置数据库的任务状态 然后再重入 ready 队列且reaper也需要保证假设有多个worker的情况下其他worker的巡检进程不会同时抢占同一个状态为processing的任务否则可能会导致重复入队。所以用到了reclaim_task的原子 UPDATE。def reaper_loop(): 巡检 processing 队列恢复超时的任务 logging.info(doc_llm_reaper started, interval%ss, timeout%ss, REAPER_INTERVAL_SECONDS, PROCESSING_TIMEOUT_SECONDS) while True: try: now_ts int(time.time()) timeout_border_ts now_ts - PROCESSING_TIMEOUT_SECONDS timeout_threshold_dt datetime.utcnow() - timedelta(secondsPROCESSING_TIMEOUT_SECONDS) items redis_client.lrange(TASK_QUEUE_PROCESSING_KEY, 0, -1) if not items: time.sleep(REAPER_INTERVAL_SECONDS) continue for raw in items: try: payload_str raw.decode(utf-8) payload json.loads(payload_str) task_id payload.get(task_id) task_name payload.get(task_name) except Exception: redis_client.lrem(TASK_QUEUE_PROCESSING_KEY, 1, raw) continue start_ts_raw redis_client.hget(TASK_PROCESSING_TS_KEY, task_id) if start_ts_raw is None: continue start_ts int(start_ts_raw) if start_ts timeout_border_ts: continue logging.warning(fdoc_llm_reaper: task {task_id} seems stuck, start_ts{start_ts}, now_ts{now_ts}) ok task_service.reclaim_task(task_id, timeout_threshold_dt) if not ok: continue redis_client.lrem(TASK_QUEUE_PROCESSING_KEY, 1, raw) redis_client.hdel(TASK_PROCESSING_TS_KEY, task_id) new_payload json.dumps( {task_id: task_id, task_name: task_name}, ensure_asciiFalse ) redis_client.lpush(TASK_QUEUE_READY_KEY, new_payload) logging.info(fdoc_llm_reaper: task {task_id} reclaimed and requeued to READY) except Exception: logging.exception(unexpected error in reaper loop, sleep 3s) time.sleep(REAPER_INTERVAL_SECONDS)在主进程之外起一个线程循环跑巡检def start_reaper_thread(): reaper_thread threading.Thread(targetreaper_loop, namedoc_llm_reaper, daemonTrue) reaper_thread.start() return reaper_thread if __name__ __main__: setup_logging() init_llm() start_reaper_thread() worker_loop()三、测试验证我们构造场景让worker在处理一条任务的时候主动挂掉此时环境有一条mysql状态为processing的task并且redis的doc_llm:task_queue:ready没有这条task_id关联的消息只有doc_llm:task_queue:processingdoc_llm:hash:processing_ts队列有值。这相当于这次任务被消费了但worker异常导致任务丢失如果没有巡检我们就只能主动删除这条不会被触发的任务。但我们期望重启后的巡检进程reaper_loop能够检测到异常并让这条任务重新入队设置的超时时间是10分钟。