当前位置: 首页> 科技> 能源 > 企业公司网页设计方案_广州建设厅官网_线上培训机构排名前十_广告公司名字

企业公司网页设计方案_广州建设厅官网_线上培训机构排名前十_广告公司名字

时间:2025/7/11 8:01:44来源:https://blog.csdn.net/xnuscd/article/details/144202967 浏览次数:1次
企业公司网页设计方案_广州建设厅官网_线上培训机构排名前十_广告公司名字

handler

当然,我将为您详细讲解您提供的Handler类代码。这将包括对导入部分的解释、类及其方法的详细说明,以及每个方法的参数和功能的深入分析。

导入部分

import json
import time
from queue import Queue
from typing import Dictfrom bisheng.api.utils import build_input_keys_response
from bisheng.api.v1.schemas import ChatMessage, ChatResponse
from bisheng.chat.manager import ChatManager
from bisheng.chat.utils import judge_source, process_graph, process_source_document
from bisheng.database.base import session_getter
from bisheng.database.models.report import Report
from bisheng.interface.importing.utils import import_by_type
from bisheng.interface.initialize.loading import instantiate_llm
from bisheng.settings import settings
from bisheng.utils.docx_temp import test_replace_string
from bisheng.utils.logger import logger
from bisheng.utils.minio_client import MinioClient
from bisheng.utils.threadpool import thread_pool
from bisheng.utils.util import get_cache_key
from bisheng_langchain.chains.autogen.auto_gen import AutoGenChain
from langchain.chains.llm import LLMChain
from langchain_core.prompts.prompt import PromptTemplate
from sqlmodel import select

标准库导入

  • json:用于处理JSON数据的序列化和反序列化。
  • time:用于记录和计算时间,尤其是性能监控。
  • queue.Queue:线程安全的队列,用于在多线程或多任务环境中传递数据。
  • typing.Dict:用于类型提示,表明变量的类型为字典。

自定义模块导入

  • bisheng.api.utils.build_input_keys_response:用于构建输入键的响应,具体功能取决于项目的实现。
  • bisheng.api.v1.schemas.ChatMessage, ChatResponse:定义聊天消息和响应的数据模型。
  • bisheng.chat.manager.ChatManager:管理聊天会话和连接的核心类。
  • bisheng.chat.utils.judge_source, process_graph, process_source_document:聊天处理的实用工具函数。
  • bisheng.database.base.session_getter:获取数据库会话的上下文管理器。
  • bisheng.database.models.report.Report:报告模型,用于存储和检索报告数据。
  • bisheng.interface.importing.utils.import_by_type:根据类型导入模块或类。
  • bisheng.interface.initialize.loading.instantiate_llm:实例化大型语言模型(LLM)。
  • bisheng.settings.settings:项目的配置和设置。
  • bisheng.utils.docx_temp.test_replace_string:用于处理和替换Word文档中的字符串。
  • bisheng.utils.logger.logger:日志记录工具,通常用于记录信息、警告和错误。
  • bisheng.utils.minio_client.MinioClient:与MinIO对象存储服务交互的客户端。
  • bisheng.utils.threadpool.thread_pool:线程池管理器,用于并发执行任务。
  • bisheng.utils.util.get_cache_key:生成缓存键的实用函数。
  • bisheng_langchain.chains.autogen.auto_gen.AutoGenChain:自动生成链的实现,可能用于复杂的对话生成。
  • langchain.chains.llm.LLMChain:LangChain库中的LLM链,用于处理与大型语言模型的交互。
  • langchain_core.prompts.prompt.PromptTemplate:用于定义提示模板,指导LLM生成特定类型的响应。
  • sqlmodel.select:用于构建SQL查询,特别是与SQLModel ORM一起使用。

Handler 类

Handler类负责处理不同类型的聊天操作,如消息处理、自动生成、文件处理、报告生成和停止操作。它通过一个处理器字典来分派不同的操作,并利用异步方法确保高效的处理。

类定义和初始化

class Handler:def __init__(self, stream_queue: Queue) -> None:self.handler_dict = {'default': self.process_message,'autogen': self.process_autogen,'auto_file': self.process_file,'report': self.process_report,'stop': self.process_stop}# 记录流式输出的内容self.stream_queue = stream_queue
说明
  • 参数:
    • stream_queue (Queue):用于存储和传递流式输出内容的队列。
  • 功能:
    • 初始化一个处理器字典handler_dict,将不同的操作类型(如'default''autogen'等)映射到相应的处理方法。
    • 将传入的stream_queue保存为实例变量,用于记录和管理流式输出内容。

dispatch_task 方法

    async def dispatch_task(self, session: ChatManager, client_id: str, chat_id: str, action: str,payload: dict, user_id):logger.info(f'dispatch_task payload={payload.get("inputs")}')start_time = time.time()with session.cache_manager.set_client_id(client_id, chat_id):if not action:action = 'default'if action not in self.handler_dict:raise Exception(f'unknown action {action}')if action != 'stop':# 清空流式输出队列,防止上次的回答污染本次回答while not self.stream_queue.empty():self.stream_queue.get()await self.handler_dict[action](session, client_id, chat_id, payload, user_id)logger.info(f'dispatch_task done timecost={time.time() - start_time}')return client_id, chat_id
参数
  • session (ChatManager):聊天管理器实例,用于管理聊天会话和连接。
  • client_id (str):客户端的唯一标识符。
  • chat_id (str):聊天会话的唯一标识符。
  • action (str):要执行的操作类型,对应handler_dict中的键。
  • payload (dict):包含操作所需的数据的负载。
  • user_id:用户的唯一标识符(类型未明确指定,通常为intstr)。
功能
  1. 日志记录:记录接收到的payloadinputs部分,便于调试和监控。
  2. 时间记录:记录任务开始时间,用于性能监控。
  3. 设置客户端上下文:使用session.cache_manager.set_client_id(client_id, chat_id)设置当前客户端和聊天会话的上下文,这通常用于缓存管理。
  4. 操作类型处理:
    • 如果action为空,则默认为'default'
    • 检查action是否在handler_dict中。如果不存在,抛出异常。
    • 如果action不是'stop',则清空stream_queue,防止上次的流式输出内容污染本次回答。
  5. 分派处理:根据action类型,从handler_dict中获取对应的处理方法,并调用它,传入必要的参数。
  6. 日志记录:记录任务完成时间,显示任务耗时。
  7. 返回:返回client_idchat_id,以便进一步处理或记录。

process_stop 方法

    async def process_stop(self, session: ChatManager, client_id: str, chat_id: str, payload: Dict,user_id):key = get_cache_key(client_id, chat_id)langchain_object = session.in_memory_cache.get(key)action = payload.get('action')if isinstance(langchain_object, AutoGenChain):if hasattr(langchain_object, 'stop'):logger.info('reciever_human_interactive langchain_objct')await langchain_object.stop()else:logger.error(f'act=auto_gen act={action}')else:# 普通技能的stopres = thread_pool.cancel_task([key])  # 将进行中的任务进行cancelif res[0]:# message = payload.get('inputs') or '手动停止'res = ChatResponse(type='end', user_id=user_id, message='')close = ChatResponse(type='close')await session.send_json(client_id, chat_id, res, add=False)await session.send_json(client_id, chat_id, close, add=False)answer = ''# 记录中止后产生的流式输出内容while not self.stream_queue.empty():answer += self.stream_queue.get()if answer.strip():chat_message = ChatMessage(message=answer,category='answer',type='end',user_id=user_id,remark='break_answer',is_bot=True)session.chat_history.add_message(client_id, chat_id, chat_message)logger.info('process_stop done')
参数
  • session (ChatManager):聊天管理器实例。
  • client_id (str):客户端的唯一标识符。
  • chat_id (str):聊天会话的唯一标识符。
  • payload (Dict):包含操作数据的字典,特别是action键。
  • user_id:用户的唯一标识符。
功能
  1. 生成缓存键:使用client_idchat_id生成唯一的缓存键key
  2. 获取LangChain对象:从in_memory_cache中获取与key关联的langchain_object
  3. 获取操作类型:从payload中获取action的值。
  4. 处理AutoGenChain的停止:
    • 检查langchain_object是否是AutoGenChain的实例。
    • 如果是,检查是否具有stop方法。
      • 如果有,记录日志并调用stop方法以停止自动生成链。
      • 如果没有,记录错误日志。
  5. 处理普通技能的停止:
    • 调用thread_pool.cancel_task([key])取消与key关联的任务。
    • 如果任务成功取消(res[0]True),则创建并发送ChatResponse消息,表示结束和关闭会话。
  6. 记录流式输出内容:
    • stream_queue中取出所有流式输出内容并拼接到answer字符串中。
    • 如果answer非空,创建一个ChatMessage对象,将其添加到聊天历史记录中。
  7. 日志记录:记录process_stop done,表示停止操作完成。

process_report 方法

    async def process_report(self,session: ChatManager,client_id: str,chat_id: str,payload: Dict,user_id=None):chat_inputs = payload.pop('inputs', {})chat_inputs.pop('data', '')chat_inputs.pop('id', '')key = get_cache_key(client_id, chat_id)artifacts = session.in_memory_cache.get(key + '_artifacts')if artifacts:for k, value in artifacts.items():if k in chat_inputs:chat_inputs[k] = valuechat_message = ChatMessage(message=chat_inputs,category='question',type='bot',user_id=user_id)session.chat_history.add_message(client_id, chat_id, chat_message)# process messagelangchain_object = session.in_memory_cache.get(key)chat_inputs = {'inputs': chat_inputs, 'is_begin': False}result = await self.process_message(session, client_id, chat_id, chat_inputs, user_id)# judge end typestart_resp = ChatResponse(type='start', user_id=user_id)await session.send_json(client_id, chat_id, start_resp)if langchain_object.stop_status():start_resp.category = 'divider'response = ChatResponse(message='主动退出',type='end',category='divider',user_id=user_id)await session.send_json(client_id, chat_id, response)# build reportwith session_getter() as db_session:template = db_session.exec(select(Report).where(Report.flow_id == client_id).order_by(Report.id.desc())).first()if not template:logger.error('template not support')returnminio_client = MinioClient()template_muban = minio_client.get_share_link(template.object_name)report_name = langchain_object.report_namereport_name = report_name if report_name.endswith('.docx') else f'{report_name}.docx'test_replace_string(template_muban, result, report_name)file = minio_client.get_share_link(report_name)response = ChatResponse(type='end',files=[{'file_url': file,'file_name': report_name}],user_id=user_id)await session.send_json(client_id, chat_id, response)close_resp = ChatResponse(type='close', category='system', user_id=user_id)await session.send_json(client_id, chat_id, close_resp)
参数
  • session (ChatManager):聊天管理器实例。
  • client_id (str):客户端的唯一标识符。
  • chat_id (str):聊天会话的唯一标识符。
  • payload (Dict):包含操作数据的字典。
  • user_id:用户的唯一标识符(默认为None)。
