085、FastAPI 依赖注入体系:Depends、中间件、后台任务的生命周期管理

📅 2026/6/30 2:05:27
085、FastAPI 依赖注入体系:Depends、中间件、后台任务的生命周期管理
085、FastAPI 依赖注入体系Depends、中间件、后台任务的生命周期管理上周五晚上十一点我被线上告警电话从被窝里拽起来。用户反馈某个上传接口偶尔返回500但重启服务后又好了。我翻出日志发现错误集中在后台任务里——一个发送邮件的任务报错说数据库连接已关闭。更诡异的是这个任务明明在依赖里注入了数据库会话按道理生命周期应该由FastAPI管理才对。排查到凌晨三点我终于意识到问题出在哪依赖注入的生命周期不是你想的那样简单。今天就把这个坑掰开揉碎讲清楚。Depends你以为的依赖注入其实是个工厂先看一个我早期写的代码当时觉得挺优雅fromfastapiimportDepends,FastAPIfromsqlalchemy.ormimportSession appFastAPI()defget_db():dbSessionLocal()try:yielddbfinally:db.close()app.post(/users)defcreate_user(db:SessionDepends(get_db)):# 这里踩过坑以为每次请求都新建一个sessionreturndb.query(User).all()这段代码看起来没问题但如果你在同一个请求里多次调用Depends(get_db)它们返回的是同一个对象。FastAPI的依赖注入默认是单例模式——同一个请求作用域内同一个依赖函数只执行一次。别这样写以为每次Depends都会重新执行函数。实际上FastAPI会缓存结果直到请求结束。这特性在大多数场景下是好事但如果你依赖里有随机数生成、时间戳获取就会出问题。更隐蔽的坑在异步依赖里asyncdefget_redis():redisawaitaioredis.from_url(redis://localhost)try:yieldredisfinally:awaitredis.close()app.get(/cache)asyncdefget_cache(redis:RedisDepends(get_redis)):# 这里踩过坑异步生成器必须用async def声明returnawaitredis.get(key)如果你把async def写成defFastAPI不会报错但yield后面的清理代码永远不会执行。我因为这个bug排查了整整两天最后发现是协程没被正确await。中间件生命周期比你想象的长中间件的生命周期是整个应用级别的不是请求级别。这意味着你在中间件里创建的资源如果不手动清理会一直存活到服务重启。看这个反面教材app.middleware(http)asyncdefadd_process_time_header(request:Request,call_next):# 别这样写每次请求都创建新连接但从不关闭dbSessionLocal()request.state.dbdb responseawaitcall_next(request)returnresponse这段代码的问题在于SessionLocal()创建的连接永远不会被关闭。每次请求都泄漏一个连接数据库连接池很快就会被耗尽。正确的做法是用try/finally或者依赖注入系统app.middleware(http)asyncdefdb_session_middleware(request:Request,call_next):# 这里踩过坑必须保证finally执行dbSessionLocal()try:request.state.dbdb responseawaitcall_next(request)returnresponsefinally:db.close()但即使这样写还有一个隐藏问题如果call_next抛出异常response变量还没赋值finally块依然会执行db.close()。这没问题但如果你在finally里引用了response就会报UnboundLocalError。后台任务生命周期管理的重灾区回到开头的那个线上事故。我的代码大概是这样的fromfastapiimportBackgroundTasksdefsend_email(db:Session,user_id:int):userdb.query(User).get(user_id)# 发送邮件逻辑...db.close()# 这里踩过坑以为手动关闭就安全了app.post(/users/{user_id}/notify)defnotify_user(user_id:int,background_tasks:BackgroundTasks,db:SessionDepends(get_db)):# 别这样写后台任务执行时db可能已经关闭了background_tasks.add_task(send_email,db,user_id)return{message:notification queued}问题出在哪Depends(get_db)创建的数据库会话生命周期绑定在请求上。当notify_user函数返回后FastAPI会关闭这个会话。但后台任务send_email还在队列里等着执行——等它真正运行时db已经是一个关闭状态的对象了。解决方案有两种方案一在后台任务内部创建新会话defsend_email(user_id:int):# 这里踩过坑必须自己管理生命周期dbSessionLocal()try:userdb.query(User).get(user_id)# 发送邮件...finally:db.close()app.post(/users/{user_id}/notify)defnotify_user(user_id:int,background_tasks:BackgroundTasks):background_tasks.add_task(send_email,user_id)return{message:notification queued}方案二使用异步后台任务并传递连接池fromsqlalchemy.ext.asyncioimportAsyncSessionasyncdefsend_email_async(user_id:int,async_session_factory):asyncwithasync_session_factory()assession:userawaitsession.get(User,user_id)# 发送邮件...app.post(/users/{user_id}/notify)asyncdefnotify_user(user_id:int,background_tasks:BackgroundTasks):background_tasks.add_task(send_email_async,user_id,async_session_factory)return{message:notification queued}我最终选择了方案一因为方案二需要整个调用链都改成异步改动太大。但方案一也有代价每个后台任务都要手动管理数据库连接容易忘记关闭。生命周期管理的黄金法则经过无数次踩坑我总结出三条铁律1. 依赖注入的资源生命周期等于请求生命周期不要在依赖里创建需要长期持有的资源比如Redis连接池也不要把请求级别的资源传给后台任务。如果非要传请传递工厂函数而不是实例。2. 中间件里的资源必须保证清理中间件是请求的必经之路也是资源泄漏的高发区。每个try都要配一个finally每个open都要配一个close。别相信Python的垃圾回收它不会帮你关闭数据库连接。3. 后台任务的生命周期完全独立于请求后台任务启动时请求已经结束了。这意味着不能使用请求级别的依赖不能访问request.state不能依赖请求上下文中的任何变量如果你需要在后台任务中使用数据库就在任务内部创建新的会话。如果你需要在后台任务中使用配置就把配置对象作为参数传进去而不是从全局变量读取。个人经验写FastAPI三年多我最大的感悟是依赖注入不是魔法它只是帮你管理了对象的创建和销毁时机。当你觉得某个bug很诡异时十有八九是生命周期没对齐。我的调试三板斧在依赖函数里加print(f创建资源: {id(resource)})和print(f销毁资源: {id(resource)})用asyncio.create_task包装后台任务观察执行顺序实在不行就上contextvars手动管理上下文最后说一句别在生产环境用BackgroundTasks做重要任务。如果邮件发送失败你连重试的机会都没有。真正的后台任务应该用Celery或者ARQ它们有完善的重试机制和任务持久化。FastAPI的BackgroundTasks只适合做日志记录、缓存预热这种丢了也不心疼的事。