086、FastAPI 生产实践:数据库会话管理、JWT 认证、Rate Limiting

📅 2026/6/30 2:05:27
086、FastAPI 生产实践:数据库会话管理、JWT 认证、Rate Limiting
086、FastAPI 生产实践数据库会话管理、JWT 认证、Rate Limiting从一次线上事故说起上周三凌晨两点我被报警电话吵醒。生产环境的FastAPI服务突然大面积返回500错误用户登录全部失败。我睡眼惺忪地打开日志发现数据库连接池耗尽所有请求都在等待数据库会话释放。更诡异的是JWT token验证偶尔成功偶尔失败同一个token在不同请求中表现不一致。排查到最后问题出在三个地方数据库会话没有正确关闭、JWT密钥在多个worker间不一致、某个爬虫在疯狂刷接口。这三个问题恰好对应了今天要聊的三个主题——数据库会话管理、JWT认证、Rate Limiting。如果你正在用FastAPI做生产项目这篇文章能帮你少踩我踩过的坑。数据库会话管理别让连接池变成定时炸弹会话工厂的正确姿势很多新手喜欢在每个请求里手动创建数据库连接这是灾难的开始。正确的做法是用sessionmaker创建会话工厂然后通过依赖注入管理生命周期。# 别这样写每次请求都新建引擎# engine create_engine(DATABASE_URL)# Session sessionmaker(bindengine)# 正确做法全局一个引擎按需创建会话fromsqlalchemyimportcreate_enginefromsqlalchemy.ormimportsessionmaker,SessionfromfastapiimportDepends,FastAPI enginecreate_engine(DATABASE_URL,pool_size20,# 这里踩过坑太小了并发一高就死max_overflow10,pool_pre_pingTrue,# 这个参数救过我的命自动检测连接是否有效pool_recycle3600# 连接超过1小时自动回收防止MySQL主动断开)SessionLocalsessionmaker(autocommitFalse,autoflushFalse,bindengine)defget_db():dbSessionLocal()try:yielddbfinally:db.close()# 这里一定要close不然连接池会泄漏注意那个finally块很多教程只写yield db忘了关闭。生产环境跑几天就会发现连接数暴涨直到数据库拒绝连接。异步会话的坑如果你用异步数据库驱动比如asyncpg事情会复杂一些。FastAPI的异步依赖注入和同步的不太一样fromsqlalchemy.ext.asyncioimportcreate_async_engine,AsyncSession,async_sessionmaker async_enginecreate_async_engine(ASYNC_DATABASE_URL,pool_size20,max_overflow10)AsyncSessionLocalasync_sessionmaker(async_engine,expire_on_commitFalse)asyncdefget_async_db():asyncwithAsyncSessionLocal()assession:yieldsession# 这里不用手动closeasync with会自动处理# 但注意如果yield之后有异常session可能不会正确回滚这里有个隐藏问题expire_on_commitFalse。默认情况下commit之后所有对象会过期下次访问会重新查询。如果你在commit之后还想用对象记得设置这个参数为False否则会触发额外的数据库查询性能下降不说还可能因为session已关闭而报错。事务管理别让脏数据污染你的数据库我见过最离谱的代码是在视图函数里手动调用db.commit()和db.rollback()。正确的做法是用依赖注入统一管理事务fromcontextlibimportcontextmanagercontextmanagerdeftransaction(db:Session):try:yielddb.commit()exceptException:db.rollback()raise# 在路由中使用app.post(/users/)defcreate_user(user:UserCreate,db:SessionDepends(get_db)):withtransaction(db):db_userUser(**user.dict())db.add(db_user)# 这里如果抛出异常事务会自动回滚returndb_user这样写的好处是所有数据库操作都在一个事务上下文中要么全部成功要么全部失败。别在多个函数里分散commit否则一个函数成功另一个失败数据就不一致了。JWT认证从入门到生产级基础实现别把密钥写死在代码里JWT认证的核心是签名和验证。很多教程把SECRET_KEY直接写在代码里这是生产环境的大忌。fromdatetimeimportdatetime,timedeltafromjoseimportJWTError,jwtfrompasslib.contextimportCryptContext# 别这样写SECRET_KEY my-secret-key# 正确做法从环境变量读取importos SECRET_KEYos.getenv(JWT_SECRET_KEY,fallback-dev-key)# 生产环境必须设置ALGORITHMHS256ACCESS_TOKEN_EXPIRE_MINUTES30pwd_contextCryptContext(schemes[bcrypt],deprecatedauto)defcreate_access_token(data:dict,expires_delta:timedeltaNone):to_encodedata.copy()ifexpires_delta:expiredatetime.utcnow()expires_deltaelse:expiredatetime.utcnow()timedelta(minutes15)to_encode.update({exp:expire})encoded_jwtjwt.encode(to_encode,SECRET_KEY,algorithmALGORITHM)returnencoded_jwt这里有个细节exp字段是Unix时间戳别传datetime对象jose库会自动处理。但如果你用其他库可能需要手动转换。依赖注入让每个路由都能验证用户fromfastapiimportDepends,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer oauth2_schemeOAuth2PasswordBearer(tokenUrltoken)asyncdefget_current_user(token:strDepends(oauth2_scheme),db:SessionDepends(get_db)):credentials_exceptionHTTPException(status_codestatus.HTTP_401_UNAUTHORIZED,detail无法验证凭据,headers{WWW-Authenticate:Bearer},)try:payloadjwt.decode(token,SECRET_KEY,algorithms[ALGORITHM])user_id:strpayload.get(sub)ifuser_idisNone:raisecredentials_exceptionexceptJWTError:raisecredentials_exception userdb.query(User).filter(User.iduser_id).first()ifuserisNone:raisecredentials_exceptionreturnuser注意这里每次请求都会查数据库。如果用户量很大可以考虑用Redis缓存用户信息但要注意缓存失效问题。我踩过的坑是用户被禁用后缓存里的token还能用直到过期。解决方案是在token里加入用户状态版本号。生产级优化Token黑名单和刷新机制JWT一旦签发就无法撤销这是它最大的痛点。生产环境需要实现token黑名单# 用Redis存储黑名单importredis rredis.Redis(hostlocalhost,port6379,db0)defrevoke_token(token:str,expire_in:int3600):# 将token加入黑名单过期时间设为token的剩余有效期r.setex(fblacklist:{token},expire_in,revoked)defis_token_revoked(token:str)-bool:returnr.exists(fblacklist:{token})# 在验证时检查asyncdefget_current_user(token:strDepends(oauth2_scheme)):ifis_token_revoked(token):raiseHTTPException(status_code401,detailToken已失效)# 继续验证...刷新token的机制也很重要。别让用户频繁登录但也不能让token永不过期。我一般用双token方案access token有效期15分钟refresh token有效期7天。Rate Limiting别让爬虫打垮你的服务基础实现内存中的简单限流FastAPI没有内置的限流中间件但实现起来不难。最简单的方案是用内存字典fromcollectionsimportdefaultdictimporttimeclassMemoryRateLimiter:def__init__(self):self.requestsdefaultdict(list)defis_allowed(self,key:str,max_requests:int,window_seconds:int)-bool:nowtime.time()window_startnow-window_seconds# 清理过期记录self.requests[key][tfortinself.requests[key]iftwindow_start]iflen(self.requests[key])max_requests:returnFalseself.requests[key].append(now)returnTruelimiterMemoryRateLimiter()app.get(/api/data)defget_data(user:UserDepends(get_current_user)):ifnotlimiter.is_allowed(fuser:{user.id},max_requests10,window_seconds60):raiseHTTPException(status_code429,detail请求过于频繁)return{data:some data}这个方案的问题很明显重启服务后限流数据丢失多worker环境下不共享。生产环境必须用Redis。生产级方案基于Redis的滑动窗口限流importredisimporttimeclassRedisRateLimiter:def__init__(self,redis_client:redis.Redis):self.redisredis_clientdefis_allowed(self,key:str,max_requests:int,window_seconds:int)-bool:nowint(time.time())window_startnow-window_seconds# 使用有序集合存储时间戳pipelineself.redis.pipeline()pipeline.zadd(key,{now:now})pipeline.zremrangebyscore(key,0,window_start)pipeline.zcard(key)pipeline.expire(key,window_seconds1)resultspipeline.execute()current_countresults[2]# zcard的结果returncurrent_countmax_requests这里用Redis的有序集合实现滑动窗口比固定窗口更精确。固定窗口的问题在于如果用户在窗口边界集中请求可能瞬间打垮服务。滑动窗口能平滑流量。按用户和接口分别限流不同接口的限流策略应该不同。登录接口要严格限制防止暴力破解普通查询接口可以宽松一些defrate_limit(key_prefix:str,max_requests:int,window_seconds:int):defdecorator(func):asyncdefwrapper(*args,**kwargs):# 从依赖注入中获取当前用户requestkwargs.get(request)userkwargs.get(current_user)ifuser:keyf{key_prefix}:user:{user.id}else:# 未登录用户用IP限流client_iprequest.client.host keyf{key_prefix}:ip:{client_ip}ifnotredis_limiter.is_allowed(key,max_requests,window_seconds):raiseHTTPException(status_code429,detail请求过于频繁)returnawaitfunc(*args,**kwargs)returnwrapperreturndecoratorapp.post(/login)rate_limit(login,max_requests5,window_seconds60)asyncdeflogin(request:Request):# 登录逻辑pass注意登录接口的限流要基于IP因为用户还没登录。但IP可能被代理隐藏所以最好结合User-Agent等其他信息。个人经验性建议数据库会话管理永远不要在视图函数里手动管理会话。用依赖注入统一管理配合contextmanager处理事务。如果遇到连接池耗尽先检查是不是有会话没关闭再考虑调整连接池大小。JWT认证密钥一定要从环境变量读取别写死在代码里。生产环境用RS256代替HS256这样密钥对可以分开管理。token里只放必要信息用户ID、角色别把密码等敏感信息放进去。Rate Limiting别用内存限流除非你的服务只有一个worker。Redis是标准方案但要注意Redis本身也可能成为瓶颈。如果流量特别大考虑用Nginx层限流作为第一道防线。调试技巧遇到认证问题先检查token是否过期再检查密钥是否一致。多worker环境下确保所有worker使用相同的密钥和Redis实例。数据库会话问题先看连接池状态再看是否有未关闭的会话。监控告警这三个组件都要加监控。数据库连接池使用率超过80%要报警JWT验证失败率突然升高要报警某个接口的请求量异常增长要报警。没有监控的生产环境就像闭着眼睛开车。最后说一句这些实践不是银弹。每个项目都有自己的特殊性但理解了原理遇到问题就能快速定位。我踩过的坑希望你能绕过去。