AIAgent

📅 2026/6/30 4:38:27
AIAgent
# AIAgent is an AI agent class used to handle WeCom (Enterprise WeChat) messages.

from deepagents import create_deep_agent
from langchain_mcp_adapters.tools import load_mcp_tools
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
from mcp.client.session import ClientSession
from mcp.client.streamable_http import streamable_http_client

from pkg.config import cfg
from pkg.log import get_logger


class AIAgent:
    """AI agent used to handle WeCom (Enterprise WeChat) messages.

    Each call to ``astream`` / ``ainvoke`` opens a fresh streamable-HTTP MCP
    connection to ``self._mcp_server_url``, loads the MCP tools from that
    session, builds a deep agent around ``self.model`` and runs it.
    ``start()`` must be awaited before the agent is used so that
    ``self.model`` is populated.
    """

    logger = get_logger("ai_agent")

    def __init__(self):
        # Both attributes start as None; the model is created in start().
        self.model: ChatOpenAI = None  # type: ignore
        # NOTE(review): _mcp_session is never assigned elsewhere in this class.
        self._mcp_session: ClientSession = None  # type: ignore
        self._mcp_server_url = f"http://127.0.0.1:{cfg.service_port}/mcp/"

    async def start(self):
        """Create the chat model from configuration if it does not exist yet."""
        if self.model is None:
            self.model = ChatOpenAI(
                base_url=cfg.agent_base_url,
                api_key=cfg.agent_api_key,  # type: ignore
                model=cfg.agent_model,
            )

    async def shutdown(self):
        """Drop the reference to the chat model."""
        self.model = None  # type: ignore

    async def _create_root_agent(self, session: ClientSession):
        """Build a deep agent whose tools are loaded from the given MCP session.

        Args:
            session: An MCP client session (callers in this class initialize
                it before passing it in).

        Returns:
            The agent object returned by ``create_deep_agent``.
        """
        tools = await load_mcp_tools(session)
        root_agent = create_deep_agent(
            model=self.model,
            tools=tools,
            system_prompt=f"你是一个智能助手名字叫{cfg.qywx_bot_name}, 可以协助用户处理各种问题并用温和积极的语气回答问题。回答的格式应该符合markdown规范。",
        )
        return root_agent

    async def astream(self, input: str, thread_id: str = ""):
        """Run the agent on ``input`` and yield response text piece by piece.

        Args:
            input: The user message text, wrapped in a ``HumanMessage``.
            thread_id: Currently not used by this method.

        Yields:
            ``str`` fragments: the non-empty ``content`` of each message found
            under ``chunk["model"]["messages"]`` for dict chunks, otherwise
            ``chunk.content`` or ``str(chunk)``.
        """
        self.logger.debug(f"Connecting to mcp server: {self._mcp_server_url}")
        async with streamable_http_client(self._mcp_server_url) as (
            read,
            write,
            get_session_id,
        ):
            async with ClientSession(read, write) as session:
                await session.initialize()
                root_agent = await self._create_root_agent(session)
                async for chunk in root_agent.astream(
                    input={"messages": [HumanMessage(content=input)]}
                ):
                    # Extract the AIMessage content from the chunk dict.
                    if isinstance(chunk, dict):
                        # `or {}` keeps the lookup safe when the "model" key
                        # is present but holds None.
                        messages = (chunk.get("model") or {}).get("messages", [])
                        if not messages:
                            continue
                        for msg in messages:
                            if hasattr(msg, "content") and msg.content:
                                yield str(msg.content)
                    elif hasattr(chunk, "content"):
                        yield str(chunk.content)  # type: ignore
                    else:
                        yield str(chunk)

    async def ainvoke(self, input: str, thread_id: str = ""):
        """Run the agent on ``input`` and return the final message content.

        Args:
            input: The user message text, wrapped in a ``HumanMessage``.
            thread_id: Currently not used by this method.

        Returns:
            The ``content`` attribute of the last entry in ``resp["messages"]``.
        """
        self.logger.debug(f"Connecting to mcp server: {self._mcp_server_url}")
        async with streamable_http_client(self._mcp_server_url) as (
            read,
            write,
            get_session_id,
        ):
            async with ClientSession(read, write) as session:
                await session.initialize()
                root_agent = await self._create_root_agent(session)
                resp = await root_agent.ainvoke(
                    input={"messages": [HumanMessage(content=input)]}
                )
                return resp["messages"][-1].content


# ---------------------------------------------------------------------------
# main
# ---------------------------------------------------------------------------
# main.py
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
import uvicorn
from contextlib import asynccontextmanager

from pkg.qywx import qywx_client
from pkg.config import cfg
from ai_agent.mcp_servers.datetime_server import mcp as datetime_mcp
from ai_agent import aiops


@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan: start services, serve the MCP app, then shut down.

    Starts ``aiops`` and ``qywx_client``, runs the MCP session manager for
    the duration of the application, mounts the MCP app under ``/mcp``, and
    shuts both services down when the application stops.
    """
    await aiops.start()
    await qywx_client.start()

    # Obtain the MCP app first; this creates the session_manager.
    mcp_app = datetime_mcp.streamable_http_app()

    try:
        # Run the MCP session manager inside the FastAPI lifespan.
        async with datetime_mcp.session_manager.run():
            # Mount the MCP app.
            app.mount("/mcp", mcp_app)
            yield
    finally:
        # Run shutdown even if the session manager context raised.
        await aiops.shutdown()
        await qywx_client.shutdown()


app = FastAPI(
    lifespan=lifespan
)

if __name__ == "__main__":
    uvicorn.run("main:app", host=cfg.service_host, port=cfg.service_port)