功能
  1. 处理输入:
    • payload中移除并获取inputs,默认为空字典。
    • 移除inputs中的'data''id'键。
  2. 生成缓存键:使用client_idchat_id生成唯一的缓存键key
  3. 获取和更新输入数据:
    • in_memory_cache中获取与key + '_artifacts'关联的artifacts
    • 如果artifacts存在,将其中的值更新到chat_inputs中对应的键。
  4. 记录聊天消息:
    • 创建一个ChatMessage对象,类别为'question',类型为'bot',并将其添加到聊天历史记录中。
  5. 处理消息:
    • in_memory_cache中获取与key关联的langchain_object
    • 构建新的chat_inputs字典,包含inputsis_begin=False
    • 调用process_message方法处理消息,获取结果。
  6. 发送开始响应:创建并发送一个ChatResponse对象,类型为'start'
  7. 判断是否停止:
    • 调用langchain_object.stop_status()方法判断是否需要停止。
    • 如果需要,更新响应类别为'divider',并发送一个结束响应,表示主动退出。
  8. 构建报告:
    • 使用数据库会话查询与client_id关联的最新Report模板。
    • 如果未找到模板,记录错误并返回。
    • 使用MinioClient获取模板的共享链接。
    • 确保report_name.docx结尾。
    • 调用test_replace_string方法将结果替换到模板中,生成报告。
    • 获取生成的报告的共享链接。
    • 创建并发送一个包含报告文件的ChatResponse对象。
  9. 发送关闭响应:创建并发送一个ChatResponse对象,类型为'close',表示会话结束。

recommend_question 方法

    def recommend_question(self, langchain_obj, chat_history: list):prompt = """给定以下历史聊天消息:{history}总结提炼用户可能接下来会提问的3个问题,请直接输出问题,使用换行符分割问题,不要添加任何修饰文字或前后缀。"""if hasattr(langchain_obj, 'llm'):llm_chain = LLMChain(llm=langchain_obj.llm,prompt=PromptTemplate.from_template(prompt))else:keyword_conf = settings.get_default_llm() or {}if keyword_conf:node_type = keyword_conf.pop('type', 'HostQwenChat')  # 兼容旧配置class_object = import_by_type(_type='llms', name=node_type)llm = instantiate_llm(node_type, class_object, keyword_conf)llm_chain = LLMChain(llm=llm, prompt=PromptTemplate.from_template(prompt))if llm_chain:questions = llm_chain.predict(history=chat_history)return questions.split('\n')else:logger.info('llm_chain is None recommend_over')return []
参数
  • langchain_obj:LangChain对象,通常包含一个大型语言模型(LLM)。
  • chat_history (list):历史聊天消息的列表,用于生成推荐问题。
功能
  1. 定义提示模板:
    • 提示LLM根据历史聊天消息总结用户可能会提出的三个问题,要求直接输出问题,使用换行符分割,不添加任何修饰文字或前后缀。
  2. 初始化LLM链:
    • 如果langchain_obj具有llm属性,则直接使用该LLM实例化LLMChain,并应用提示模板。
    • 如果没有llm属性,则从settings中获取默认的LLM配置:
      • 从配置中获取node_type,默认为'HostQwenChat',以兼容旧配置。
      • 使用import_by_type函数导入对应类型的LLM类。
      • 使用instantiate_llm函数实例化LLM对象。
      • 使用实例化的LLM和提示模板创建LLMChain
  3. 生成推荐问题:
    • 如果成功创建了LLMChain,则调用llm_chain.predict(history=chat_history)生成推荐问题。
    • 将生成的字符串按换行符分割,返回问题列表。
    • 如果未成功创建LLMChain,记录日志并返回空列表。

process_message 方法

    async def process_message(self,session: ChatManager,client_id: str,chat_id: str,payload: Dict,user_id=None):# Process the graph data and chat messagechat_inputs = payload.pop('inputs', {})chat_inputs.pop('id', '')is_begin = payload.get('is_begin', True)key = get_cache_key(client_id, chat_id)artifacts = session.in_memory_cache.get(key + '_artifacts')if artifacts:for k, value in artifacts.items():if k in chat_inputs and value:chat_inputs[k] = valuechat_inputs = ChatMessage(message=chat_inputs,category='question',is_bot=not is_begin,type='bot',user_id=user_id,)if is_begin:# 从file auto trigger process_message, the question already savedsession.chat_history.add_message(client_id, chat_id, chat_inputs)start_resp = ChatResponse(type='start', user_id=user_id)await session.send_json(client_id, chat_id, start_resp)# is_first_message = len(self.chat_history.get_history(client_id=client_id)) <= 1# Generate result and thoughttry:logger.debug(f'Generating result and thought key={key}')langchain_object = session.in_memory_cache.get(key)result, intermediate_steps, source_doucment = await process_graph(langchain_object=langchain_object,chat_inputs=chat_inputs,websocket=session.active_connections[get_cache_key(client_id, chat_id)],flow_id=client_id,chat_id=chat_id,stream_queue=self.stream_queue,)# questions = []# if is_begin and langchain_object.memory and langchain_object.memory.buffer:#     questions = self.recommend_question(langchain_object,#                                         langchain_object.memory.buffer)except Exception as e:# Log stack tracelogger.exception(e)end_resp = ChatResponse(type='end',intermediate_steps=f'分析出错,{str(e)}',user_id=user_id)await session.send_json(client_id, chat_id, end_resp)close_resp = ChatResponse(type='close', user_id=user_id)if not chat_id:# 技能编排页面, 无法展示intermediateawait session.send_json(client_id, chat_id, start_resp)end_resp.message = end_resp.intermediate_stepsend_resp.intermediate_steps = Noneawait session.send_json(client_id, chat_id, end_resp)await session.send_json(client_id, chat_id, close_resp)return# Send a response back to the frontend, if neededintermediate_steps = intermediate_steps or ''# history = self.chat_history.get_history(client_id, chat_id, filter_messages=False)await self.intermediate_logs(session, client_id, chat_id, user_id, intermediate_steps)extra = {}source, result = await judge_source(result, source_doucment, chat_id, extra)# 最终结果if isinstance(langchain_object, AutoGenChain):# 群聊,最后一条消息重复,不进行返回start_resp.category = 'divider'await session.send_json(client_id, chat_id, start_resp)response = ChatResponse(message='本轮结束',type='end',category='divider',user_id=user_id)await session.send_json(client_id, chat_id, response)else:# 正常if is_begin:start_resp.category = 'answer'await session.send_json(client_id, chat_id, start_resp)response = ChatResponse(message=result,extra=json.dumps(extra),type='end',category='answer',user_id=user_id,source=int(source))await session.send_json(client_id, chat_id, response)# 循环结束if is_begin:close_resp = ChatResponse(type='close', user_id=user_id)await session.send_json(client_id, chat_id, close_resp)if source:# 处理召回的chunkawait process_source_document(source_doucment,chat_id,response.message_id,result,)return result
参数
  • session (ChatManager):聊天管理器实例。
  • client_id (str):客户端的唯一标识符。
  • chat_id (str):聊天会话的唯一标识符。
  • payload (Dict):包含操作数据的字典,尤其是inputsis_begin键。
  • user_id:用户的唯一标识符(默认为None)。
功能
  1. 处理输入数据:
    • payload中移除并获取inputs,默认为空字典。
    • 移除inputs中的'id'键。
    • 获取is_begin标志,默认为True
  2. 生成缓存键:使用client_idchat_id生成唯一的缓存键key
  3. 获取和更新输入数据:
    • in_memory_cache中获取与key + '_artifacts'关联的artifacts
    • 如果artifacts存在,将其中的值更新到chat_inputs中对应的键。
  4. 记录聊天消息:
    • 创建一个ChatMessage对象,类别为'question',类型为'bot'(如果is_beginFalse,否则为'user'),并将其添加到聊天历史记录中(如果is_beginTrue)。
  5. 发送开始响应:创建并发送一个ChatResponse对象,类型为'start',用于指示聊天开始。
  6. 处理消息:
    • 尝试调用process_graph方法处理图数据和聊天消息,获取结果、步骤和源文档。
    • 记录生成结果和思考过程的日志。
  7. 异常处理:
    • 如果在处理过程中发生异常,记录异常信息,发送错误响应,并关闭会话。
    • 特别处理技能编排页面无法展示intermediate的情况。
  8. 记录中间日志:
    • 调用intermediate_logs方法记录中间步骤的日志。
  9. 判断来源和结果处理:
    • 调用judge_source方法判断结果的来源,并可能更新extra字典。
  10. 发送最终结果:
    • 如果langchain_objectAutoGenChain的实例,发送一个分隔符和结束响应,表示本轮对话结束。
    • 否则,发送正常的答案响应,包括结果、额外信息和来源。
  11. 关闭会话:如果is_beginTrue,发送关闭响应。
  12. 处理召回的文档块:如果存在source,调用process_source_document方法处理召回的文档块。
  13. 返回结果:返回处理后的结果。

process_file 方法

    async def process_file(self, session: ChatManager, client_id: str, chat_id: str, payload: dict,user_id: int):file_name = payload['inputs']batch_question = payload['inputs']['questions']# 如果L3file = ChatMessage(is_bot=False, message=file_name, type='end', user_id=user_id)session.chat_history.add_message(client_id, chat_id, file)start_resp = ChatResponse(type='start', category='system', user_id=user_id)key = get_cache_key(client_id, chat_id)langchain_object = session.in_memory_cache.get(key)if batch_question and len(langchain_object.input_keys) == 0:# prompt 没有可以输入问题的地方await session.send_json(client_id, chat_id, start_resp)log_resp = start_resp.copy()log_resp.intermediate_steps = '当前Prompt设置无用户输入,PresetQuestion 不生效'log_resp.type = 'end'await session.send_json(client_id, chat_id, log_resp)input_key = 'input'input_dict = {}else:input_key = list(build_input_keys_response(langchain_object,{})['input_keys'].keys())[0]input_dict = {k: '' for k in langchain_object.input_keys}batch_question = ['start'] if not batch_question else batch_question  # 确保点击确定,会执行LLMreport = ''logger.info(f'process_file batch_question={batch_question} input_key={input_key}')for question in batch_question:if not question:continueinput_dict[input_key] = questionpayload = {'inputs': input_dict, 'is_begin': False}start_resp.category == 'question'await session.send_json(client_id, chat_id, start_resp)step_resp = ChatResponse(type='end',intermediate_steps=question,category='question',user_id=user_id)await session.send_json(client_id, chat_id, step_resp)result = await self.process_message(session, client_id, chat_id, payload, user_id)response_step = ChatResponse(intermediate_steps=result,type='start',category='answer',user_id=user_id)response_step.type = 'end'await session.send_json(client_id, chat_id, response_step)report = f"""{report}### {question} \n {result} \n """if len(batch_question) > 1:start_resp.category = 'report'await session.send_json(client_id, chat_id, start_resp)response = ChatResponse(type='end',intermediate_steps=report,category='report',user_id=user_id)await session.send_json(client_id, chat_id, response)close_resp = ChatResponse(type='close', category='system', user_id=user_id)await session.send_json(client_id, chat_id, close_resp)
