为什么不用SessionCookie了传统Session方案的痛点问题表现扩展性差Session存在单台服务器内存多实例无法共享CSRF风险Cookie自动携带容易被恶意网站利用跨域麻烦Cookie在跨域场景下各种限制移动端不友好App/小程序很难处理CookieJWT OAuth2是无状态、跨平台、支持SSO的现代方案。OAuth2四种授权模式MonkeyCode帮你选最合适的模式适用场景典型用户授权码模式有后端的Web应用最安全你的SaaS平台隐式模式纯前端SPA已淘汰不推荐—密码模式高度信任的第一方App你自己的官方App客户端凭证模式服务间调用微服务A调用微服务B99%的Web应用应该用授权码模式 PKCE防止授权码被截获。实战用Authlib实现OAuth2 JWT安装依赖让MonkeyCode生成requirements.txtauthlib1.3.0 fastapi0.104.0 pydantic2.5.0 python-jose[cryptography]3.3.0 passlib[bcrypt]1.7.4完整实现授权码模式 JWT# app/auth.py - 完整认证模块 from fastapi import FastAPI, Depends, HTTPException, Request, status from authlib.integrations.starlette import OAuth from starlette.middleware.sessions import SessionMiddleware from jose import JWTError, jwt from passlib.context import CryptContext from datetime import datetime, timedelta from typing import Optional, List import os app FastAPI(titleOAuth2 JWT Demo) app.add_middleware(SessionMiddleware, secret_keyos.getenv(SESSION_SECRET, dev-secret)) # ─── 密码哈希 ─── pwd_context CryptContext(schemes[bcrypt], deprecatedauto) def hash_password(password: str) - str: return pwd_context.hash(password) def verify_password(plain: str, hashed: str) - bool: return pwd_context.verify(plain, hashed) # ─── JWT工具 ─── SECRET_KEY os.getenv(JWT_SECRET, dev-jwt-secret-change-in-prod) ALGORITHM HS256 ACCESS_TOKEN_EXPIRE_MINUTES 30 REFRESH_TOKEN_EXPIRE_DAYS 7 def create_access_token(data: dict) - str: to_encode data.copy() expire datetime.utcnow() timedelta(minutesACCESS_TOKEN_EXPIRE_MINUTES) to_encode.update({exp: expire, type: access}) return jwt.encode(to_encode, SECRET_KEY, algorithmALGORITHM) def create_refresh_token(data: dict) - str: to_encode data.copy() expire datetime.utcnow() timedelta(daysREFRESH_TOKEN_EXPIRE_DAYS) to_encode.update({exp: expire, type: refresh}) return jwt.encode(to_encode, SECRET_KEY, algorithmALGORITHM) def decode_token(token: str) - dict: try: payload jwt.decode(token, SECRET_KEY, algorithms[ALGORITHM]) return payload except JWTError: raise HTTPException(status_code401, detailToken无效或已过期) # ─── OAuth2第三方登录GitHub示例─── oauth OAuth() oauth.register( namegithub, client_idos.getenv(GITHUB_CLIENT_ID), client_secretos.getenv(GITHUB_CLIENT_SECRET), access_token_urlhttps://github.com/login/oauth/access_token, access_token_paramsNone, authorize_urlhttps://github.com/login/oauth/authorize, authorize_paramsNone, api_base_urlhttps://api.github.com/, client_kwargs{scope: user:email}, ) app.get(/auth/github/login) async def github_login(request: Request): redirect_uri request.url_for(github_callback) return await oauth.github.authorize_redirect(request, redirect_uri) app.get(/auth/github/callback) async def github_callback(request: Request): token await oauth.github.authorize_access_token(request) user_info await oauth.github.get(user, tokentoken) user_email user_info.json().get(email) # 查找或创建用户 user await db.get_user_by_email(user_email) if not user: user await db.create_user(emailuser_email, nameuser_info.json()[login]) # 签发JWT access_token create_access_token({sub: str(user.id), email: user.email}) refresh_token create_refresh_token({sub: str(user.id)}) # 重定向回前端把token放在URL fragment不会发到后端 frontend_url os.getenv(FRONTEND_URL, http://localhost:3000) return RedirectResponse( urlf{frontend_url}/auth/callback?access_token{access_token}refresh_token{refresh_token} ) # ─── 本地注册/登录 ─── from pydantic import BaseModel class RegisterRequest(BaseModel): email: str password: str name: str class LoginRequest(BaseModel): email: str password: str app.post(/auth/register) async def register(req: RegisterRequest): existing await db.get_user_by_email(req.email) if existing: raise HTTPException(400, 邮箱已注册) hashed hash_password(req.password) user await db.create_user(emailreq.email, namereq.name, password_hashhashed) access_token create_access_token({sub: str(user.id), email: user.email}) refresh_token create_refresh_token({sub: str(user.id)}) return { access_token: access_token, refresh_token: refresh_token, token_type: bearer, expires_in: ACCESS_TOKEN_EXPIRE_MINUTES * 60 } app.post(/auth/login) async def login(req: LoginRequest): user await db.get_user_by_email(req.email) if not user or not verify_password(req.password, user.password_hash): raise HTTPException(401, 邮箱或密码错误) access_token create_access_token({sub: str(user.id), email: user.email}) refresh_token create_refresh_token({sub: str(user.id)}) return { access_token: access_token, refresh_token: refresh_token, token_type: bearer, expires_in: ACCESS_TOKEN_EXPIRE_MINUTES * 60 } # ─── 刷新Token ─── class RefreshRequest(BaseModel): refresh_token: str app.post(/auth/refresh) async def refresh(req: RefreshRequest): payload decode_token(req.refresh_token) if payload.get(type) ! refresh: raise HTTPException(401, 无效的refresh token) new_access_token create_access_token({sub: payload[sub]}) return {access_token: new_access_token, token_type: bearer} # ─── 获取当前用户依赖注入─── from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials security HTTPBearer() async def get_current_user(cred: HTTPAuthorizationCredentials Depends(security)) - dict: payload decode_token(cred.credentials) user_id payload.get(sub) if not user_id: raise HTTPException(401, 无效的token) user await db.get_user_by_id(int(user_id)) if not user: raise HTTPException(401, 用户不存在) return user # ─── 受保护的接口 ─── app.get(/api/me) async def get_me(current_user: dict Depends(get_current_user)): return { id: current_user[id], email: current_user[email], name: current_user[name] } app.get(/api/protected) async def protected_route(current_user: dict Depends(get_current_user)): return {message: f你好{current_user[name]}}前端如何携带JWT// 登录后存储token const login async (email, password) { const resp await fetch(http://localhost:8000/auth/login, { method: POST, headers: { Content-Type: application/json }, body: JSON.stringify({ email, password }) }); const data await resp.json(); localStorage.setItem(access_token, data.access_token); localStorage.setItem(refresh_token, data.refresh_token); }; // 请求拦截器自动附加token const apiFetch async (url, options {}) { const token localStorage.getItem(access_token); const headers { ...options.headers, Authorization: Bearer ${token} }; let resp await fetch(url, { ...options, headers }); // Token过期尝试刷新 if (resp.status 401) { const refreshed await refreshToken(); if (refreshed) { const newToken localStorage.getItem(access_token); headers[Authorization] Bearer ${newToken}; resp await fetch(url, { ...options, headers }); } } return resp; }; const refreshToken async () { const refreshToken localStorage.getItem(refresh_token); const resp await fetch(http://localhost:8000/auth/refresh, { method: POST, headers: { Content-Type: application/json }, body: JSON.stringify({ refresh_token: refreshToken }) }); if (resp.ok) { const data await resp.json(); localStorage.setItem(access_token, data.access_token); return true; } // Refresh token也过期跳转登录 localStorage.clear(); window.location.href /login; return false; };PKCE为什么授权码模式需要它传统授权码模式有个漏洞授权码在回调URL中可能被恶意App截获Android/iOS的Intent劫持。PKCEProof Key for Code Exchange解决这个问题前端生成 code_verifier随机字符串 ↓ hash → code_challenge ↓ 把 code_challenge 发给授权服务器 授权服务器返回授权码 ↓ 前端带着 授权码 code_verifier 换token 服务器验证 hash(code_verifier) code_challenge ✅ 通过才发tokenMonkeyCode生成的PKCE实现import hashlib import base64 import secrets def generate_pkce_pair(): 生成code_verifier和code_challenge code_verifier base64.urlsafe_b64encode( secrets.token_bytes(32) ).decode().rstrip() code_challenge base64.urlsafe_b64encode( hashlib.sha256(code_verifier.encode()).digest() ).decode().rstrip() return code_verifier, code_challenge # 前端登录时 verifier, challenge generate_pkce_pair() session[pkce_verifier] verifier # 存在session redirect_url fhttps://auth-server/oauth/authorize?...code_challenge{challenge}code_challenge_methodS256 # 回调时 stored_verifier session.pop(pkce_verifier) # 用 verifier 换token服务器会验证安全最佳实践MonkeyCode自动检查让MonkeyCode检查我的认证代码的安全性列出所有风险点检查项风险修复JWT Secret硬编码Token可被伪造用环境变量长度≥32字符没用HTTPSToken被中间人截获生产环境强制HTTPSaccess token过期时间太长泄露后影响大≤30分钟用refresh token续期没做登出黑名单token被盗用无法撤销登出时把token加入黑名单Redis没限制登录尝试容易被暴力破解登录失败5次锁定15分钟密码强度不够弱密码被撞库注册时强制8位大小写数字