参数
  • session (ChatManager):聊天管理器实例。
  • client_id (str):客户端的唯一标识符。
  • chat_id (str):聊天会话的唯一标识符。
  • payload (dict):包含文件处理数据的字典。
  • user_id (int):用户的唯一标识符。
功能
  1. 获取输入数据:
    • payload中获取inputs,其中包括file_namebatch_question(批量问题)。
  2. 记录文件消息:
    • 创建一个ChatMessage对象,类型为'end',并将其添加到聊天历史记录中。
  3. 发送开始响应:创建并发送一个ChatResponse对象,类型为'start',类别为'system'
  4. 获取缓存键和LangChain对象:
    • 使用client_idchat_id生成缓存键key
    • in_memory_cache中获取与key关联的langchain_object
  5. 处理批量问题:
    • 如果存在batch_question且langchain_object.input_keys为空,说明Prompt没有可以输入问题的地方。
      • 发送开始响应。
      • 创建并发送一个结束响应,包含警告信息,表示PresetQuestion不生效。
      • 设置input_key'input',并初始化input_dict为空字典。
    • 否则,从langchain_object中构建输入键,并初始化input_dict
  6. 确保有批量问题:如果batch_question为空,则将其设置为['start'],以确保LLM执行。
  7. 处理每个问题:
    • 遍历batch_question列表。
    • 对于每个非空问题:
      • 将问题设置到input_dict中对应的input_key
      • 构建新的payload,设置is_begin=False
      • 发送开始响应,类别为'question'
      • 创建并发送一个结束响应,包含当前问题。
      • 调用process_message方法处理消息,获取结果。
      • 创建并发送一个结束响应,包含结果,类别为'answer'
      • 将问题和结果添加到report字符串中,用于生成报告。
  8. 生成和发送报告:
    • 如果batch_question包含多个问题,则:
      • 更新开始响应类别为'report',并发送。
      • 创建并发送一个结束响应,包含累积的报告内容。
  9. 发送关闭响应:创建并发送一个ChatResponse对象,类型为'close',表示会话结束。

process_autogen 方法

    async def process_autogen(self, session: ChatManager, client_id: str, chat_id: str,payload: dict, user_id: int):key = get_cache_key(client_id, chat_id)langchain_object = session.in_memory_cache.get(key)logger.info(f'reciever_human_interactive langchain={langchain_object}')action = payload.get('action')if action.lower() == 'continue':# autgen_user 对话的时候,进程 wait() 需要换新if hasattr(langchain_object, 'input'):await langchain_object.input(payload.get('inputs'))# 新的对话开始,start_resp = ChatResponse(type='start')await session.send_json(client_id, chat_id, start_resp)else:logger.error(f'act=auto_gen act={action}')
参数
  • session (ChatManager):聊天管理器实例。
  • client_id (str):客户端的唯一标识符。
  • chat_id (str):聊天会话的唯一标识符。
  • payload (dict):包含自动生成操作数据的字典。
  • user_id (int):用户的唯一标识符。
功能
  1. 生成缓存键和获取LangChain对象:
    • 使用client_idchat_id生成唯一的缓存键key
    • in_memory_cache中获取与key关联的langchain_object
  2. 日志记录:记录接收到的langchain_object,便于调试。
  3. 获取操作类型:从payload中获取action的值。
  4. 处理继续操作:
    • 如果action为’continue’(不区分大小写):
      • 检查langchain_object是否具有input方法。
        • 如果有,调用langchain_object.input(payload.get('inputs')),传入输入数据。
        • 发送一个ChatResponse对象,类型为'start',表示新的对话开始。
        • 如果没有input方法,记录错误日志,说明AutoGenChain对象不支持继续操作。

intermediate_logs 方法

    async def intermediate_logs(self, session: ChatManager, client_id, chat_id, user_id,intermediate_steps):end_resp = ChatResponse(type='end', user_id=user_id)if not intermediate_steps:return await session.send_json(client_id, chat_id, end_resp, add=False)# 将最终的分析过程存数据库steps = []if isinstance(intermediate_steps, list):# autogen produce multi dialogfor message in intermediate_steps:# autogen produce message objectif isinstance(message, str):log = messageis_bot = Truecategory = 'processing'content = sender = receiver = Noneelse:content = message.get('message')log = message.get('log', '')sender = message.get('sender')receiver = message.get('receiver')is_bot = False if receiver and receiver.get('is_bot') else Truecategory = message.get('category', 'processing')msg = ChatResponse(message=content,intermediate_steps=log,sender=sender,receiver=receiver,type='end',user_id=user_id,is_bot=is_bot,category=category)steps.append(msg)else:# agent model will produce the steps logfrom langchain.schema import Document  # noqaif chat_id and intermediate_steps.strip():finally_log = ''for s in intermediate_steps.split('\n'):# 清理召回日志中的一些冗余日志if 'source_documents' in s:answer = eval(s.split(':', 1)[1])if 'result' in answer:finally_log += 'Answer: ' + answer.get('result') + '\n\n'else:finally_log += s + '\n\n'msg = ChatResponse(intermediate_steps=finally_log, type='end', user_id=user_id)steps.append(msg)else:# 只有L3用户给出详细的logend_resp.intermediate_steps = intermediate_stepsawait session.send_json(client_id, chat_id, end_resp, add=False)for step in steps:# save chate messagesession.chat_history.add_message(client_id, chat_id, step)
参数
  • session (ChatManager):聊天管理器实例。
  • client_id (str):客户端的唯一标识符。
  • chat_id (str):聊天会话的唯一标识符。
  • user_id:用户的唯一标识符。
  • intermediate_steps:中间步骤的日志或消息,可能是字符串或列表。
功能
  1. 创建结束响应:创建一个ChatResponse对象,类型为'end',用于结束当前会话。
  2. 检查中间步骤:
    • 如果intermediate_steps为空或None,直接发送结束响应并返回。
  3. 处理中间步骤日志:
    • 如果intermediate_steps是列表:
      • 遍历每个消息。
      • 如果消息是字符串,设置日志信息,类别为'processing',并创建一个ChatResponse对象,添加到steps列表。
      • 如果消息是字典,提取messagelogsenderreceiver信息,判断是否为机器人消息,设置类别,创建ChatResponse对象,并添加到steps列表。
    • 如果intermediate_steps是字符串:
      • 导入Document类(此行注释# noqa表示忽略代码检查器的警告)。
      • 如果chat_id存在且intermediate_steps不为空:
        • 遍历每一行日志,清理召回日志中的冗余信息:
          • 如果行中包含'source_documents',提取result并添加到最终日志中。
          • 否则,将行内容直接添加到最终日志中。
        • 创建一个ChatResponse对象,包含清理后的日志信息,添加到steps列表。
      • 否则(例如,L3用户给出详细的日志),将intermediate_steps直接设置到end_respintermediate_steps属性。
  4. 发送结束响应:发送end_resp消息,不将其添加到聊天历史记录中。
  5. 保存聊天消息:遍历steps列表,将每个ChatResponse对象添加到聊天历史记录中。

总结

Handler 类的职责

Handler类在整个聊天系统中扮演了关键角色,负责处理不同类型的操作(如消息处理、自动生成、文件处理、报告生成和停止操作)。通过以下方式实现其职责:

  1. 操作分派:使用handler_dict将不同的操作类型映射到相应的方法,确保不同的操作由适当的处理器处理。
  2. 流式输出管理:使用stream_queue存储和管理流式输出内容,确保输出内容的有序和一致性。
  3. 异步处理:所有处理方法都是异步的,利用asyncio实现高效的I/O操作和任务调度。
  4. 错误处理:在各个方法中都包含了异常处理逻辑,确保在发生错误时能够记录日志并适当响应用户。
  5. 缓存和数据库交互:通过session对象与缓存和数据库进行交互,管理会话数据和持久化存储。
  6. 日志记录:使用logger记录重要的操作和错误信息,便于调试和监控系统运行状态。

关键方法

  • dispatch_task:根据操作类型分派任务,调用相应的处理方法。
  • process_stop:处理停止操作,支持不同类型的LangChain对象,确保任务的正确取消和响应。
  • process_report:生成报告,整合聊天消息和处理结果,并通过MinIO存储和分享报告文件。
  • recommend_question:基于历史聊天记录,利用LLM生成推荐问题,增强用户体验。
  • process_message:处理普通聊天消息,调用process_graph生成响应,记录中间日志,并发送最终结果。
  • process_file:处理文件上传,支持批量问题处理和报告生成。
  • process_autogen:处理自动生成链的输入,支持继续操作。
  • intermediate_logs:记录和存储中间步骤的日志,确保详细的过程记录。

设计模式和架构

  • 观察者模式:通过handler_dict实现操作类型的观察者模式,根据事件类型调用不同的处理器。
  • 异步编程:利用asyncio实现非阻塞的I/O操作,提高系统的响应速度和并发处理能力。
  • 缓存管理:使用in_memory_cachecache_manager管理会话数据,提升数据访问效率。
  • 线程池:通过thread_pool处理并发任务,确保长时间运行的任务不会阻塞主线程。
  • 日志记录:系统广泛使用日志记录,确保操作的可追溯性和问题的可调试性。

与其他组件的交互

  • ChatManagerHandler类通过session对象与ChatManager交互,管理聊天会话和连接。
  • 数据库:通过session_getter获取数据库会话,查询和存储报告模板等数据。
  • MinIO:使用MinioClient与MinIO对象存储服务交互,存储和获取报告文件。
  • LangChain:集成了LangChain库的AutoGenChainLLMChain,用于生成和处理复杂的对话逻辑。
  • 缓存:利用in_memory_cache存储和检索会话相关的数据,确保高效的数据访问。

client.py

当然,我将为您详细讲解您提供的client.py代码。这将包括对导入部分的解释、ChatClient类及其方法的详细说明,以及每个方法的参数和功能的深入分析。

导入部分

import json
from typing import Dict, Callable
from uuid import UUID, uuid4
from queue import Queuefrom loguru import logger
from langchain_core.messages import AIMessage, HumanMessage
from fastapi import WebSocket, status, Requestfrom bisheng.api.services.assistant_agent import AssistantAgent
from bisheng.api.services.audit_log import AuditLogService
from bisheng.api.services.user_service import UserPayload
from bisheng.api.v1.callback import AsyncGptsDebugCallbackHandler
from bisheng.api.v1.schemas import ChatMessage, ChatResponse
from bisheng.chat.types import IgnoreException, WorkType
from bisheng.database.models.assistant import AssistantDao, AssistantStatus
from bisheng.database.models.message import ChatMessage as ChatMessageModel
from bisheng.database.models.message import ChatMessageDao
from bisheng.settings import settings
from bisheng.api.utils import get_request_ip
from bisheng.utils.threadpool import ThreadPoolManager, thread_pool

标准库导入

  • json: 用于处理JSON数据的序列化和反序列化。
  • typing.Dict, Callable: 用于类型提示,Dict表示字典类型,Callable表示可调用对象(如函数)。
  • uuid.UUID, uuid4: 用于生成和处理UUID(通用唯一标识符)。
  • queue.Queue: 线程安全的队列,用于在多线程或多任务环境中传递数据。

第三方库导入

  • loguru.logger: 一个用于日志记录的库,比内置的logging模块更简洁和强大。
  • langchain_core.messages.AIMessage, HumanMessage: 来自LangChain库,用于表示AI生成的消息和人类发送的消息。
  • fastapi.WebSocket, status, Request: FastAPI框架中的WebSocket类、状态码和请求对象,用于处理WebSocket连接和HTTP请求。

自定义模块导入

  • bisheng.api.services.assistant_agent.AssistantAgent: 自定义的助手代理服务,用于处理与AI助手的交互。
  • bisheng.api.services.audit_log.AuditLogService: 审计日志服务,用于记录和管理审计日志。
  • bisheng.api.services.user_service.UserPayload: 用户信息载荷,包含用户相关的数据。
  • bisheng.api.v1.callback.AsyncGptsDebugCallbackHandler: 异步GPT调试回调处理器,用于处理GPT生成的调试信息。
  • bisheng.api.v1.schemas.ChatMessage, ChatResponse: 定义聊天消息和响应的数据模型。
  • bisheng.chat.types.IgnoreException, WorkType: 自定义类型,IgnoreException用于特定的异常处理,WorkType用于区分不同的工作类型。
  • bisheng.database.models.assistant.AssistantDao, AssistantStatus: 助手数据访问对象和助手状态模型,用于与数据库中的助手数据交互。
  • bisheng.database.models.message.ChatMessage as ChatMessageModel, ChatMessageDao: 聊天消息模型和数据访问对象,用于与数据库中的聊天消息数据交互。
  • bisheng.settings.settings: 项目的配置和设置。
  • bisheng.api.utils.get_request_ip: 获取请求的IP地址的工具函数。
  • bisheng.utils.threadpool.ThreadPoolManager, thread_pool: 线程池管理器和线程池实例,用于并发执行任务。

ChatClient 类

ChatClient类负责管理单个聊天会话,包括处理接收到的消息、与助手代理交互、记录聊天历史、发送响应等。该类通过异步方法确保高效的处理,并利用线程池进行并发任务处理。

类定义和初始化

class ChatClient:def __init__(self, request: Request, client_key: str, client_id: str, chat_id: str, user_id: int,login_user: UserPayload, work_type: WorkType, websocket: WebSocket, **kwargs):self.request = requestself.client_key = client_keyself.client_id = client_idself.chat_id = chat_idself.user_id = user_idself.login_user = login_userself.work_type = work_typeself.websocket = websocketself.kwargs = kwargs# 业务自定义参数self.gpts_agent: AssistantAgent | None = Noneself.gpts_async_callback = Noneself.chat_history = []# 和模型对话时传入的 完整的历史对话轮数self.latest_history_num = 5self.gpts_conf = settings.get_from_db('gpts')# 异步任务列表self.task_ids = []# 流式输出的队列,用来接受流式输出的内容,每次处理新的question时都清空self.stream_queue = Queue()
参数
  • request (Request): 原始的HTTP请求对象,用于获取请求相关的信息,如IP地址等。
  • client_key (str): 客户端的唯一键,用于标识不同的客户端会话。
  • client_id (str): 客户端的唯一标识符,用于标识不同的客户端会话。
  • chat_id (str): 聊天会话的唯一标识符,用于标识不同的聊天会话。
  • user_id (int): 用户的唯一标识符,用于标识不同的用户。
  • login_user (UserPayload): 登录用户的信息载荷,包含用户相关的数据。
  • work_type (WorkType): 工作类型,用于区分不同的业务逻辑。
  • websocket (WebSocket): WebSocket连接对象,用于与客户端进行实时通信。
  • **kwargs: 其他可选的关键字参数。
属性
  • self.request: 保存原始的HTTP请求对象。
  • self.client_key: 保存客户端的唯一键。
  • self.client_id: 保存客户端的唯一标识符。
  • self.chat_id: 保存聊天会话的唯一标识符。
  • self.user_id: 保存用户的唯一标识符。
  • self.login_user: 保存登录用户的信息载荷。
  • self.work_type: 保存工作类型,用于区分不同的业务逻辑。
  • self.websocket: 保存WebSocket连接对象,用于与客户端进行实时通信。
  • self.kwargs: 保存其他可选的关键字参数。
  • self.gpts_agent: 初始化为None,后续用于存储助手代理对象(AssistantAgent)。
  • self.gpts_async_callback: 初始化为None,用于存储异步回调处理器。
  • self.chat_history: 初始化为空列表,用于记录聊天历史消息。
  • self.latest_history_num: 设置为5,用于限制历史聊天记录的轮数。
  • self.gpts_conf: 从配置中获取GPT相关的配置。
  • self.task_ids: 初始化为空列表,用于存储异步任务的ID。
  • self.stream_queue: 初始化为一个新的Queue对象,用于存储流式输出内容。

方法详解

send_message 方法
async def send_message(self, message: str):await self.websocket.send_text(message)
功能

通过WebSocket向客户端发送纯文本消息。

参数
  • message (str): 要发送的文本消息内容。
说明

该方法简单地使用self.websocket.send_text方法将文本消息发送给客户端。

send_json 方法
async def send_json(self, message: ChatMessage):await self.websocket.send_json(message.dict())
功能

通过WebSocket向客户端发送JSON格式的消息。

参数
  • message (ChatMessage): 要发送的ChatMessage对象。
说明

该方法将ChatMessage对象转换为字典,然后使用self.websocket.send_json方法将其作为JSON消息发送给客户端。

handle_message 方法
async def handle_message(self, message: Dict[any, any]):trace_id = uuid4().hexlogger.info(f'client_id={self.client_key} trace_id={trace_id} message={message}')with logger.contextualize(trace_id=trace_id):# 处理客户端发过来的信息, 提交到线程池内执行if self.work_type == WorkType.GPTS:thread_pool.submit(trace_id,self.wrapper_task,trace_id,self.handle_gpts_message,message,trace_id=trace_id)# await self.handle_gpts_message(message)
功能

处理从客户端接收到的消息,并将其提交到线程池中异步执行。

参数
  • message (Dict[any, any]): 客户端发送的消息数据,通常是一个字典。
说明
  1. 生成跟踪ID:使用uuid4().hex生成一个唯一的跟踪IDtrace_id,用于日志记录和追踪。
  2. 记录日志:记录接收到的消息,包括client_idtrace_id
  3. 上下文化日志:使用logger.contextualizetrace_id添加到日志上下文中,确保后续日志都包含该trace_id
  4. 提交任务到线程池:
    • 检查工作类型是否为WorkType.GPTS
    • 如果是,将handle_gpts_message方法封装在wrapper_task中,并提交到线程池执行。
    • 注释掉的await self.handle_gpts_message(message)表示在不使用线程池时可以直接异步执行该方法。
wrapper_task 方法
async def wrapper_task(self, task_id: str, fn: Callable, *args, **kwargs):# 包装处理函数为异步任务self.task_ids.append(task_id)try:# 执行处理函数await fn(*args, **kwargs)finally:# 执行完成后将任务id从列表移除self.task_ids.remove(task_id)
功能

包装处理函数为异步任务,并管理任务的ID。

参数
  • task_id (str): 任务的唯一标识符,用于跟踪和管理。
  • fn (Callable): 要执行的处理函数。
  • *args, **kwargs: 传递给处理函数的参数。
说明
  1. 添加任务ID:将task_id添加到self.task_ids列表中,以便跟踪当前正在运行的任务。
  2. 执行处理函数:异步执行传入的处理函数fn,并传递相应的参数。
  3. 移除任务ID:在处理函数执行完成后,无论是否成功,都将task_idself.task_ids列表中移除,确保任务列表的准确性。
add_message 方法
async def add_message(self, msg_type: str, message: str, category: str, remark: str = ''):self.chat_history.append({'category': category,'message': message,'remark': remark})if not self.chat_id:# debug模式无需保存历史returnis_bot = 0 if msg_type == 'human' else 1msg = ChatMessageDao.insert_one(ChatMessageModel(is_bot=is_bot,source=0,message=message,category=category,type=msg_type,extra=json.dumps({'client_key': self.client_key}, ensure_ascii=False),flow_id=self.client_id,chat_id=self.chat_id,user_id=self.user_id,remark=remark,))# 记录审计日志, 是新建会话if len(self.chat_history) <= 1:AuditLogService.create_chat_assistant(self.login_user, get_request_ip(self.request), self.client_id)return msg
功能

将消息添加到聊天历史记录中,并在必要时保存到数据库,同时记录审计日志。

参数
  • msg_type (str): 消息类型,通常为'human''bot'
  • message (str): 消息内容。
  • category (str): 消息类别,用于分类和过滤。
  • remark (str): 备注信息,默认为空字符串。
说明
  1. 更新本地聊天历史:将消息添加到self.chat_history列表中,记录消息的类别、内容和备注。
  2. 检查聊天ID:
    • 如果self.chat_id为空,表示处于调试模式,无需保存历史记录,直接返回。
  3. 确定消息是否来自机器人:
    • 如果msg_type'human',则is_bot设置为0
    • 否则,设置为1,表示消息来自机器人。
  4. 创建并保存消息到数据库:
    • 使用ChatMessageModel创建一个新的消息模型对象,填充相关字段,包括消息类型、内容、类别、用户ID等。
    • 使用ChatMessageDao.insert_one方法将消息插入到数据库中,并返回保存后的消息对象。
  5. 记录审计日志:
    • 如果聊天历史记录的长度小于等于1,表示是新建会话。
    • 调用AuditLogService.create_chat_assistant记录审计日志,记录登录用户、请求IP和客户端ID。
  6. 返回消息对象:返回保存后的消息对象。
send_response 方法
async def send_response(self, category: str, msg_type: str, message: str, intermediate_steps: str = '',message_id: int = None):is_bot = 0 if msg_type == 'human' else 1await self.send_json(ChatResponse(message_id=message_id,category=category,type=msg_type,is_bot=is_bot,message=message,user_id=self.user_id,flow_id=self.client_id,chat_id=self.chat_id,extra=json.dumps({'client_key': self.client_key}, ensure_ascii=False),intermediate_steps=intermediate_steps,))
功能

通过WebSocket发送格式化的ChatResponse消息给客户端。

参数
  • category (str): 消息类别,用于分类和过滤。
  • msg_type (str): 消息类型,通常为'human''bot'
  • message (str): 消息内容。
  • intermediate_steps (str): 中间步骤的日志或信息,默认为空字符串。
  • message_id (int): 消息的唯一标识符,默认为None
说明
  1. 确定消息是否来自机器人:
    • 如果msg_type'human',则is_bot设置为0
    • 否则,设置为1,表示消息来自机器人。
  2. 创建ChatResponse对象:填充相关字段,包括消息ID、类别、类型、是否来自机器人、消息内容、用户ID、客户端ID、聊天ID、额外信息和中间步骤。
  3. 发送JSON消息:调用self.send_json方法将ChatResponse对象发送给客户端。
init_gpts_agent 方法
async def init_gpts_agent(self):await self.init_chat_history()await self.init_gpts_callback()try:# 处理智能助手业务if self.chat_id and self.gpts_agent is None:# 会话业务agent通过数据库数据固定生成,不用每次变化assistant = AssistantDao.get_one_assistant(UUID(self.client_id))if not assistant:raise IgnoreException('该助手已被删除')# 判断下agent是否上线if assistant.status != AssistantStatus.ONLINE.value:raise IgnoreException('当前助手未上线,无法直接对话')elif not self.chat_id:# 调试界面没测都重新生成assistant = AssistantDao.get_one_assistant(UUID(self.client_id))if not assistant:raise IgnoreException('该助手已被删除')except IgnoreException as e:logger.exception("get assistant info error")await self.websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason=str(e))raise IgnoreException(f'get assistant info error: {str(e)}')try:if self.chat_id and self.gpts_agent is None:# 会话业务agent通过数据库数据固定生成,不用每次变化self.gpts_agent = AssistantAgent(assistant, self.chat_id)await self.gpts_agent.init_assistant(self.gpts_async_callback)elif not self.chat_id:# 调试界面每次都重新生成self.gpts_agent = AssistantAgent(assistant, self.chat_id)await self.gpts_agent.init_assistant(self.gpts_async_callback)except Exception as e:logger.exception("agent init error")raise Exception(f'agent init error: {str(e)}')
功能

初始化GPTS(基于GPT的服务)助手代理对象,处理与助手的业务逻辑交互。

说明
  1. 初始化聊天历史和回调:
    • 调用self.init_chat_history()方法初始化聊天历史记录。
    • 调用self.init_gpts_callback()方法初始化GPTS的异步回调处理器。
  2. 获取助手信息:
    • 会话业务助手:
      • 如果存在self.chat_idself.gpts_agentNone,表示需要初始化会话业务助手。
      • 使用AssistantDao.get_one_assistant(UUID(self.client_id))从数据库中获取助手信息。
      • 如果助手不存在,抛出IgnoreException异常,说明助手已被删除。
      • 检查助手的状态是否为ONLINE,如果不是,抛出IgnoreException异常,说明助手未上线。
    • 调试模式助手:
      • 如果self.chat_id不存在,表示处于调试模式,需要重新生成助手。
      • 使用AssistantDao.get_one_assistant(UUID(self.client_id))从数据库中获取助手信息。
      • 如果助手不存在,抛出IgnoreException异常,说明助手已被删除。
  3. 异常处理:
    • 捕获IgnoreException异常,记录异常信息,关闭WebSocket连接并抛出异常,通知客户端助手不可用。
  4. 初始化助手代理对象:
    • 会话业务助手:
      • 如果存在self.chat_idself.gpts_agentNone,创建一个新的AssistantAgent对象,传入助手信息和chat_id
      • 调用self.gpts_agent.init_assistant(self.gpts_async_callback)初始化助手代理对象,传入异步回调处理器。
    • 调试模式助手:
      • 如果self.chat_id不存在,创建一个新的AssistantAgent对象,传入助手信息和chat_id(通常为None)。
      • 调用self.gpts_agent.init_assistant(self.gpts_async_callback)初始化助手代理对象,传入异步回调处理器。
  5. 异常处理:
    • 捕获初始化助手代理对象时的所有异常,记录异常信息,并抛出新的异常,通知调用者助手初始化失败。
init_chat_history 方法
async def init_chat_history(self):# 初始化历史记录,不为空则不用重新初始化if len(self.chat_history) > 0:return# 从数据库加载历史会话if self.chat_id:res = ChatMessageDao.get_messages_by_chat_id(self.chat_id, ['question', 'answer'],self.latest_history_num * 4)for one in res:self.chat_history.append({'message': one.message,'category': one.category,'remark': one.remark})
功能

初始化聊天历史记录,从数据库加载历史会话消息。

说明
  1. 检查本地聊天历史:
    • 如果self.chat_history已经有内容,表示历史记录已初始化,无需重新加载,直接返回。
  2. 加载历史会话:
    • 如果存在self.chat_id,表示需要加载与该聊天会话相关的历史消息。
    • 调用ChatMessageDao.get_messages_by_chat_id方法从数据库中获取指定chat_id的消息,过滤类别为'question''answer',并限制返回的消息数量为self.latest_history_num * 4(即20条消息)。
    • 遍历查询结果,将每条消息的内容、类别和备注信息添加到self.chat_history列表中。
get_latest_history 方法
async def get_latest_history(self):# 需要将无效的历史消息剔除,只包含一问一答的完整会话记录tmp = []find_i = 0is_answer = True# 从聊天历史里获取for i in range(len(self.chat_history) - 1, -1, -1):one_item = self.chat_history[i]if find_i >= self.latest_history_num:break# 不包含中断的答案if one_item['category'] == 'answer' and one_item.get('remark') != 'break_answer' and is_answer:tmp.insert(0, AIMessage(content=one_item['message']))is_answer = Falseelif one_item['category'] == 'question' and not is_answer:tmp.insert(0, HumanMessage(content=json.loads(one_item['message'])['input']))is_answer = Truefind_i += 1return tmp
功能

获取最新的聊天历史记录,确保只包含完整的问答对话,并剔除无效的历史消息。

说明
  1. 初始化临时列表和状态变量:
    • tmp: 用于存储最新的聊天历史记录。
    • find_i: 计数器,用于记录找到的问答对数。
    • is_answer: 布尔标志,表示当前期望的是回答还是问题。
  2. 遍历聊天历史:
    • self.chat_history的最后一条消息开始,逆序遍历所有消息。
    • 对于每条消息,检查其类别和备注信息,确保只包含有效的问答对话:
      • 如果消息类别为'answer',且备注信息不为'break_answer',且当前期望的是回答(is_answer=True),则将其作为AIMessage添加到tmp列表的开头,并将is_answer设置为False,表示下一步期望的是问题。
      • 如果消息类别为'question',且当前期望的是问题(is_answer=False),则将其作为HumanMessage添加到tmp列表的开头,并将is_answer设置为True,同时增加find_i计数器。
    • 当找到的问答对数达到self.latest_history_num(即5对),停止遍历。
  3. 返回结果:返回包含最新有效问答对的tmp列表。
init_gpts_callback 方法
async def init_gpts_callback(self):if self.gpts_async_callback is not None:returnasync_callbacks = [AsyncGptsDebugCallbackHandler(**{'websocket': self.websocket,'flow_id': self.client_id,'chat_id': self.chat_id,'user_id': self.user_id,'stream_queue': self.stream_queue,})]self.gpts_async_callback = async_callbacks
功能

初始化GPTS的异步回调处理器,用于处理GPT生成的调试信息和流式输出。

说明
  1. 检查是否已初始化:
    • 如果self.gpts_async_callback已经存在,表示回调处理器已初始化,无需重新初始化,直接返回。
  2. 创建回调处理器:
    • 创建一个AsyncGptsDebugCallbackHandler对象,并传入必要的参数,包括WebSocket连接、客户端ID、聊天ID、用户ID和流式输出队列。
    • 将创建的回调处理器列表赋值给self.gpts_async_callback
stop_handle_message 方法
async def stop_handle_message(self, message: Dict[any, any]):# 中止流式输出, 因为最新的任务id是中止任务的id,不能取消自己logger.info(f'need stop agent, client_key: {self.client_key}, message: {message}')# 中止之前的处理函数thread_pool.cancel_task(self.task_ids[:-1])# 将流式输出的内容写到数据库内answer = ''while not self.stream_queue.empty():msg = self.stream_queue.get()answer += msg# 有流式输出内容的话,记录流式输出内容到数据库if answer.strip():res = await self.add_message('bot', answer, 'answer', 'break_answer')await self.send_response('answer', 'end', '', message_id=res.id if res else None)await self.send_response('processing', 'close', '')
功能

处理停止操作,终止当前的助手代理任务,并记录流式输出内容到数据库。

参数
  • message (Dict[any, any]): 停止操作的相关消息数据。
说明
  1. 记录停止请求:记录需要停止代理的信息,包括client_keymessage内容。
  2. 取消任务:
    • 调用thread_pool.cancel_task方法取消除最后一个任务以外的所有任务,确保当前任务不会被取消(防止自身被取消)。
  3. 收集流式输出内容:
    • self.stream_queue中取出所有流式输出内容,并拼接到answer字符串中。
  4. 记录并发送流式输出内容:
    • 如果answer非空,调用self.add_message方法将流式输出内容作为机器人回答添加到聊天历史记录中,类别为'answer',备注为'break_answer'
    • 调用self.send_response方法发送一个结束响应,包含消息ID。
  5. 发送关闭响应:调用self.send_response方法发送一个关闭响应,类别为'processing',类型为'close'
clear_stream_queue 方法
async def clear_stream_queue(self):while not self.stream_queue.empty():self.stream_queue.get()
功能

清空流式输出队列,防止上一次的回答污染本次回答。

说明

通过循环调用self.stream_queue.get()方法,直到队列为空,确保所有流式输出内容被清空。

handle_gpts_message 方法
async def handle_gpts_message(self, message: Dict[any, any]):if not message:returnlogger.debug(f'receive client message, client_key: {self.client_key} message: {message}')if message.get('action') == 'stop':await self.stop_handle_message(message)returntry:await self.send_response('processing', 'begin', '')# 清空流式队列,防止把上一次的回答,污染本次回答await self.clear_stream_queue()inputs = message.get('inputs', {})input_msg = inputs.get('input')if not input_msg:# 需要切换会话logger.debug(f'need switch agent, client_key: {self.client_key} inputs: {inputs}')self.client_id = inputs.get('data').get('id')self.chat_id = inputs.get('data').get('chatId')self.gpts_agent = Noneself.gpts_async_callback = Noneself.chat_history = []await self.init_gpts_agent()return# 初始化agentawait self.init_gpts_agent()# 将用户问题写入到数据库await self.add_message('human', json.dumps(inputs, ensure_ascii=False), 'question')# 获取回话历史chat_history = await self.get_latest_history()# 调用agent获取结果result = await self.gpts_agent.run(input_msg, chat_history, self.gpts_async_callback)logger.debug(f'gpts agent {self.client_key} result: {result}')answer = ''for one in result:if isinstance(one, AIMessage):answer += one.content# todo: 后续优化代码解释器的实现方案,保证输出的文件可以公开访问 ugly solve# 获取minio的share地址,把share域名去掉, 为毕昇的部署方案特殊处理下for one in self.gpts_agent.tools:if one.name == "bisheng_code_interpreter":minio_share = settings.get_knowledge().get('minio', {}).get('MINIO_SHAREPOIN', '')answer = answer.replace(f"http://{minio_share}", "")answer_end_type = 'end'# 如果是流式的llm则用end_cover结束, 覆盖之前流式的输出if getattr(self.gpts_agent.llm, 'streaming', False):answer_end_type = 'end_cover'res = await self.add_message('bot', answer, 'answer')await self.send_response('answer', 'start', '')await self.send_response('answer', answer_end_type, answer, message_id=res.id if res else None)logger.info(f'gptsAgentOver assistant_id:{self.client_id} chat_id:{self.chat_id} question:{input_msg}')logger.info(f'gptsAgentOver assistant_id:{self.client_id} chat_id:{self.chat_id} answer:{answer}')except Exception as e:logger.exception('handle gpts message error: ')await self.send_response('system', 'start', '')await self.send_response('system', 'end', 'Error: ' + str(e))finally:await self.send_response('processing', 'close', '')
功能

处理GPTS助手生成的消息,与助手代理进行交互,生成并发送响应。

参数
  • message (Dict[any, any]): 客户端发送的消息数据,通常包含actioninputs等字段。
说明
  1. 检查消息内容:
    • 如果message为空或None,直接返回,不做处理。
  2. 记录接收到的消息:记录接收到的消息内容,包括client_key和消息内容。
  3. 处理停止操作:
    • 如果message中的action'stop',调用self.stop_handle_message(message)方法处理停止操作,并返回。
  4. 正常消息处理:
    • 发送开始响应:调用self.send_response方法发送一个类型为'begin',类别为'processing'的响应,表示开始处理。
    • 清空流式输出队列:调用self.clear_stream_queue()方法清空流式输出队列,防止上一次的回答污染本次回答。
    • 获取输入数据:
      • message中获取inputs,并从中提取input字段作为用户输入消息input_msg
    • 处理会话切换:
      • 如果input_msg为空,表示需要切换会话。
      • 记录需要切换会话的调试信息。
      • 更新self.client_idself.chat_id为新的会话ID。
      • 重置助手代理对象、回调处理器和聊天历史记录。
      • 调用self.init_gpts_agent()方法重新初始化助手代理对象,并返回。
    • 初始化助手代理对象:调用self.init_gpts_agent()方法,确保助手代理对象已初始化。
    • 记录用户问题:调用self.add_message方法将用户问题记录到聊天历史记录中,类别为'question',类型为'human'
    • 获取会话历史:调用self.get_latest_history()方法获取最新的聊天历史记录,用于上下文生成。
    • 调用助手代理获取结果:
      • 调用self.gpts_agent.run(input_msg, chat_history, self.gpts_async_callback)方法,传入用户输入、聊天历史和回调处理器,获取助手的响应结果。
      • 记录助手生成的结果日志。
    • 处理助手生成的结果:
      • 遍历助手生成的结果result,将所有AIMessage类型的内容拼接成一个完整的回答answer
    • 处理特定工具的输出:
      • 遍历助手代理的工具,如果工具名称为"bisheng_code_interpreter",则处理MinIO的共享链接,确保输出文件的可访问性。
    • 确定结束类型:
      • 如果助手的LLM支持流式输出(streaming=True),则将结束类型设置为'end_cover',覆盖之前的流式输出。
      • 否则,设置为'end'
    • 记录并发送回答:
      • 调用self.add_message方法将机器人回答添加到聊天历史记录中,类别为'answer',类型为'bot'
      • 调用self.send_response方法发送一个类型为'start',类别为'answer'的响应,表示回答开始。
      • 调用self.send_response方法发送一个结束响应,包含回答内容和消息ID。
      • 记录助手完成对话的日志信息。
  5. 异常处理:
    • 捕获所有异常,记录异常信息。
    • 调用self.send_response方法发送一个类型为'system'的开始响应。
    • 调用self.send_response方法发送一个类型为'system'的结束响应,包含错误信息。
  6. 最终处理:
    • 无论是否发生异常,调用self.send_response方法发送一个类型为'processing',类别为'close'的响应,表示处理结束。
init_gpts_callback 方法
async def init_gpts_callback(self):if self.gpts_async_callback is not None:returnasync_callbacks = [AsyncGptsDebugCallbackHandler(**{'websocket': self.websocket,'flow_id': self.client_id,'chat_id': self.chat_id,'user_id': self.user_id,'stream_queue': self.stream_queue,})]self.gpts_async_callback = async_callbacks
功能

初始化GPTS的异步回调处理器,用于处理GPT生成的调试信息和流式输出。

说明
  1. 检查是否已初始化:
    • 如果self.gpts_async_callback已经存在,表示回调处理器已初始化,无需重新初始化,直接返回。
  2. 创建回调处理器:
    • 创建一个AsyncGptsDebugCallbackHandler对象,并传入必要的参数,包括WebSocket连接、客户端ID、聊天ID、用户ID和流式输出队列。
    • 将创建的回调处理器列表赋值给self.gpts_async_callback
stop_handle_message 方法
async def stop_handle_message(self, message: Dict[any, any]):# 中止流式输出, 因为最新的任务id是中止任务的id,不能取消自己logger.info(f'need stop agent, client_key: {self.client_key}, message: {message}')# 中止之前的处理函数thread_pool.cancel_task(self.task_ids[:-1])# 将流式输出的内容写到数据库内answer = ''while not self.stream_queue.empty():msg = self.stream_queue.get()answer += msg# 有流式输出内容的话,记录流式输出内容到数据库if answer.strip():res = await self.add_message('bot', answer, 'answer', 'break_answer')await self.send_response('answer', 'end', '', message_id=res.id if res else None)await self.send_response('processing', 'close', '')
功能

处理停止操作,终止当前的助手代理任务,并记录流式输出内容到数据库。

参数
  • message (Dict[any, any]): 停止操作的相关消息数据。
说明
  1. 记录停止请求:记录需要停止代理的信息,包括client_keymessage内容。
  2. 取消任务:
    • 调用thread_pool.cancel_task方法取消除最后一个任务以外的所有任务,确保当前任务不会被取消(防止自身被取消)。
  3. 收集流式输出内容:
    • self.stream_queue中取出所有流式输出内容,并拼接到answer字符串中。
  4. 记录并发送流式输出内容:
    • 如果answer非空,调用self.add_message方法将流式输出内容作为机器人回答添加到聊天历史记录中,类别为'answer',备注为'break_answer'
    • 调用self.send_response方法发送一个结束响应,包含消息ID。
  5. 发送关闭响应:调用self.send_response方法发送一个关闭响应,类别为'processing',类型为'close'
clear_stream_queue 方法
async def clear_stream_queue(self):while not self.stream_queue.empty():self.stream_queue.get()
功能

清空流式输出队列,防止上一次的回答污染本次回答。

说明

通过循环调用self.stream_queue.get()方法,直到队列为空,确保所有流式输出内容被清空。

handle_gpts_message 方法
async def handle_gpts_message(self, message: Dict[any, any]):if not message:returnlogger.debug(f'receive client message, client_key: {self.client_key} message: {message}')if message.get('action') == 'stop':await self.stop_handle_message(message)returntry:await self.send_response('processing', 'begin', '')# 清空流式队列,防止把上一次的回答,污染本次回答await self.clear_stream_queue()inputs = message.get('inputs', {})input_msg = inputs.get('input')if not input_msg:# 需要切换会话logger.debug(f'need switch agent, client_key: {self.client_key} inputs: {inputs}')self.client_id = inputs.get('data').get('id')self.chat_id = inputs.get('data').get('chatId')self.gpts_agent = Noneself.gpts_async_callback = Noneself.chat_history = []await self.init_gpts_agent()return# 初始化agentawait self.init_gpts_agent()# 将用户问题写入到数据库await self.add_message('human', json.dumps(inputs, ensure_ascii=False), 'question')# 获取回话历史chat_history = await self.get_latest_history()# 调用agent获取结果result = await self.gpts_agent.run(input_msg, chat_history, self.gpts_async_callback)logger.debug(f'gpts agent {self.client_key} result: {result}')answer = ''for one in result:if isinstance(one, AIMessage):answer += one.content# todo: 后续优化代码解释器的实现方案,保证输出的文件可以公开访问 ugly solve# 获取minio的share地址,把share域名去掉, 为毕昇的部署方案特殊处理下for one in self.gpts_agent.tools:if one.name == "bisheng_code_interpreter":minio_share = settings.get_knowledge().get('minio', {}).get('MINIO_SHAREPOIN', '')answer = answer.replace(f"http://{minio_share}", "")answer_end_type = 'end'# 如果是流式的llm则用end_cover结束, 覆盖之前流式的输出if getattr(self.gpts_agent.llm, 'streaming', False):answer_end_type = 'end_cover'res = await self.add_message('bot', answer, 'answer')await self.send_response('answer', 'start', '')await self.send_response('answer', answer_end_type, answer, message_id=res.id if res else None)logger.info(f'gptsAgentOver assistant_id:{self.client_id} chat_id:{self.chat_id} question:{input_msg}')logger.info(f'gptsAgentOver assistant_id:{self.client_id} chat_id:{self.chat_id} answer:{answer}')except Exception as e:logger.exception('handle gpts message error: ')await self.send_response('system', 'start', '')await self.send_response('system', 'end', 'Error: ' + str(e))finally:await self.send_response('processing', 'close', '')
功能

处理从客户端接收到的GPTS助手生成的消息,与助手代理进行交互,生成并发送响应。

参数
  • message (Dict[any, any]): 客户端发送的消息数据,通常包含actioninputs等字段。
说明
  1. 检查消息内容:
    • 如果message为空或None,直接返回,不做处理。
  2. 记录接收到的消息:记录接收到的消息内容,包括client_key和消息内容。
  3. 处理停止操作:
    • 如果message中的action'stop',调用self.stop_handle_message(message)方法处理停止操作,并返回。
  4. 正常消息处理:
    • 发送开始响应:调用self.send_response方法发送一个类型为'begin',类别为'processing'的响应,表示开始处理。
    • 清空流式输出队列:调用self.clear_stream_queue()方法清空流式输出队列,防止上一次的回答污染本次回答。
    • 获取输入数据:
      • message中获取inputs,并从中提取input字段作为用户输入消息input_msg
    • 处理会话切换:
      • 如果input_msg为空,表示需要切换会话。
      • 记录需要切换会话的调试信息。
      • 更新self.client_idself.chat_id为新的会话ID。
      • 重置助手代理对象、回调处理器和聊天历史记录。
      • 调用self.init_gpts_agent()方法重新初始化助手代理对象,并返回。
    • 初始化助手代理对象:调用self.init_gpts_agent()方法,确保助手代理对象已初始化。
    • 记录用户问题:调用self.add_message方法将用户问题记录到聊天历史记录中,类别为'question',类型为'human'
    • 获取会话历史:调用self.get_latest_history()方法获取最新的聊天历史记录,用于上下文生成。
    • 调用助手代理获取结果:
      • 调用self.gpts_agent.run(input_msg, chat_history, self.gpts_async_callback)方法,传入用户输入、聊天历史和回调处理器,获取助手的响应结果。
      • 记录助手生成的结果日志。
    • 处理助手生成的结果:
      • 遍历助手生成的结果result,将所有AIMessage类型的内容拼接成一个完整的回答answer
    • 处理特定工具的输出:
      • 遍历助手代理的工具,如果工具名称为"bisheng_code_interpreter",则处理MinIO的共享链接,确保输出文件的可访问性。
    • 确定结束类型:
      • 如果助手的LLM支持流式输出(streaming=True),则将结束类型设置为'end_cover',覆盖之前的流式输出。
      • 否则,设置为'end'
    • 记录并发送回答:
      • 调用self.add_message方法将机器人回答添加到聊天历史记录中,类别为'answer',类型为'bot'
      • 调用self.send_response方法发送一个类型为'start',类别为'answer'的响应,表示回答开始。
      • 调用self.send_response方法发送一个结束响应,包含回答内容和消息ID。
      • 记录助手完成对话的日志信息。
  5. 异常处理:
    • 捕获所有异常,记录异常信息。
    • 调用self.send_response方法发送一个类型为'system'的开始响应。
    • 调用self.send_response方法发送一个类型为'system'的结束响应,包含错误信息。
  6. 最终处理:
    • 无论是否发生异常,调用self.send_response方法发送一个类型为'processing',类别为'close'的响应,表示处理结束。
intermediate_logs 方法
async def intermediate_logs(self, session: ChatManager, client_id, chat_id, user_id,intermediate_steps):end_resp = ChatResponse(type='end', user_id=user_id)if not intermediate_steps:return await session.send_json(client_id, chat_id, end_resp, add=False)# 将最终的分析过程存数据库steps = []if isinstance(intermediate_steps, list):# autogen produce multi dialogfor message in intermediate_steps:# autogen produce message objectif isinstance(message, str):log = messageis_bot = Truecategory = 'processing'content = sender = receiver = Noneelse:content = message.get('message')log = message.get('log', '')sender = message.get('sender')receiver = message.get('receiver')is_bot = False if receiver and receiver.get('is_bot') else Truecategory = message.get('category', 'processing')msg = ChatResponse(message=content,intermediate_steps=log,sender=sender,receiver=receiver,type='end',user_id=user_id,is_bot=is_bot,category=category)steps.append(msg)else:# agent model will produce the steps logfrom langchain.schema import Document  # noqaif chat_id and intermediate_steps.strip():finally_log = ''for s in intermediate_steps.split('\n'):# 清理召回日志中的一些冗余日志if 'source_documents' in s:answer = eval(s.split(':', 1)[1])if 'result' in answer:finally_log += 'Answer: ' + answer.get('result') + '\n\n'else:finally_log += s + '\n\n'msg = ChatResponse(intermediate_steps=finally_log, type='end', user_id=user_id)steps.append(msg)else:# 只有L3用户给出详细的logend_resp.intermediate_steps = intermediate_stepsawait session.send_json(client_id, chat_id, end_resp, add=False)for step in steps:# save chate messagesession.chat_history.add_message(client_id, chat_id, step)
功能

记录和存储中间步骤的日志,确保详细的过程记录,并将其发送给客户端。

参数
  • session (ChatManager): 聊天管理器实例。
  • client_id (str): 客户端的唯一标识符。
  • chat_id (str): 聊天会话的唯一标识符。
  • user_id: 用户的唯一标识符。
  • intermediate_steps: 中间步骤的日志或消息,可能是字符串或列表。
说明
  1. 创建结束响应:创建一个ChatResponse对象,类型为'end',用于结束当前会话。
  2. 检查中间步骤:
    • 如果intermediate_steps为空或None,直接发送结束响应并返回。
  3. 处理中间步骤日志:
    • 如果intermediate_steps是列表:
      • 遍历每个消息。
      • 如果消息是字符串,设置日志信息,类别为'processing',并创建一个ChatResponse对象,添加到steps列表。
      • 如果消息是字典,提取messagelogsenderreceiver信息,判断是否为机器人消息,设置类别,创建ChatResponse对象,并添加到steps列表。
    • 如果intermediate_steps是字符串:
      • 导入Document类(此行注释# noqa表示忽略代码检查器的警告)。
      • 如果chat_id存在且intermediate_steps不为空:
        • 遍历每一行日志,清理召回日志中的冗余信息:
          • 如果行中包含'source_documents',提取result并添加到最终日志中。
          • 否则,将行内容直接添加到最终日志中。
        • 创建一个ChatResponse对象,包含清理后的日志信息,添加到steps列表。
      • 否则(例如,L3用户给出详细的日志),将intermediate_steps直接设置到end_respintermediate_steps属性。
  4. 发送结束响应:发送end_resp消息,不将其添加到聊天历史记录中。
  5. 保存聊天消息:遍历steps列表,将每个ChatResponse对象添加到聊天历史记录中。

总结

通过上述逐行注释和对函数参数的详细解释,您应该能够更深入地理解client.pyChatClient类及其在整个项目中的作用。以下是该类的主要职责:

  • 管理单个聊天会话:负责处理与单个客户端的WebSocket连接,接收和发送消息。
  • 处理不同类型的消息:通过handle_message方法接收客户端消息,并根据工作类型分派到相应的处理器(如GPTS助手)。
  • 记录聊天历史:通过add_message方法将消息记录到聊天历史中,并在需要时保存到数据库。
  • 与助手代理交互:通过AssistantAgent类与助手代理进行交互,获取和处理助手生成的响应。
  • 处理流式输出:使用stream_queue管理和记录流式输出内容,确保实时响应和内容一致性。
  • 异常处理:在消息处理过程中捕获和处理异常,确保系统的稳定性和可靠性。
  • 审计日志记录:在新建会话时记录审计日志,确保操作的可追溯性和安全性。

utils.py

当然,我将为您详细讲解您提供的client.py代码。这将包括对导入部分的解释、每个函数的详细说明,以及每个函数的参数和功能的深入分析。

导入部分

import json
from typing import Dict, List
from urllib.parse import unquote, urlparsefrom bisheng.api.services.llm import LLMService
from bisheng.api.v1.schemas import ChatMessage
from bisheng.database.base import session_getter
from bisheng.database.models.recall_chunk import RecallChunk
from bisheng.interface.utils import try_setting_streaming_options
from bisheng.processing.base import get_result_and_steps
from bisheng.utils.logger import logger
from fastapi import WebSocket
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
from langchain.schema.document import Document

标准库导入

  • json: 用于处理JSON数据的序列化和反序列化。
  • typing.Dict, List: 用于类型提示,Dict表示字典类型,List表示列表类型。
  • urllib.parse.unquote, urlparse: 用于解析和解码URL。

自定义模块导入

  • bisheng.api.services.llm.LLMService: 自定义的LLM服务,用于与大型语言模型交互。
  • bisheng.api.v1.schemas.ChatMessage: 定义聊天消息的数据模型。
  • bisheng.database.base.session_getter: 获取数据库会话的上下文管理器。
  • bisheng.database.models.recall_chunk.RecallChunk: 数据库模型,用于存储召回的文档片段。
  • bisheng.interface.utils.try_setting_streaming_options: 实用函数,用于设置流式选项。
  • bisheng.processing.base.get_result_and_steps: 处理图数据并获取结果和中间步骤的函数。
  • bisheng.utils.logger.logger: 日志记录工具,用于记录信息、警告和错误。
  • fastapi.WebSocket: FastAPI中的WebSocket类,用于处理WebSocket连接。
  • langchain.chains.LLMChain: LangChain库中的LLM链,用于处理与大型语言模型的交互。
  • langchain.prompts.PromptTemplate: 用于定义提示模板,指导LLM生成特定类型的响应。
  • langchain.schema.document.Document: LangChain中的文档模型,用于表示文档内容及其元数据。

函数详解

1. process_graph 函数

async def process_graph(langchain_object,chat_inputs: ChatMessage,websocket: WebSocket,flow_id: str = None,chat_id: str = None,**kwargs):langchain_object = try_setting_streaming_options(langchain_object, websocket)logger.debug('Loaded langchain object')if langchain_object is None:# Raise user facing errorraise ValueError('There was an error loading the langchain_object. Please, check all the nodes and try again.')# Generate result and thoughttry:if not chat_inputs.message:logger.debug('No message provided')raise ValueError('No message provided')logger.debug('Generating result and thought')result, intermediate_steps, source_document = await get_result_and_steps(langchain_object,chat_inputs.message,websocket=websocket,flow_id=flow_id,chat_id=chat_id,**kwargs)logger.debug('Generated result and intermediate_steps')return result, intermediate_steps, source_documentexcept Exception as e:# Log stack tracelogger.exception(e)raise e
参数
  • langchain_object: LangChain对象,负责处理复杂的对话逻辑和生成响应。
  • chat_inputs (ChatMessage): 包含用户输入消息的ChatMessage对象。
  • websocket (WebSocket): WebSocket连接对象,用于实时通信。
  • flow_id (str): 流程的唯一标识符,可选。
  • chat_id (str): 聊天会话的唯一标识符,可选。
  • **kwargs: 其他可选参数。
功能
  1. 设置流式选项
    • 调用try_setting_streaming_options函数,尝试为langchain_object设置流式选项,确保可以进行实时响应。
  2. 检查langchain_object是否加载成功
    • 如果langchain_objectNone,则抛出一个ValueError,提示用户检查节点配置。
  3. 生成结果和中间步骤
    • 检查chat_inputs.message是否存在。如果不存在,记录日志并抛出ValueError
    • 调用get_result_and_steps函数,传入langchain_object、用户消息、WebSocket连接、flow_idchat_id,以及其他可选参数,获取处理结果、生成的中间步骤和源文档。
    • 记录生成结果和中间步骤的日志。
    • 返回resultintermediate_stepssource_document
  4. 异常处理
    • 捕获所有异常,记录堆栈跟踪,并重新抛出异常,以便上层调用者处理。

2. extract_answer_keys 函数

def extract_answer_keys(answer, llm):"""提取answer中的关键词"""llm_chain = Noneif llm:llm_chain = LLMChain(llm=llm, prompt=PromptTemplate.from_template(prompt_template))try:keywords_str = llm_chain.run(answer)keywords = eval(keywords_str[9:])except Exception:import jieba.analyselogger.warning(f'llm {llm} extract_not_support, change to jieba')keywords = jieba.analyse.extract_tags(answer, topK=100, withWeight=False)return keywords
参数
  • answer (str): 需要提取关键词的回答内容。
  • llm: 大型语言模型对象,用于提取关键词。如果llmNone,则使用备用的关键词提取方法。
功能
  1. 初始化LLM链:
    • 如果提供了llm对象,使用LLMChain和预定义的prompt_template初始化LLM链。
  2. 提取关键词:
    • 尝试通过LLM链运行answer,获取生成的关键词字符串。
    • 使用eval函数解析关键词字符串(假设关键词以'KeyWords: '开头,后面跟着列表)。
  3. 备用关键词提取方法:
    • 如果LLM提取失败(例如,LLM不支持或发生错误),则使用jieba.analyse库的extract_tags方法进行关键词提取。
    • 记录警告日志,说明LLM提取失败,改用jieba库。
  4. 返回关键词列表:
    • 返回提取到的关键词列表。
说明
  • 使用LLM进行关键词提取可以提高准确性和上下文相关性。
  • 使用jieba库作为备用方案,确保在LLM不可用时仍能提取关键词。

3. judge_source 函数

async def judge_source(result, source_document, chat_id, extra: Dict):source = 0if isinstance(result, Document):# 返回的是Documentmetadata = result.metadataquestion = result.page_contentresult = json.loads(metadata.get('extra', '{}')).get('answer')source = 4extra.update({'qa': f'本答案来源于已有问答库: {question}','url': json.loads(metadata.get('extra', '{}')).get('url')})elif source_document and chat_id:if any(not doc.metadata.get('right', True) for doc in source_document):source = 2elif all(doc.metadata.get('extra') and json.loads(doc.metadata.get('extra')).get('url')for doc in source_document):source = 3repeat_doc = {}doc = []# 来源文档做去重,不能改变原有的顺序for one in source_document:title = one.metadata.get('source')url = json.loads(one.metadata.get('extra', '{}')).get('url')repeat_key = (title, url)# 重复的丢掉,不返回if repeat_doc.get(repeat_key):continuedoc.append({'title': title, 'url': url})repeat_doc[repeat_key] = 1extra.update({'doc': doc})else:source = 1if source == 1:for doc in source_document:# 确保每个chunk 都可溯源if 'bbox' not in doc.metadata or not doc.metadata['bbox'] or not json.loads(doc.metadata['bbox'])['chunk_bboxes']:source = 0breakreturn source, result
参数
  • result: 助手生成的结果,可能是字符串或Document对象。
  • source_document (List[Document]): 来源文档的列表,用于溯源和验证答案。
  • chat_id (str): 聊天会话的唯一标识符。
  • extra (Dict): 额外的上下文信息,用于存储溯源相关的数据。
功能
  1. 初始化源类型
    • source = 0: 默认源类型,表示未识别或无效。
  2. 处理Document类型的结果
    • 如果result是Document对象:
      • metadata中提取extra字段,获取实际的答案内容。
      • 更新result为提取的答案内容。
      • 设置source = 4,表示答案来源于已有的问答库。
      • 更新extra字典,记录问答库来源和URL链接。
  3. 处理来源文档
    • 如果存在source_document且chat_id有效:
      • 检查文档有效性:
        • 如果任何一个文档的metadata'right'字段为False,则设置source = 2,表示来源文档有问题。
      • 检查文档URL:
        • 如果所有文档的metadata中存在'extra'字段且包含'url',则设置source = 3,表示所有来源文档都有有效的URL。
        • 去重处理:
          • 遍历source_document,根据文档的sourceurl进行去重,确保每个文档唯一。
          • 更新extra字典,记录去重后的文档信息。
      • 其他情况:
        • 设置source = 1,表示来源文档存在但不满足以上条件。
  4. 进一步验证文档溯源
    • 如果source == 1,则进一步检查每个文档的metadata是否包含有效的'bbox'信息,确保每个文档片段可溯源。
    • 如果发现任何文档片段缺失或无效的'bbox'信息,则重置source = 0
  5. 返回结果和源类型
    • 返回最终的source类型和处理后的result
源类型含义
  • source = 0: 无效或未识别的来源。
  • source = 1: 来源文档存在,但不满足所有要求。
  • source = 2: 来源文档有问题(如'right'字段为False)。
  • source = 3: 来源文档都有有效的URL,且已去重。
  • source = 4: 答案来源于已有的问答库。

4. process_source_document 函数

async def process_source_document(source_document: List[Document], chat_id, message_id, answer):if not source_document:return# 使用大模型进行关键词抽取,模型配置临时方案llm = LLMService.get_knowledge_source_llm()answer_keywords = extract_answer_keys(answer, llm)batch_insert = []for doc in source_document:if 'bbox' in doc.metadata:# 表示支持溯源content = doc.page_contentrecall_chunk = RecallChunk(chat_id=chat_id,keywords=json.dumps(answer_keywords),chunk=content,file_id=doc.metadata.get('file_id'),meta_data=json.dumps(doc.metadata),message_id=message_id)batch_insert.append(recall_chunk)if batch_insert:with session_getter() as db_session:db_session.add_all(batch_insert)db_session.commit()
参数
  • source_document (List[Document]): 来源文档的列表,用于记录和存储。
  • chat_id (str): 聊天会话的唯一标识符。
  • message_id (int): 消息的唯一标识符。
  • answer (str): 助手生成的回答内容,用于关键词提取。
功能
  1. 检查来源文档
    • 如果source_document为空或None,则直接返回,不做任何处理。
  2. 关键词提取
    • 调用LLMService.get_knowledge_source_llm()获取用于关键词提取的LLM(大型语言模型)。
    • 使用extract_answer_keys函数,从answer中提取关键词列表。
  3. 准备批量插入数据
    • 初始化一个空列表batch_insert,用于存储要插入数据库的RecallChunk对象。
    • 遍历source_document中的每个文档:
      • 如果文档的metadata中包含'bbox'字段,表示支持溯源。
      • 提取文档内容content
      • 创建一个RecallChunk对象,包含以下字段:
        • chat_id: 聊天会话的唯一标识符。
        • keywords: 提取到的关键词列表,序列化为JSON字符串。
        • chunk: 文档内容。
        • file_id: 文档的文件ID,从metadata中提取。
        • meta_data: 文档的元数据,序列化为JSON字符串。
        • message_id: 消息的唯一标识符。
      • 将创建的RecallChunk对象添加到batch_insert列表中。
  4. 批量插入数据库
    • 如果batch_insert列表不为空:
      • 使用session_getter上下文管理器获取数据库会话db_session
      • 调用db_session.add_all(batch_insert)将所有RecallChunk对象添加到会话中。
      • 调用db_session.commit()提交事务,将数据保存到数据库中。
说明
  • 关键词提取:使用LLM或jieba库从回答中提取关键词,有助于后续的文档召回和溯源。
  • 批量插入:通过批量插入提高数据库操作的效率,减少事务次数。

5. process_node_data 函数

def process_node_data(node_data: List[Dict]) -> Dict:tweak = {}for nd in node_data:if nd.get('id') not in tweak:tweak[nd.get('id')] = {}if 'InputFile' in nd.get('id', ''):file_path = nd.get('file_path')url_path = urlparse(file_path)if url_path.netloc:file_name = unquote(url_path.path.split('/')[-1])else:file_name = file_path.split('_', 1)[1] if '_' in file_path else ''nd['value'] = file_nametweak[nd.get('id')] = {'file_path': file_path, 'value': file_name}elif 'VariableNode' in nd.get('id', ''):# general key valuevariables = nd.get('name')variable_value = nd.get('value')# actual key varaialbes & variable_valuevariables_list = tweak[nd.get('id')].get('variables', [])if not variables_list:tweak[nd.get('id')]['variables'] = variables_listtweak[nd.get('id')]['variable_value'] = []variables_list.append(variables)# valuevariables_value_list = tweak[nd.get('id')].get('variable_value', [])variables_value_list.append(variable_value)return tweak
参数
  • node_data (List[Dict]): 节点数据的列表,每个节点数据为字典,包含idfile_pathnamevalue等字段。
功能
  1. 初始化调整字典
    • 创建一个空字典teak,用于存储处理后的节点数据。
  2. 遍历节点数据
    • 对于node_data列表中的每个节点字典nd:
      • 初始化节点调整信息:
        • 如果nd['id']不在teak中,则初始化为一个空字典。
      • 处理文件输入节点:
        • 如果节点的id包含’InputFile’:
          • 提取file_path字段。
          • 使用urlparse解析file_path,获取URL路径。
          • 根据URL路径是否包含网络位置(netloc)决定文件名的提取方式:
            • 如果包含网络位置,则解码URL路径的最后一部分作为文件名。
            • 否则,尝试从file_path中提取文件名,假设格式为'prefix_filename'
          • 更新节点的'value'字段为提取的file_name
          • 更新teak字典,记录file_pathfile_name
      • 处理变量节点:
        • 如果节点的id包含VariableNode
          • 提取name字段作为变量名称variables
          • 提取value字段作为变量值variable_value
          • 初始化变量列表variables_list和变量值列表variables_value_list:
            • 如果teak[nd['id']]['variables']不存在,则初始化为空列表。
            • 如果teak[nd['id']]['variable_value']不存在,则初始化为空列表。
          • 将变量名称添加到variables_list中。
          • 将变量值添加到variables_value_list中。
  3. 返回调整字典
    • 返回包含处理后的节点数据的teak字典。
说明
  • 文件输入节点:处理文件上传节点,提取并解码文件名,用于后续的文件处理和存储。
  • 变量节点:处理变量输入节点,提取变量名称和值,便于在后续流程中使用这些变量。

总结

主要职责

client.py中的这些函数主要负责以下几个方面:

  1. 处理和生成响应
    • process_graph: 处理聊天输入,通过LangChain对象生成响应和中间步骤。
    • extract_answer_keys: 从回答中提取关键词,辅助后续的文档召回和分析。
    • judge_source: 判断回答的来源,确保响应的可靠性和可追溯性。
  2. 记录和存储数据
    • process_source_document: 处理来源文档,将相关信息存储到数据库中,支持后续的召回和分析。
    • process_node_data: 处理节点数据,提取文件名和变量信息,为后续流程提供支持。
  3. 辅助功能
    • 提供流式选项设置、关键词提取的备用方案(如jieba库)、文档去重和元数据处理等功能,确保系统的健壮性和灵活性。

设计模式和架构

  • 异步编程:利用asyncio和异步函数确保高效的I/O操作和任务调度,适用于高并发的聊天应用。
  • 模块化设计:将不同的功能拆分为独立的函数和模块,增强代码的可维护性和可扩展性。
  • 日志记录:通过logger记录重要的操作和异常信息,便于调试和监控系统运行状态。
  • 错误处理:在关键步骤加入异常捕获和处理,确保系统在发生错误时能够优雅地处理并反馈给用户。

与其他组件的交互

  • LLMService:与大型语言模型服务交互,获取用于关键词提取和答案生成的LLM实例。
  • 数据库:通过session_getter获取数据库会话,存储和检索聊天历史、召回文档等数据。
  • LangChain:利用LangChain库的LLMChainDocument对象,处理复杂的对话逻辑和文档处理。
  • WebSocket:通过WebSocket连接与客户端进行实时通信,发送和接收消息。

功能流程

  1. 接收消息
    • 客户端通过WebSocket发送消息,handle_message方法接收并处理这些消息。
  2. 处理消息
    • 根据消息类型和工作类型,调用相应的处理函数(如GPTS助手)。
    • 通过process_graph函数生成响应,并记录中间步骤和来源信息。
  3. 生成响应
    • 使用LLM生成回答,提取关键词,并通过judge_source函数确定答案的来源。
    • 将结果存储到数据库中,并通过WebSocket将响应发送回客户端。
  4. 流式输出和异常处理
    • 管理流式输出内容,通过stream_queue确保实时响应。
    • 处理异常情况,确保系统的稳定性和用户体验。
关键字:企业公司网页设计方案_广州建设厅官网_线上培训机构排名前十_广告公司名字

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

责任编辑: