本文最后更新于61 天前,其中的信息可能已经过时,如有错误请发送邮件到184874483@qq.com
llm_service.py
#!/usr/bin/env python3
"""
LLM服务模块
提供统一的LLM调用接口,替代conversation_core.py中的get_response方法
"""
import logging
import sys
import os
from typing import Optional, Dict, Any, List
'''
类型提示(第8行):
Optional:表示这个变量可能有值,也可能是None(空)
Dict:字典类型(键值对)
Any:任意类型
List:列表类型
'''
# 添加项目根目录到Python路径
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from nagaagent_core.core import AsyncOpenAI
from nagaagent_core.api import FastAPI, HTTPException
from system.config import config
'''
自定义模块:
AsyncOpenAI:异步的OpenAI客户端(用来和AI对话)
FastAPI:创建Web API的工具
HTTPException:HTTP异常处理
config:配置文件,存储API密钥等设置
'''
# 配置日志
logger = logging.getLogger("LLMService")
class LLMService:
"""LLM服务类 - 提供统一的LLM调用接口"""
def __init__(self):
self.async_client: Optional[AsyncOpenAI] = None #创建了一个变量 async_client,初始值是None
self._initialize_client() #调用 _initialize_client() 方法来初始化客户端
def _initialize_client(self):
"""初始化OpenAI客户端"""
try:
self.async_client = AsyncOpenAI(
api_key=config.api.api_key,
base_url=config.api.base_url.rstrip('/') + '/'
)
logger.info("LLM服务客户端初始化成功")
except Exception as e:
logger.error(f"LLM服务客户端初始化失败: {e}")
self.async_client = None
'''
这个方法的逻辑:
尝试创建OpenAI客户端
如果成功:记录成功日志
如果失败:记录错误日志,把客户端设为None
'''
# ====== 4个主要的方法=======
'''
方法名 | 作用 | 特点
get_response | 获取AI的回复 | 最简单的,只发一个问题
chat_with_context | 带上下文的聊天 | 可以记住之前的对话
stream_chat_with_context | 流式聊天 | 一个字一个字地返回,像打字一样
is_available | 检查服务是否可用 | 返回True或False
'''
async def get_response(self, prompt: str, temperature: float = 0.7) -> str:
"""为其他模块提供API调用接口"""
#这是一个双重检查:
if not self.async_client: # 如果客户端不存在
self._initialize_client() # 尝试初始化
if not self.async_client: # 如果还是不存在
return f"LLM服务不可用: 客户端初始化失败" # 返回错误信息
try:
response = await self.async_client.chat.completions.create(
model=config.api.model,
messages=[{"role": "user", "content": prompt}],
temperature=temperature,
max_tokens=config.api.max_tokens
)
return response.choices[0].message.content
except RuntimeError as e:
if "handler is closed" in str(e): # 如果是连接关闭的错误
logger.debug(f"忽略连接关闭异常,重新创建客户端: {e}")
# 重新创建客户端并重试
self._initialize_client()
if self.async_client: # 如果重新创建成功
# 重新尝试请求
response = await self.async_client.chat.completions.create(
model=config.api.model,
messages=[{"role": "user", "content": prompt}],
temperature=temperature,
max_tokens=config.api.max_tokens
)
return response.choices[0].message.content
else: # 如果重新创建失败
return f"LLM服务不可用: 重连失败"
else:
logger.error(f"API调用失败: {e}")
return f"API调用出错: {str(e)}"
except Exception as e:
logger.error(f"API调用失败: {e}")
return f"API调用出错: {str(e)}"
def is_available(self) -> bool:
"""检查LLM服务是否可用"""
return self.async_client is not None
async def chat_with_context(self, messages: List[Dict], temperature: float = 0.7) -> str:
"""带上下文的聊天调用"""
if not self.async_client:
self._initialize_client()
if not self.async_client:
return f"LLM服务不可用: 客户端初始化失败"
try:
response = await self.async_client.chat.completions.create(
model=config.api.model,
messages=messages,
temperature=temperature,
max_tokens=config.api.max_tokens
)
return response.choices[0].message.content
except Exception as e:
logger.error(f"上下文聊天调用失败: {e}")
return f"聊天调用出错: {str(e)}"
#stream_chat_with_context:方法名,意思是"流式聊天带上下文"
#messages:参数1,聊天记录列表(包含之前的所有对话)
#temperature:参数2,温度值(控制AI回答的随机性),默认0.7
async def stream_chat_with_context(self, messages: List[Dict], temperature: float = 0.7):
"""带上下文的流式聊天调用"""
#检查客户端是否可用
if not self.async_client: #如果客户端不存在
self._initialize_client() #尝试初始化客户端
if not self.async_client: #再次检查:如果初始化后还是不存在
yield f"LLM服务不可用: 客户端初始化失败" #返回错误信息
return
try:
import aiohttp #导入HTTP客户端库
timeout = aiohttp.ClientTimeout(total=180, connect=60, sock_read=120)
async with aiohttp.ClientSession(timeout=timeout) as session: #创建一个HTTP会话,async with 会自动管理资源,结束时自动关闭
async with session.post( #发送POST请求到AI服务,请求地址是:基础URL/chat/completions
f"{config.api.base_url}/chat/completions",
headers={ #headers={...}:请求头,包含:
"Authorization": f"Bearer {config.api.api_key}", #认证信息(API密钥)
"Content-Type": "application/json", #内容类型是JSON
"Accept": "text/event-stream", #接受事件流格式(流式传输)
"Connection": "keep-alive" #保持连接
},
json={ #请求体,包含:
"model": config.api.model, #使用哪个AI模型
"messages": messages, #聊天记录
"temperature": temperature, #温度值
"max_tokens": config.api.max_tokens, #最大令牌数
"stream": True #开启流式传输
}
) as resp:
if resp.status != 200: #获取HTTP状态码(200表示成功),如果不是200,返回错误信息并结束
yield f"LLM API调用失败 (状态码: {resp.status})"
return
# ======处理流式响应==========
'''
代码层级 | 代码行 | 作用 | 解释
第1层 |async for chunk in resp.content.iter_chunked(1024): | 读取数据块 | 每次读取1024字节的数据
第2层 | if not chunk: break | 检查数据块 | 如果数据块为空,停止循环
第3层 | try: | 尝试解码 | 开始尝试处理数据
第4层 | data = chunk.decode('utf-8') | 解码数据 | 把字节数据转成字符串
第4层 | lines = data.split('\n') | 分割成行 | 按换行符分割
第5层 | for line in lines: | 遍历每一行 | 处理每一行数据
第6层 | line = line.strip() | 去除空格 | 去掉开头结尾的空格
第7层 | if line.startswith('data: '): | 检查数据行 | 只处理以"data: "开头的行
第8层 | data_str = line[6:] | 提取数据 | 去掉前6个字符("data: ")
第9层 | if data_str == '[DONE]': return | 检查结束标志 | 如果是"[DONE]",结束方法
第10层 | try: | 尝试解析JSON | 开始解析JSON数据
第11层 | import json | 导入json模块 | 用于解析JSON
第12层 | data = json.loads(data_str) | 解析JSON | 把字符串转成JSON对象
第13层 | if 'choices' in data and len(data['choices']) > 0: | 检查choices | 确保有choices字段且不为空
第14层 | delta = data['choices'][0].get('delta', {}) | 获取delta | 获取第一个choice的delta部分
第15层 | if 'content' in delta: | 检查是否有内容 | delta中是否有content字段
第16层 | import base64 | 导入base64模块| 用于base64编码
第17层 | content = delta['content'] | 获取内容 | 获取AI返回的文本内容
第18层 | b64 = base64.b64encode(content.encode('utf-8')).decode('ascii') | base64编码 | 把内容转成base64格式
第19层 | yield f"data: {b64}\n\n" | 返回数据 | 以特定格式返回编码后的内容
'''
async for chunk in resp.content.iter_chunked(1024):
if not chunk:
break
try:
data = chunk.decode('utf-8')
lines = data.split('\n')
for line in lines:
line = line.strip()
if line.startswith('data: '):
data_str = line[6:]
if data_str == '[DONE]':
return
try:
import json
data = json.loads(data_str)
if 'choices' in data and len(data['choices']) > 0:
delta = data['choices'][0].get('delta', {})
if 'content' in delta:
import base64
content = delta['content']
b64 = base64.b64encode(content.encode('utf-8')).decode('ascii')
yield f"data: {b64}\n\n"
#异常处理
except json.JSONDecodeError:
continue #如果JSON解析失败,跳过这行继续
except UnicodeDecodeError:
continue #如果字符串解码失败,跳过这个数据块继续
#整体异常处理
except Exception as e: #如果上面所有代码出现任何错误
logger.error(f"流式聊天调用失败: {e}") #记录错误日志
yield f"data: 流式调用出错: {str(e)}\n\n" #返回错误信息
# 全局LLM服务实例
_llm_service: Optional[LLMService] = None
def get_llm_service() -> LLMService:
"""获取全局LLM服务实例"""
global _llm_service
if _llm_service is None:
_llm_service = LLMService()
return _llm_service
'''
这里用了一个设计模式叫“单例模式”:
整个程序只有一个LLMService实例
如果还没有创建,就创建一个
如果已经创建了,就直接用那个
'''
# ========Web API接口 ==========
# 创建独立的LLM服务API
llm_app = FastAPI(
title="LLM Service API",
description="LLM服务API",
version="1.0.0"
)
@llm_app.post("/llm/chat")
async def llm_chat(request: Dict[str, Any]):
"""LLM聊天接口 - 为其他模块提供LLM调用服务"""
try:
prompt = request.get("prompt", "")
temperature = request.get("temperature", 0.7)
if not prompt:
raise HTTPException(status_code=400, detail="prompt参数不能为空")
llm_service = get_llm_service()
response = await llm_service.get_response(prompt, temperature)
return {
"status": "success",
"response": response,
"temperature": temperature
}
except Exception as e:
logger.error(f"LLM聊天接口异常: {e}")
raise HTTPException(status_code=500, detail=f"LLM服务异常: {str(e)}")
start_server.py
#!/usr/bin/env python3
"""
NagaAgent 服务启动脚本
服务启动脚本,就像是一个"总开关",负责启动整个系统。
支持启动API服务器和LLM服务
"""
import asyncio
import sys
import os
import argparse #命令行参数解析(让程序可以从命令行接收参数)
from pathlib import Path
# 添加项目根目录到Python路径
project_root = Path(__file__).parent.parent #.parent.parent:父目录的父目录(项目根目录)
sys.path.insert(0, str(project_root)) #insert(0, ...):插入到列表最前面(优先级最高)
#uvicorn:一个Web服务器,专门运行FastAPI应用,就像Apache或Nginx,但是专门为Python异步应用设计的
from nagaagent_core.api import uvicorn
async def start_api_server():
"""启动API服务器"""
#从apiserver.api_server模块导入app对象,这个app应该是一个FastAPI应用实例
from apiserver.api_server import app
# 从环境变量获取配置,回退到config
#os.getenv("变量名", "默认值"):从环境变量获取值
host = os.getenv("API_SERVER_HOST", "127.0.0.1")#如果环境变量API_SERVER_HOST存在,就用它的值
try:
from system.config import get_server_port #尝试:从system.config导入get_server_port函数
default_port = get_server_port("api_server") #如果导入成功:调用get_server_port("api_server")获取端口
except ImportError:
default_port = 8000 #如果导入失败(except ImportError):用默认端口8000
#从环境变量API_SERVER_PORT获取端口,如果不存在,用上面得到的default_port,int(...):把字符串转成整数
port = int(os.getenv("API_SERVER_PORT", str(default_port)))
#获取环境变量API_SERVER_RELOAD,默认值是字符串"False",.lower():转成小写(比如"False"→"false"),== "true":判断是否等于"true"
#结果是布尔值:True或False
reload = os.getenv("API_SERVER_RELOAD", "False").lower() == "true"
print(f"启动NagaAgent API服务器...")
print(f"地址: http://{host}:{port}")
print(f"文档: http://{host}:{port}/docs")
print(f"自动重载: {'开启' if reload else '关闭'}")
'''
这里用了f-string格式化:
f"地址: http://{host}:{port}":把host和port变量的值插入到字符串中
{'开启' if reload else '关闭'}:三元表达式
如果reload为True,显示"开启"
如果reload为False,显示"关闭"
'''
# 启动服务器
uvicorn.run(
"apiserver.api_server:app",
host=host,
port=port,
reload=reload, #是否开启自动重载(修改代码后自动重启)
log_level="info",
ws_ping_interval=None,
ws_ping_timeout=None
)
async def start_llm_service():
"""启动LLM服务"""
from apiserver.llm_service import llm_app
# 从环境变量获取配置,回退到config
host = os.getenv("LLM_SERVICE_HOST", "127.0.0.1")
try:
from system.config import get_server_port
#这里传的参数是"agent_server",不是"llm_service",这是配置文件中的命名
default_port = get_server_port("agent_server")
except ImportError:
default_port = 8001
port = int(os.getenv("LLM_SERVICE_PORT", str(default_port)))
reload = os.getenv("LLM_SERVICE_RELOAD", "False").lower() == "true"
print(f"启动LLM服务...")
print(f"地址: http://{host}:{port}")
print(f"文档: http://{host}:{port}/docs")
print(f"自动重载: {'开启' if reload else '关闭'}")
# 启动服务器
uvicorn.run(
"apiserver.llm_service:llm_app",
host=host,
port=port,
reload=reload,
log_level="info",
ws_ping_interval=None,
ws_ping_timeout=None
)
async def main():
"""主函数"""
#创建参数解析器
parser = argparse.ArgumentParser(description="NagaAgent 服务启动器")
parser.add_argument("service", choices=["api", "llm", "both"],
help="要启动的服务: api(API服务器), llm(LLM服务), both(两个都启动)")
#解析参数
args = parser.parse_args()
#根据参数执行不同的分支
#启动API服务器
if args.service == "api":
await start_api_server()
#启动LLM服务
elif args.service == "llm":
await start_llm_service()
#启动两个服务
elif args.service == "both":
print("启动所有服务...")
print("注意: 同时启动多个服务需要不同的端口配置")
try:
from system.config import get_server_port
api_port = get_server_port("api_server")
agent_port = get_server_port("agent_server")
print(f"API服务器: http://127.0.0.1:{api_port}")
print(f"LLM服务: http://127.0.0.1:{agent_port}")
except ImportError:
print("API服务器: http://127.0.0.1:8000")
print("LLM服务: http://127.0.0.1:8001")
# 这里可以实现同时启动多个服务的逻辑
# 目前先启动API服务器
await start_api_server()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt: #捕获键盘中断(比如按Ctrl+C)
print("\n收到停止信号,正在关闭服务...")
except Exception as e:
print(f"启动失败: {e}")
sys.exit(1)
# ========整个流程总结=======
'''
用户运行脚本 → 解析命令行参数 → 根据参数选择启动的服务
↓
api参数 → 启动API服务器
↓
llm参数 → 启动LLM服务
↓
both参数 → 显示信息,但只启动API服务器(有bug)
'''
# =======为什么在函数内部导入模块?=======
#1. 避免循环导入
'''
想象一下这样的场景:
文件A需要导入文件B的某个东西
文件B也需要导入文件A的某个东西
这样就会形成一个"死循环",Python不知道先加载哪个
例子:
# 文件A.py
import B # 导入B
def func_a():
return "A"
# 文件B.py
import A # 导入A
def func_b():
return "B"
这样两个文件互相导入,Python会报错。
解决方法:在函数内部导入
# 文件B.py
def func_b():
from A import func_a # 在函数内部导入
return "B"
'''
#2. 延迟加载,提高启动速度
'''
对比一下两种方式:
方式一:在顶部全部导入
import heavy_module1 # 这个模块很大,加载要5秒
import heavy_module2 # 这个模块也很大,加载要5秒
import heavy_module3 # 这个模块也很大,加载要5秒
def do_something():
# 实际只用了heavy_module1
return heavy_module1.func()
方式二:在函数内部按需导入
def do_something():
import heavy_module1 # 只有调用这个函数时才加载
return heavy_module1.func()
区别:
方式一:程序一启动就要加载所有模块(15秒)
方式二:只有调用函数时才加载需要的模块(5秒)
'''
#3. 条件导入(根据环境选择不同的模块)
'''
def process_image():
# 根据操作系统选择不同的图像处理库
import platform
system = platform.system()
if system == "Windows":
import windows_image_lib as image_lib
elif system == "Linux":
import linux_image_lib as image_lib
elif system == "Darwin": # macOS
import mac_image_lib as image_lib
else:
raise Exception("不支持的操作系统")
return image_lib.process()
'''
#4. 避免不必要的依赖
'''
假设你的程序有多个功能,但不是每个用户都需要所有功能:
def export_to_excel():
# 只有当用户需要导出Excel时才导入pandas
import pandas as pd
# ... 导出代码
def export_to_pdf():
# 只有当用户需要导出PDF时才导入reportlab
from reportlab.lib.pagesizes import letter
# ... 导出代码
'''
#具体到这个代码的分析
'''
在代码中:
async def start_api_server():
"""启动API服务器"""
from apiserver.api_server import app # ← 在这里导入
为什么这么做?
1.如果start_api_server()不被调用,apiserver.api_server模块就不会被加载
节省内存
加快程序启动
2.避免可能的循环导入
apiserver.api_server可能又导入了当前模块的某些东西
延迟导入可以打破循环
'''
#再看另一个例子:
'''
async def stream_chat_with_context(self, messages: List[Dict], temperature: float = 0.7):
try:
import aiohttp # ← 在这里导入
timeout = aiohttp.ClientTimeout(total=180, connect=60, sock_read=120)
# 后面还有
import json # ← 在try块里面导入
import base64 # ← 在try块里面导入
为什么这里也这么做?
1.aiohttp可能不是所有用户都需要的
只有使用流式聊天功能时才需要
如果用户从不使用这个功能,就不会安装aiohttp,程序也能正常运行
2.异常处理中导入json和base64
try:
import json
data = json.loads(data_str)
except json.JSONDecodeError:
continue
这其实是一个小问题:json是Python标准库,应该在最上面导入
这里可能是为了代码结构清晰,把相关的导入放在一起
'''
streaming_tool_extractor.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
流式文本切割器
负责将LLM流式输出按句切割并发送给语音集成(TTS)。不再检测或处理工具调用。
"""
import re
import json
import logging
import asyncio
import sys
import os
from typing import Callable, Optional, Dict, Any, Union, List #类型提示,帮助IDE和开发者理解代码类型
# 添加项目根目录到Python路径
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
#os.path.abspath(__file__):转为绝对路径,os.path.dirname(...):获取父目录(用两次得到项目根目录),sys.path.insert(0, ...):插入到Python搜索路径的最前面
try:
from system.config import config, AI_NAME # 导入配置系统
except ImportError:
# 如果直接导入失败,尝试从父目录导入
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from system.config import config, AI_NAME
# 工具调用解析/执行已不再需要
logger = logging.getLogger("StreamingToolCallExtractor")
class CallbackManager:
"""回调函数管理器 - 统一处理同步/异步回调"""
def __init__(self):
self.callbacks = {} #self.callbacks:存储回调函数
#self.callback_types:存储回调函数的类型(是同步还是异步)
self.callback_types = {} # 缓存回调函数类型
def register_callback(self, name: str, callback: Optional[Callable]): #name: str:回调函数的名字,callback: Optional[Callable]:回调函数本身,可以是None
"""注册回调函数"""
self.callbacks[name] = callback #把回调函数存到字典里
if callback: #如果回调函数不是None
# 缓存函数类型,避免重复检查
self.callback_types[name] = asyncio.iscoroutinefunction(callback) #如果回调函数不是None
else:
self.callback_types[name] = False
async def call_callback(self, name: str, *args, **kwargs):
#*args:可变位置参数(可以传任意数量的位置参数)
#**kwargs:可变关键字参数(可以传任意数量的关键字参数)
"""统一调用回调函数"""
callback = self.callbacks.get(name)
if not callback:
return None
try: #从缓存中获取这个函数是否是异步的,如果是异步的(返回True),就用await调用,如果是同步的(返回False),就直接调用
if self.callback_types.get(name, False):
# 异步回调
return await callback(*args, **kwargs)
else:
# 同步回调
return callback(*args, **kwargs)
except Exception as e:
logger.error(f"回调函数 {name} 执行错误: {e}")
return None
#为什么要这样区分?,异步函数必须用await调用,否则不会执行,同步函数不能用await调用,否则会报错,这个管理器统一处理了这两种情况
class StreamingToolCallExtractor:
"""流式文本切割器 - 实时按句切割并发送给TTS,支持工具调用处理"""
def __init__(self, mcp_manager=None):
self.mcp_manager = mcp_manager #MCP管理器(是其他模块的引用)
self.text_buffer = "" # 普通文本缓冲区,(临时存放正在处理的文本)
self.complete_text = "" # 完整文本内容,(存放所有处理过的文本)
self.sentence_endings = r"[。?!;\.\?\!\;]" # 断句标点
# 使用回调管理器
self.callback_manager = CallbackManager() #创建回调管理器实例
# 语音集成(可选)(初始为空)
self.voice_integration = None
# 工具调用功能已移除
self.tool_calls_queue = None
#设置回调函数
def set_callbacks(self,
on_text_chunk: Optional[Callable] = None,
voice_integration=None):
"""设置回调函数"""
# 注册回调函数(仅文本块)
self.callback_manager.register_callback("text_chunk", on_text_chunk) #注册一个名为"text_chunk"的回调函数
self.voice_integration = voice_integration #设置语音集成接口
async def process_text_chunk(self, text_chunk: str):
"""
处理文本块,实时按句切割并发送给语音集成
处理流程:
1. 累积完整文本(用于最终保存)
2. 逐字符检查句子结束符
3. 遇到结束符时立即切割并发送完整句子到TTS
4. 保留未完成的句子部分继续累积
"""
if not text_chunk:
return None
# 调用文本块回调,将文本发送到前端
results = [] #创建一个空列表results存放结果
#调用"text_chunk"回调函数,传入两个参数:text_chunk和字符串"chunk"
result = await self.callback_manager.call_callback("text_chunk", text_chunk, "chunk")
if result: #如果有返回值,添加到results列表
results.append(result)
# 累积完整文本(用于最终保存到数据库)
self.complete_text += text_chunk
#把收到的文本块添加到完整文本中,就像把收到的信件放到文件夹里保存
# =====这是最复杂的部分,我们用简单的方式解释=====
#逻辑流程:
'''
收到文本块 → 遍历每个字符 → 添加到缓冲区 → 检查是否是结束符
↓
如果是结束符 → 切割句子 → 发送完整句子给TTS → 保留剩余部分
↓
返回处理结果
详细分解:
1.for char in text_chunk::遍历文本块中的每个字符
2.self.text_buffer += char:把字符添加到缓冲区
3.if re.search(self.sentence_endings, char)::检查这个字符是不是句子结束符
使用正则表达式匹配:[。?!;\.\?\!\;]
包括中文和英文的句号、问号、感叹号、分号
4.如果遇到结束符:sentences = re.split(self.sentence_endings, self.text_buffer)
用结束符分割整个缓冲区
比如:"你好。今天天气" → ["你好", "今天天气"]
5.if len(sentences) > 1::如果分割出多个部分
complete_sentence = sentences[0] + char
第一个部分 + 结束符 = 完整句子
"你好" + "。" = "你好。"
6.if complete_sentence.strip()::如果句子不是空的
self._send_to_voice_integration(complete_sentence)
保留第一个完整句子之后的部分
比如:"今天天气"留在缓冲区等待更多字符
'''
# 实时按句切割并发送到TTS
for char in text_chunk:
self.text_buffer += char
# 检查是否遇到句子结束符(。?!;等)
if re.search(self.sentence_endings, char):
# 立即切割并发送完整句子到TTS
sentences = re.split(self.sentence_endings, self.text_buffer)
if len(sentences) > 1:
complete_sentence = sentences[0] + char
if complete_sentence.strip():
# 立即发送到语音集成进行TTS合成(不阻塞文本流)
self._send_to_voice_integration(complete_sentence)
# 保留未完成的句子部分,继续累积
remaining_sentences = [s for s in sentences[1:] if s.strip()]
self.text_buffer = "".join(remaining_sentences)
return results if results else None
#刷新文本缓冲区,当流式传输结束时,发送缓冲区中剩余的文本
async def _flush_text_buffer(self):
"""刷新文本缓冲区 - 处理流式结束时的剩余文本"""
if self.text_buffer:
# 立即发送剩余的未完成句子到语音集成(不阻塞)
self._send_to_voice_integration(self.text_buffer)
self.text_buffer = ""
return None
return None
#发送到语音集成
def _send_to_voice_integration(self, text: str):
"""发送文本到语音集成(不阻塞文本流)"""
if self.voice_integration:
try:
# 在独立线程中处理TTS,不阻塞文本流
import threading #导入线程模块
threading.Thread( #创建并启动一个新线程
target=self.voice_integration.receive_text_chunk, #线程要执行的函数
args=(text,), #传给函数的参数
daemon=True #设置为守护线程(主程序退出时自动结束)
).start()
except Exception as e:
logger.error(f"发送到语音集成失败: {e}")
#为什么用线程?TTS(文本转语音)可能需要较长时间,如果用主线程执行,会阻塞后续文本的处理,用新线程执行,可以"边说话边接收新文本"
# 工具调用相关方法已移除,功能已迁移到background_analyzer
#完成处理,结束处理流程,清理缓冲区
async def finish_processing(self):
"""完成处理,清理剩余内容"""
results = []
# 处理剩余的文本
if self.text_buffer:
result = await self._flush_text_buffer()
if result:
results.append(result)
return results if results else None
def get_complete_text(self) -> str:
"""获取完整文本内容"""
return self.complete_text
def reset(self):
"""重置提取器状态"""
self.text_buffer = ""
self.complete_text = ""
async def process_streaming_response(self, llm_service, messages: List[Dict],
temperature: float = 0.7, voice_integration=None):
'''
参数1:llm_service - LLM服务实例
参数2:messages - 聊天消息列表
参数3:temperature - 温度参数(默认0.7)
参数4:voice_integration - 语音集成接口
'''
"""处理流式响应,整合LLM调用和TTS处理,(核心集成方法)"""
if voice_integration:
self.voice_integration = voice_integration
async for chunk in llm_service.stream_chat_with_context(messages, temperature):
if chunk.startswith("data: "):
# 解码base64内容
try:
import base64
data_str = chunk[6:].strip() #去掉前6个字符"data: "
if data_str == '[DONE]':
break #如果是"[DONE]"表示流结束
decoded = base64.b64decode(data_str).decode('utf-8') #否则解码base64,处理文本
await self.process_text_chunk(decoded)
except Exception as e:
logger.error(f"处理流式响应块失败: {e}")
continue
else:
# 直接处理文本内容
await self.process_text_chunk(chunk)
# 完成处理
await self.finish_processing()
return self.get_complete_text()
'''
开始处理流式响应
↓
遍历每个数据块
↓
↓---是base64格式---解码---处理文本
↓
↓---是普通文本---直接处理文本
↓
切割句子,发送给TTS
↓
累积到完整文本
↓
流结束→清理缓冲区→返回完整文本
'''
# ======为什么有异步函数还要用多线程?=========
#Python并发编程中的一个重要话题
#简单回答:异步和多线程解决不同的问题,有时候需要组合使用。
'''
详细对比:
特性 | 异步 (asyncio) | 多线程 (threading)
核心思想 | 单线程并发,任务切换 | 多线程并行,同时运行
适合场景 | I/O密集型任务(网络、文件) | I/O密集型,某些CPU密集型
资源消耗 | 轻量(一个线程) | 较重(每个线程有独立栈)
编程难度 | 中等(需要理解async/await)| 较高(需要处理线程安全)
Python限制| 受GIL影响较小 | 受GIL限制(CPU密集型不高效)
为什么在这个代码中要同时使用?
看这个代码片段:
def _send_to_voice_integration(self, text: str):
if self.voice_integration:
try:
# 在独立线程中处理TTS,不阻塞文本流
import threading
threading.Thread(
target=self.voice_integration.receive_text_chunk,
args=(text,),
daemon=True
).start()
except Exception as e:
logger.error(f"发送到语音集成失败: {e}")
可能的原因:
原因1:TTS库可能是同步的,(很多语音合成库是同步的)
# 假设TTS库是这样的(不支持异步)
class 同步TTS库:
def receive_text_chunk(self, text):
# 这是一个同步方法,可能需要几秒钟
time.sleep(3) # 模拟耗时操作
print(f"播放: {text}")
如果在异步函数中直接调用同步的TTS:
async def process_text_chunk(self, text):
# 如果直接调用同步TTS
self.voice_integration.receive_text_chunk(text) # 这会阻塞3秒!
# 在这3秒内,整个异步事件循环都被阻塞了
原因2:避免阻塞事件循环
异步编程有一个黄金法则:不要阻塞事件循环
\# 不好的做法:在异步函数中调用耗时同步函数
async def bad_example():
await 异步操作1()
耗时同步函数() # 这会阻塞整个事件循环!
await 异步操作2() # 要等上面完成才能执行
原因3:TTS是CPU密集型(生成语音需要大量计算)
# 好的做法:把耗时同步函数放到线程中
async def good_example():
await 异步操作1()
await asyncio.to_thread(耗时同步函数) # 在后台线程执行
await 异步操作2() # 可以立即执行,不用等待
异步和多线程的配合在这个具体代码中的应用:
async def process_streaming_response(self, ...):
# 异步接收AI的流式响应
async for chunk in llm_service.stream_chat_with_context(...):
# 异步处理文本
await self.process_text_chunk(chunk)
# 文本处理是异步的,但TTS是同步的
# 所以TTS放在线程中,不阻塞文本接收
'''
# ========GIL是什么?(全局解释器锁)========
'''
简单理解:
想象Python解释器是一个只能一个人进去的厕所。
厕所 = Python解释器
锁 = GIL(门锁)
规则:每次只能有一个人进去上厕所(执行Python代码)
为什么要有这个锁?
因为Python设计得比较早......6
后果是即使你有多个线程,但因为有GIL,一次还是只能有一行Python代码。
当然也是有例外的:当线程在等待时(比如等待网络数据),它会暂时把锁让出来,让别人进去。
'''
#多线程受GIL限制,异步需要Python 3.5+
#怎么绕过GIL?
'''
用多进程(Multiprocessing)
每个进程有自己的Python解释器
每个进程有自己的GIL
可以真正并行
用异步(Asyncio)
一个线程内切换任务
适合I/O密集型
用其他语言写的扩展
比如用C写的numpy,在执行C代码时释放GIL
'''
# =========什么是I/O密集型和CPU密集型?==========
'''
I/O密集型(Input/Output):
大部分时间在等待,比如:
从网络下载文件
从数据库读取数据
从硬盘读取文件
等待用户输入
特点:CPU大部分时间在等,没事做
CPU密集型:
大部分时间在计算,比如:
视频编码/解码
图像处理
科学计算
加密解密
特点:CPU一直很忙
'''
# =====协程(Coroutine)======
#从零开始解释什么是协程?
#第一步:先理解函数
#普通函数就像一个一次性任务清单:
'''
def 做早饭():
1. 煎蛋
2. 烤面包
3. 倒牛奶
return "早餐完成" # 结束了,不能中途暂停
'''
#特点:开始执行 → 一直执行到结束 → 返回结果,不能中途暂停,不能恢复
#第二步:理解生成器(Generator)
#协程的前身是生成器。生成器就像一个可暂停的任务清单:
'''
def 做复杂的早饭():
print("开始煎蛋")
yield "煎蛋完成" # 暂停在这里,返回结果
print("开始烤面包")
yield "烤面包完成" # 暂停在这里,返回结果
print("开始倒牛奶")
yield "早餐完成" # 最后暂停
# 使用
早餐机 = 做复杂的早饭()
print(next(早餐机)) # 输出:开始煎蛋 \n 煎蛋完成
print(next(早餐机)) # 输出:开始烤面包 \n 烤面包完成
print(next(早餐机)) # 输出:开始倒牛奶 \n 早餐完成
'''
#生成器的特点:可以暂停(yield),可以恢复(next()),可以传值进来(通过send())
#第三步:协程是升级版的生成器
#协程就像一个可以双向通信的智能任务清单:
'''
async def 智能做早饭():
材料 = await 获取材料() # 等待别人送材料来
print(f"用{材料}煎蛋")
温度 = await 检查温度() # 等待温度计读数
print(f"在{温度}度下烤面包")
return "早餐完成"
'''
#协程的特点:可以暂停(await),可以恢复(自动),可以和其他协程协作(通过事件循环)
# =======Python中的协程进化史=========
'''
Python 2.x: 只有生成器(yield) ← 可以暂停,但不好用
Python 3.3: yield from ← 改进生成器
Python 3.4: @asyncio.coroutine ← 最早的协程装饰器
Python 3.5: async/await ← 现在的方式(最简单)
'''
#协程的具体工作原理
#协程的三个状态:准备中(CREATED):协程对象创建了,但还没开始,运行中(RUNNING):正在执行,暂停中(SUSPENDED):遇到await暂停了
#协程的生命周期:
'''
创建协程 → 放入事件循环 → 开始执行 → 遇到await暂停
↓
事件循环执行其他任务
↓
await的条件满足了
↓
恢复执行 → 完成/异常
'''
#协程的核心:事件循环(Event Loop)
'''
事件循环就像一个餐厅经理:
管理所有协程(厨师)
哪个协程可以执行了(哪个锅可以炒了)
哪个协程需要等待(哪个菜还在切)
'''
#事件循环的工作流程:
'''
开始
↓
检查协程队列
↓
有就绪的协程吗? → 有 → 执行它
↓ 没有
等待事件发生(I/O完成、定时器到点)
↓
事件发生 → 对应的协程就绪
↓
继续检查...
'''
# =======协程怎么那么像异步啊?=========
#协程就是异步,或者说协程是实现异步的一种具体技术.......6
#异步 是一个 目标(想要同时做多件事),协程 是 实现这个目标的工具(具体怎么做)\
#就像:我要从北京到上海(异步:想更快到达),我可以选择坐高铁、飞机、开车(协程:具体的方法)
#更准确的层次关系:
'''
编程模式层面:
异步编程(Asynchronous Programming)
↓
具体实现层面:
Python:协程(Coroutine)← 这是Python实现异步的方式
JavaScript:Promise/async-await ← 这是JS实现异步的方式
Go:Goroutine ← 这是Go实现异步的方式
'''
#为什么会有这种混淆?
#因为不同语言用不同的词,但说的是一回事:由篇幅限制,这里就不再过多赘述了。但核心思想都一样:让程序在等待时可以做其他事。
#所以到这需要对前面的观点进行一些修复,画一个更准确的关系图:
'''
异步编程(概念)
│
├── 实现方式1:多线程/多进程
│
├── 实现方式2:事件驱动
│
└── 实现方式3:协程(Coroutine)← Python的选择
│
├── 实现方式:async/await语法
├── 运行时:asyncio事件循环
└── 特点:轻量级、单线程内并发
'''
#为什么Python选择协程?
'''
历史原因:
早期Python:有GIL限制,多线程效果不好
Python 3.4:引入asyncio库,用协程实现异步
Python 3.5:引入async/await语法,让协程更容易写
'''
#所以换一个角度理解:异步 是 "什么"(What):我想要我的程序不要傻等,我想要同时处理多个请求,我想要高效利用CPU时间
#协程 是 "怎么实现"(How):Python说:我们用async/await语法,我们写协程函数,我们用asyncio事件循环来调度
#到时候我们进行最后一步澄清:协程 ≠ 异步,更准确地说:协程是实现异步的一种具体技术手段。
#就像:飞机 ≠ 快速出行(飞机是实现快速出行的一种方式),协程 ≠ 异步编程(协程是实现异步编程的一种方式)
message_manager.py
#!/usr/bin/env python3
"""
统一的消息管理模块
支持多会话、多agent的消息存储和拼接
"""
# 异步编程工具(让程序可以同时做多件事情)
import asyncio
# 生成唯一ID的工具(像身份证号码生成器)
import uuid
# 日志记录工具(像日记本,记录程序做了什么)
import logging
# 正则表达式工具(文本搜索和匹配工具)
import re
# JSON数据处理工具(处理像{"name":"小明"}这样的数据格式)
import json
# 系统工具(访问命令行参数等)
import sys
# 时间工具
import time
# 类型提示(只是给程序员看的,告诉别人参数应该是什么类型)
from typing import Dict, List, Optional, Any
# 日期时间工具
from datetime import datetime, timedelta
# 路径处理工具(更方便地处理文件路径)
from pathlib import Path
logger = logging.getLogger(__name__)
# 工具函数
def now():
"""获取当前时间戳"""
return time.strftime('%H:%M:%S:') + str(int(time.time() * 1000) % 10000)
def setup_logging():
"""统一配置日志系统"""
try:
# 尝试从配置文件读取设置
from system.config import config
log_level = getattr(logging, config.system.log_level.upper(), logging.INFO)
# 配置日志的基本设置
logging.basicConfig(
level=log_level, # 日志级别
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', # 日志格式
handlers=[logging.StreamHandler(sys.stderr)] # 输出到错误流
)
# 设置第三方库日志级别
for logger_name in ["httpcore.connection", "httpcore.http11", "httpx", "openai._base_client", "asyncio"]:
logging.getLogger(logger_name).setLevel(logging.WARNING)
except ImportError:
# 如果无法导入配置,使用默认设置
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler(sys.stderr)]
)
class MessageManager:
"""统一的消息管理器"""
def __init__(self):
self.sessions: Dict[str, Dict] = {}
# 分析状态跟踪,防止重复执行
self.analysis_in_progress: Dict[str, bool] = {}
# 从配置文件读取最大历史轮数,默认为10轮
try:
from system.config import config
self.max_history_rounds = config.api.max_history_rounds # 最大对话轮数
self.max_messages_per_session = self.max_history_rounds * 2 # 每轮对话包含用户和助手各一条消息
self.persistent_context = config.api.persistent_context # 是否保存对话历史
self.context_load_days = config.api.context_load_days # 加载多少天的历史
self.log_dir = config.system.log_dir # 日志保存目录
self.ai_name = config.system.ai_name # AI的名字
except ImportError:
# 如果找不到配置,就用默认值
self.max_history_rounds = 10
self.max_messages_per_session = 20 # 默认20条消息
self.persistent_context = True
self.context_load_days = 3
self.log_dir = Path("logs")
self.ai_name = "娜迦"
logger.warning("无法导入配置,使用默认历史轮数设置")
def generate_session_id(self) -> str:
"""生成唯一的会话ID"""
return str(uuid.uuid4())
def create_session(self, session_id: Optional[str] = None) -> str:
"""获取或创建会话"""
if not session_id:
session_id = self.generate_session_id()
# 检查会话是否已存在
if session_id in self.sessions:
logger.debug(f"使用现有会话: {session_id}")
# 更新最后活动时间
self.sessions[session_id]["last_activity"] = asyncio.get_event_loop().time()
return session_id
# 初始化新会话
self.sessions[session_id] = {
"created_at": asyncio.get_event_loop().time(),
"messages": [], # 消息列表(空的)
"agent_type": "default", # 可以扩展支持不同agent类型
"last_activity": asyncio.get_event_loop().time() # 最后活动时间
}
'''
逻辑判断:
如果 没有提供session_id 就 生成一个新的
如果 session_id已经在sessions字典里 就 更新最后活动时间并返回
否则 创建新房间,记录创建时间、空的消息列表等
如果 persistent_context是True 就 尝试加载历史对话
'''
# 如果启用持久化上下文,尝试加载历史对话
if self.persistent_context:
self._load_persistent_context_for_session(session_id)
logger.info(f"创建新会话: {session_id}")
return session_id
def _load_persistent_context_for_session(self, session_id: str):
"""为指定会话加载持久化上下文"""
try:
# 加载历史对话
recent_messages = self.load_recent_context(
days=self.context_load_days,
max_messages=self.max_messages_per_session
)
if recent_messages:
self.sessions[session_id]["messages"] = recent_messages
logger.info(f"会话 {session_id} 加载了 {len(recent_messages)} 条历史对话")
else:
logger.debug(f"会话 {session_id} 未找到历史对话记录")
except Exception as e:
logger.warning(f"为会话 {session_id} 加载持久化上下文失败: {e}")
def get_session(self, session_id: str) -> Optional[Dict]:
"""获取会话信息"""
return self.sessions.get(session_id)
def add_message(self, session_id: str, role: str, content: str) -> bool:
"""向会话添加消息"""
if session_id not in self.sessions:
logger.warning(f"会话不存在: {session_id}")
return False
session = self.sessions[session_id]
session["messages"].append({"role": role, "content": content}) # 添加消息
session["last_activity"] = asyncio.get_event_loop().time() # 更新活动时间
# 限制消息数量,如果消息太多了,就删除最早的消息(只保留最近的)
if len(session["messages"]) > self.max_messages_per_session:
session["messages"] = session["messages"][-self.max_messages_per_session:]
logger.debug(f"会话 {session_id} 添加消息: {role} - {content[:50]}...")
return True
def get_messages(self, session_id: str) -> List[Dict]:
"""获取会话的所有消息"""
session = self.sessions.get(session_id) # :在消息字典里查找这个消息,如果找到消息:返回session["messages"](这个字典里的所有对话)
return session["messages"] if session else [] # 如果没找到,返回[](空列表)
def get_recent_messages(self, session_id: str, count: Optional[int] = None) -> List[Dict]:
"""获取会话的最近消息"""
if count is None: # # 如果没有指定要多少条
count = self.max_messages_per_session # 就用默认的最大消息数
messages = self.get_messages(session_id) # 先获取所有消息
# session_id:房间号,count:想要多少条最近的消息(可选,不填就用默认值)
return messages[-count:] if messages else []
# 这是最重要的功能之一
def build_conversation_messages(self, session_id: str, system_prompt: str,
current_message: str, include_history: bool = True) -> List[Dict]:
"""构建完整的对话消息列表"""
messages = [] # 准备一个空的消息包
# 添加当前时间信息到系统提示词
from datetime import datetime
current_time = datetime.now() # 获取当前时间
# 制作时间信息字符串
time_info = f"\n\n【当前时间信息】\n当前日期:{current_time.strftime('%Y年%m月%d日')}\n当前时间:{current_time.strftime('%H:%M:%S')}\n当前星期:{current_time.strftime('%A')}\n"
enhanced_system_prompt = system_prompt + time_info # 把时间信息加到系统提示词后面
# 添加系统提示词(这是第一条消息)
messages.append({"role": "system", "content": enhanced_system_prompt})
# 添加历史对话 (如果需要的话)
if include_history: # 默认是True,包含历史
recent_messages = self.get_recent_messages(session_id) # 获取最近的历史对话
messages.extend(recent_messages) # 把历史对话加到消息包里
# 添加当前用户消息 (这是最后一条消息)
messages.append({"role": "user", "content": current_message})
return messages # 返回完整的消息包
def build_conversation_messages_from_memory(self, memory_messages: List[Dict], system_prompt: str,
current_message: str, max_history_rounds: int = None) -> List[Dict]:
"""
从内存消息列表构建对话消息(用于UI界面)
Args:
memory_messages: 内存中的消息列表
system_prompt: 系统提示词
current_message: 当前用户消息
max_history_rounds: 最大历史轮数,默认使用配置值
Returns:
List[Dict]: 完整的对话消息列表
"""
messages = []
# 添加当前时间信息到系统提示词
from datetime import datetime
current_time = datetime.now()
time_info = f"\n\n【当前时间信息】\n当前日期:{current_time.strftime('%Y年%m月%d日')}\n当前时间:{current_time.strftime('%H:%M:%S')}\n当前星期:{current_time.strftime('%A')}\n"
enhanced_system_prompt = system_prompt + time_info
# 添加系统提示词
messages.append({"role": "system", "content": enhanced_system_prompt})
# 计算最大消息数量
if max_history_rounds is None:
max_history_rounds = self.max_history_rounds
max_messages = max_history_rounds * 2 # 每轮对话包含用户和助手各一条消息
# 添加历史对话(限制数量)
if memory_messages:
recent_messages = memory_messages[-max_messages:]
messages.extend(recent_messages)
# 添加当前用户消息
messages.append({"role": "user", "content": current_message})
return messages
def get_session_info(self, session_id: str) -> Optional[Dict]:
"""获取会话详细信息"""
session = self.sessions.get(session_id)
if not session:
return None
return {
"session_id": session_id,
"created_at": session["created_at"],
"last_activity": session["last_activity"],
"message_count": len(session["messages"]),
"conversation_rounds": len(session["messages"]) // 2, # 有趣的计算:len(session["messages"]) // 2:用整数除法计算轮数
"agent_type": session["agent_type"],
"max_history_rounds": self.max_history_rounds, # 添加最大历史轮数信息
"last_message": session["messages"][-1]["content"][:100] + "..." if session["messages"] else "无对话历史"
}
def get_all_sessions_info(self) -> Dict[str, Dict]:
"""获取所有会话信息"""
sessions_info = {} # 创建一个空字典来存放所有房间信息
# 遍历self.sessions字典中的每个房间
for session_id, session in self.sessions.items():
# 对每个房间,调用get_session_info获取详细信息
sessions_info[session_id] = self.get_session_info(session_id)
return sessions_info # 返回包含所有房间信息的字典
def delete_session(self, session_id: str) -> bool:
"""删除指定会话"""
if session_id in self.sessions: # 如果房间存在
del self.sessions[session_id]
删除这个房间
logger.info(f"删除会话: {session_id}")
return True
return False
def clear_all_sessions(self) -> int:
"""清空所有会话"""
count = len(self.sessions) # 先统计有多少个房间
self.sessions.clear() # 清空整个字典
logger.info(f"清空所有会话,共 {count} 个") # 记录清空了多少个
return count # 返回清空的数量
# 这是非常重要的功能,防止内存被不活动的房间占满:
def cleanup_old_sessions(self, max_age_hours: int = 24) -> int:
"""清理过期会话"""
current_time = asyncio.get_event_loop().time() # 获取当前时间
expired_sessions = [] # 创建一个列表来存放过期的房间号
# 第一遍:找出所有过期的房间
for session_id, session in self.sessions.items():
# 计算房间多久没活动了(当前时间 - 最后活动时间)
# 如果超过max_age_hours小时(默认24小时),就标记为过期
if current_time - session["last_activity"] > max_age_hours * 3600:
expired_sessions.append(session_id) # 添加到过期列表
# 第二遍:删除所有过期的房间
for session_id in expired_sessions:
del self.sessions[session_id] # 从字典中删除
# 如果有清理过期的房间,就记录日志
if expired_sessions:
logger.info(f"清理了 {len(expired_sessions)} 个过期会话")
return len(expired_sessions) # 返回清理了多少个
def set_agent_type(self, session_id: str, agent_type: str) -> bool:
"""设置会话的agent类型"""
if session_id in self.sessions: # 如果房间存在
self.sessions[session_id]["agent_type"] = agent_type # 设置类型
return True # 设置成功
return False # 房间不存在,设置失败
def get_agent_type(self, session_id: str) -> Optional[str]:
"""获取会话的agent类型"""
session = self.sessions.get(session_id) # 先找到房间
return session["agent_type"] if session else None # 如果有房间就返回类型,否则返回None
# ========== 日志解析功能 ==========
def _parse_log_line(self, line: str) -> Optional[tuple]:
"""
解析单行日志内容
Args:
line: 日志行内容
Returns:
tuple: (role, content) 或 None
"""
line = line.strip() # 1. 去掉首尾空白字符
if not line: # 2. 如果是空行
return None # 返回None
# 3.匹配格式:[时间] 用户: 内容 或 [时间] AI名称: 内容
pattern = r'^\[(\d{2}:\d{2}:\d{2})\] (用户|' + re.escape(self.ai_name) + r'): (.+)$'
match = re.match(pattern, line) # 4. 尝试匹配
if match: # 5. 如果匹配成功
time_str, speaker, content = match.groups() # 6. 提取三部分
if speaker == "用户": # 7. 判断说话者
role = "user" # 用户说的话
else:
role = "assistant" # AI说的话
return (role, content.strip()) # 8. 返回角色和内容
return None # 9. 匹配失败,返回None
def _is_message_start_line(self, line: str) -> bool:
"""
判断是否为消息开始行
Args:
line: 日志行内容
Returns:
bool: 是否为消息开始行
"""
line = line.strip() # strip():去掉字符串开头和结尾的空白字符(空格、制表符、换行符)
if not line: # not line:如果line是空字符串(长度为0)
return False # 空行就直接返回None,不处理
# 匹配格式:[时间] 用户: 或 [时间] AI名称:
pattern = r'^\[(\d{2}:\d{2}:\d{2})\] (用户|' + re.escape(self.ai_name) + r'):'
return bool(re.match(pattern, line))
def parse_log_file(self, log_file_path: str) -> List[Dict]:
"""
解析单个日志文件,提取对话内容
按照日志记录代码的格式:每轮对话包含用户消息和AI回复,用50个-分隔
Args:
log_file_path: 日志文件路径
Returns:
List[Dict]: 对话消息列表,格式为[{"role": "user/assistant", "content": "内容"}]
"""
messages = [] # 1. 准备空列表存放结果
try:
# 2. 打开并读取文件
with open(log_file_path, 'r', encoding='utf-8') as f:
content = f.read()
# 以50个-分割对话轮次(按照日志记录代码的格式)
conversation_blocks = content.split('-' * 50)
for block in conversation_blocks:
block = block.strip() # 去掉空白
if not block: # 跳过空块
continue
# 解析每个对话块中的消息
block_messages = self._parse_conversation_block(block)
messages.extend(block_messages) # 6. 添加到结果列表
except FileNotFoundError:
logger.debug(f"日志文件不存在: {log_file_path}")
except Exception as e:
logger.error(f"解析日志文件失败 {log_file_path}: {e}")
return messages # 7. 返回所有消息
def _parse_conversation_block(self, block: str) -> List[Dict]:
"""
解析单个对话块,提取其中的所有消息
每块包含用户消息和AI回复,支持多行内容
Args:
block: 对话块内容
Returns:
List[Dict]: 消息列表
"""
messages = [] # 1. 存放解析出的消息
lines = block.split('\n') # 2. 按换行符分割成多行
current_message = None # 3. 当前正在处理的消息(None表示没有)
current_content_lines = [] # 4. 当前消息的内容行列表
# 5. 逐行处理
for line in lines:
line = line.rstrip('\n\r') # 6.移除行尾换行符,但保留内容中的换行
# 7.检查是否为消息开始行
if self._is_message_start_line(line):
# 8.保存前一个消息
if current_message is not None and current_content_lines:
content = '\n'.join(current_content_lines) # 合并内容行
messages.append({
"role": current_message["role"],
"content": content
})
# 9.开始新消息
result = self._parse_log_line(line) # 解析这一行
if result: # 如果解析成功
role, content = result
current_message = {"role": role} # 记录角色
current_content_lines = [content] if content else [] # 初始化内容
else: # 解析失败
current_message = None
current_content_lines = []
# 10.如果当前有活跃消息,且不是消息开始行,则作为内容行处理
elif current_message is not None:
# 跳过分隔线和空行
if line.strip() and not line.strip().startswith('---') and not line.strip().startswith('--'):
current_content_lines.append(line # 添加到当前消息内容
# 11.保存最后一个消息(循环结束后)
if current_message is not None and current_content_lines:
content = '\n'.join(current_content_lines)
messages.append({
"role": current_message["role"],
"content": content
})
return messages
def get_log_files_by_date(self, days: int = 3) -> List[str]:
"""
获取最近几天的日志文件路径
Args:
days: 要获取的天数
Returns:
List[str]: 日志文件路径列表,按日期倒序排列
"""
log_files = []
today = datetime.now()
for i in range(days):
date = today - timedelta(days=i)
date_str = date.strftime('%Y-%m-%d')
log_file = self.log_dir / f"{date_str}.log"
if log_file.exists():
log_files.append(str(log_file))
# 按日期倒序排列(最新的在前)
log_files.reverse()
return log_files
def load_recent_context(self, days: int = 3, max_messages: int = None) -> List[Dict]:
"""
加载最近几天的对话上下文
Args:
days: 要加载的天数
max_messages: 最大消息数量限制
Returns:
List[Dict]: 对话消息列表
"""
all_messages = []
log_files = self.get_log_files_by_date(days)
logger.info(f"开始加载最近 {days} 天的日志文件: {log_files}")
for log_file in log_files:
messages = self.parse_log_file(log_file)
all_messages.extend(messages)
logger.debug(f"从 {log_file} 加载了 {len(messages)} 条消息")
# 限制消息数量
if max_messages and len(all_messages) > max_messages:
all_messages = all_messages[-max_messages:] # 7. 只保留最近的消息
logger.info(f"限制消息数量为 {max_messages} 条")
logger.info(f"总共加载了 {len(all_messages)} 条历史对话")
return all_messages
def get_context_statistics(self, days: int = 7) -> Dict:
"""
获取上下文统计信息
Args:
days: 统计天数
Returns:
Dict: 统计信息
"""
log_files = self.get_log_files_by_date(days)
total_messages = 0 # 2. 初始化计数器
user_messages = 0
assistant_messages = 0
# 3. 遍历每个文件
for log_file in log_files:
messages = self.parse_log_file(log_file) # 4. 解析文件
total_messages += len(messages) # 5. 累加总消息数
# 6. 统计用户和AI消息数量
for msg in messages:
if msg["role"] == "user":
user_messages += 1
else:
assistant_messages += 1
return {
"total_files": len(log_files), # 文件数量
"total_messages": total_messages, # 总消息数
"user_messages": user_messages, # 用户消息数
"assistant_messages": assistant_messages, # AI消息数
"days_covered": days # 统计天数
}
def save_conversation_log(self, user_message: str, assistant_message: str, dev_mode: bool = False):
"""
保存对话日志到文件
Args:
user_message: 用户消息
assistant_message: 助手回复
dev_mode: 是否为开发者模式(开发者模式不保存日志)
"""
if dev_mode:
return # 开发者模式不写日志
try:
from datetime import datetime
import os
# 获取当前时间
now = datetime.now()
date_str = now.strftime('%Y-%m-%d')
time_str = now.strftime('%H:%M:%S')
# 确保日志目录存在
log_dir = str(self.log_dir)
if not os.path.exists(log_dir):
os.makedirs(log_dir, exist_ok=True) # 创建目录(如果不存在)
logger.info(f"已创建日志目录: {log_dir}")
# 保存对话日志
log_file = os.path.join(log_dir, f"{date_str}.log")
with open(log_file, 'a', encoding='utf-8') as f:
f.write(f"[{time_str}] 用户: {user_message}\n") # 用户消息
f.write(f"[{time_str}] {self.ai_name}: {assistant_message}\n") # AI回复
f.write("-" * 50 + "\n") # 分隔线
logger.debug(f"已保存对话日志到: {log_file}")
except Exception as e:
logger.error(f"保存对话日志失败: {e}")
def save_conversation_and_logs(self, session_id: str, user_message: str, assistant_response: str):
"""统一保存对话历史与日志 - 整合重复逻辑"""
try:
# 保存对话历史到消息管理器
self.add_message(session_id, "user", user_message)
self.add_message(session_id, "assistant", assistant_response)
# 保存对话日志到文件
self.save_conversation_log(
user_message,
assistant_response,
dev_mode=False # 开发者模式已禁用
)
except Exception as e:
logger.error(f"保存对话与日志失败: {e}")
def trigger_background_analysis(self, session_id: str):
"""统一触发后台意图分析 - 整合重复逻辑"""
try:
# 检查是否已经有分析在进行中
# 使用analysis_in_progress字典防止重复分析:
if self.analysis_in_progress.get(session_id): # 检查是否已在分析中
logger.info(f"[博弈论] 会话 {session_id} 已有意图分析在进行中,跳过重复触发")
return
# 标记分析开始
self.analysis_in_progress[session_id] = True
import asyncio
from system.background_analyzer import get_background_analyzer
from system.config import config
background_analyzer = get_background_analyzer()
# 根据配置获取意图分析轮数,默认3轮
intent_rounds = getattr(config.api, 'intent_analysis_rounds', 3)
max_messages = intent_rounds * 2 # 每轮包含用户和助手各一条消息
recent_messages = self.get_recent_messages(session_id, count=max_messages)
logger.info(f"[博弈论] 分析最近 {intent_rounds} 轮对话,共 {len(recent_messages)} 条消息")
# 异步执行分析任务
async def _execute_analysis():
try:
await background_analyzer.analyze_intent_async(recent_messages, session_id)
finally:
# 无论成功与否,都清除分析状态
self.analysis_in_progress[session_id] = False
logger.info(f"[博弈论] 会话 {session_id} 意图分析完成,状态已清除")
asyncio.create_task(_execute_analysis())
except Exception as e:
# 发生异常时也要清除分析状态
self.analysis_in_progress[session_id] = False
logger.error(f"后台意图分析触发失败: {e}")
def get_all_sessions_api(self):
"""获取所有会话信息 - API接口"""
try:
# 清理过期会话
self.cleanup_old_sessions()
# 获取所有会话信息
sessions_info = self.get_all_sessions_info()
return {
"status": "success", # 状态
"sessions": sessions_info, # 会话信息
"total_sessions": len(sessions_info) # 会话总数
}
except Exception as e:
logger.error(f"获取会话信息错误: {e}")
raise Exception(f"获取会话信息失败: {str(e)}")
def get_session_detail_api(self, session_id: str):
"""获取指定会话的详细信息 - API接口"""
try:
session_info = self.get_session_info(session_id)
if not session_info:
raise Exception("会话不存在")
return {
"status": "success",
"session_id": session_id,
"session_info": session_info,
"messages": self.get_messages(session_id),
"conversation_rounds": session_info["conversation_rounds"]
}
except Exception as e:
logger.error(f"获取会话详情错误: {e}")
raise Exception(f"获取会话详情失败: {str(e)}")
def delete_session_api(self, session_id: str):
"""删除指定会话 - API接口"""
try:
success = self.delete_session(session_id)
if success:
return {
"status": "success",
"message": f"会话 {session_id} 已删除"
}
else:
raise Exception("会话不存在")
except Exception as e:
logger.error(f"删除会话错误: {e}")
raise Exception(f"删除会话失败: {str(e)}")
def clear_all_sessions_api(self):
"""清空所有会话 - API接口"""
try:
count = self.clear_all_sessions()
return {
"status": "success",
"message": f"已清空 {count} 个会话"
}
except Exception as e:
logger.error(f"清空会话错误: {e}")
raise Exception(f"清空会话失败: {str(e)}")
# 全局消息管理器实例
message_manager = MessageManager()
# =====MessageManager功能地图======
'''
MessageManager
├── 初始化配置
├── 会话管理
│ ├── create_session() - 创建会话
│ ├── get_session() - 获取会话
│ ├── delete_session() - 删除会话
│ ├── clear_all_sessions() - 清空所有
│ └── cleanup_old_sessions() - 清理过期
├── 消息管理
│ ├── add_message() - 添加消息
│ ├── get_messages() - 获取所有消息
│ ├── get_recent_messages() - 获取最近消息
│ ├── build_conversation_messages() - 构建对话包
│ └── build_conversation_messages_from_memory() - 从内存构建
├── 日志管理
│ ├── parse_log_file() - 解析日志文件
│ ├── load_recent_context() - 加载历史上下文
│ ├── save_conversation_log() - 保存日志
│ └── save_conversation_and_logs() - 统一保存
├── 统计分析
│ ├── get_session_info() - 会话信息
│ ├── get_all_sessions_info() - 所有会话信息
│ ├── get_context_statistics() - 上下文统计
│ └── get_agent_type()/set_agent_type() - 代理类型
├── 后台处理
│ └── trigger_background_analysis() - 触发后台分析
├── API接口
│ ├── get_all_sessions_api() - 所有会话API
│ ├── get_session_detail_api() - 会话详情API
│ ├── delete_session_api() - 删除会话API
│ └── clear_all_sessions_api() - 清空所有API
└── 辅助方法(以_开头)
├── _parse_log_line() - 解析日志行
├── _is_message_start_line() - 判断消息开始
└── _parse_conversation_block() - 解析对话块
'''
# ========常见的编程范式/模式总结=========
# 1. 初始化-处理-清理模式
# 这个模式在代码中反复出现:
'''
def 某个函数():
# 1. 初始化阶段
结果容器 = [] # 或 {}
临时变量 = None
# 2. 处理阶段
for 每个项目 in 数据源:
# 处理逻辑
if 条件:
结果容器.append(处理结果)
# 3. 清理/返回阶段
return 结果容器
在代码中的体现:
parse_log_file():初始化messages列表,逐块处理,返回结果
load_recent_context():初始化all_messages,逐个文件处理,返回结果
_parse_conversation_block():初始化各种变量,逐行处理,返回结果
为什么这样设计?
清晰:三个阶段分明
可维护:每个阶段职责明确
可测试:可以单独测试每个阶段
'''
# 2. 标记-处理-清除模式(状态管理)
'''
def 有状态处理的函数():
# 1. 标记开始
标记字典[键] = True
try:
# 2. 核心处理
复杂操作()
finally:
# 3. 清除标记(无论如何都执行)
标记字典[键] = False
在代码中的体现:
def trigger_background_analysis(self, session_id: str):
# 标记开始
if self.analysis_in_progress.get(session_id):
return # 防止重复
self.analysis_in_progress[session_id] = True
try:
# 核心处理
await background_analyzer.analyze_intent_async(...)
finally:
# 清除标记
self.analysis_in_progress[session_id] = False
为什么这样设计?
防重入:防止同一任务被重复执行
资源安全:确保标记被正确清理
异常安全:即使出错也能清理状态
'''
# 3. 配置-默认值模式
'''
def 某个需要配置的函数():
try:
# 1. 尝试获取外部配置
from 配置模块 import 配置
参数 = 配置.某个值
except ImportError:
# 2. 失败时使用默认值
参数 = 默认值
在代码中的体现:
def __init__(self):
try:
from system.config import config
self.max_history_rounds = config.api.max_history_rounds
except ImportError:
self.max_history_rounds = 10 # 默认值
为什么这样设计?
灵活性:可以通过配置文件调整行为
鲁棒性:配置缺失时程序仍能运行
可部署性:不同环境可以用不同配置
'''
# 4. 包装器模式(API封装)
'''
def 原始函数(参数):
# 原始逻辑
return 结果
def 包装后的API函数(参数):
try:
# 调用原始函数
结果 = 原始函数(参数)
# 包装成标准格式
return {
"status": "success",
"data": 结果
}
except Exception as e:
# 统一错误处理
logger.error(...)
return {
"status": "error",
"message": str(e)
}
在代码中的体现:
def delete_session(self, session_id: str) -> bool:
# 原始逻辑
if session_id in self.sessions:
del self.sessions[session_id]
return True
return False
def delete_session_api(self, session_id: str):
# 包装后的API
try:
success = self.delete_session(session_id)
if success:
return {"status": "success", "message": f"会话已删除"}
else:
raise Exception("会话不存在")
except Exception as e:
logger.error(...)
raise Exception(...)
为什么这样设计?
接口统一:所有API返回相同结构
错误处理集中:统一记录日志和错误格式
业务逻辑分离:核心逻辑和接口逻辑分离
'''
# 5. 遍历-过滤-收集模式
'''
def 处理集合(数据集合):
结果 = []
for 项目 in 数据集合:
# 过滤条件
if not 满足条件(项目):
continue # 跳过
# 处理逻辑
处理后的项目 = 处理函数(项目)
# 收集结果
结果.append(处理后的项目)
return 结果
在代码中的体现:
def cleanup_old_sessions(self, max_age_hours: int = 24):
expired_sessions = [] # 收集结果
for session_id, session in self.sessions.items(): # 遍历
# 过滤条件
if current_time - session["last_activity"] > max_age_hours * 3600:
# 收集符合条件的
expired_sessions.append(session_id)
# 后续处理收集到的结果
for session_id in expired_sessions:
del self.sessions[session_id]
return len(expired_sessions)
'''
# 6. 状态机模式
'''
def 状态机处理函数(输入数据):
当前状态 = None
当前数据 = []
for 行 in 输入数据:
if 是状态开始标记(行):
# 保存之前的状态数据
if 当前状态 and 当前数据:
保存状态数据(当前状态, 当前数据)
# 进入新状态
当前状态 = 解析状态(行)
当前数据 = []
elif 当前状态 is not None:
# 在当前状态下收集数据
当前数据.append(行)
# 处理最后的状态
if 当前状态 and 当前数据:
保存状态数据(当前状态, 当前数据)
在代码中的体现:
def _parse_conversation_block(self, block: str):
current_message = None # 当前状态
current_content_lines = [] # 当前数据
for line in lines:
if self._is_message_start_line(line): # 状态切换
# 保存前一个状态
if current_message is not None and current_content_lines:
保存消息()
# 进入新状态
current_message = {"role": role}
current_content_lines = []
elif current_message is not None: # 状态内处理
current_content_lines.append(line)
# 处理最后一个状态
if current_message is not None and current_content_lines:
保存消息()
为什么这样设计?
处理复杂结构:适合处理有层次、有状态的数据
内存高效:一次遍历完成处理
逻辑清晰:状态转换明确
'''
# ========这些模式背后的编程思想=========
'''
1. 分离关注点(Separation of Concerns)
初始化、处理、清理各司其职
业务逻辑和错误处理分离
数据获取和数据使用分离
2. 防御式编程(Defensive Programming)
总是检查输入有效性
总是处理异常情况
总是清理资源
3. 一次做好一件事(Do One Thing Well)
每个函数只负责一个主要任务
复杂任务分解为简单步骤
通过组合小函数完成大功能
4. 不要重复自己(DRY - Don't Repeat Yourself)
相似的逻辑抽象成函数
通用的模式形成规范
配置集中管理
'''
# 如何在代码中识别这些模式?
'''
看变量名:
result = [] → 收集模式
current_xxx = None → 状态机模式
is_xxx_in_progress → 标记模式
看代码结构:
try-except包裹 → 异常安全模式
函数开头有配置读取 → 配置模式
返回标准字典结构 → API包装模式
看注释:
注释中提到的"状态"、"标记"、"收集"等关键词
'''
# 在其他代码中寻找模式
'''
# 写一个简单的遍历-过滤-收集模式
def get_even_numbers(numbers):
"""获取所有偶数"""
result = [] # 初始化
for num in numbers: # 遍历
if num % 2 == 0: # 过滤
result.append(num) # 收集
return result # 返回
# 写一个简单的配置-默认值模式
def get_timeout():
try:
import settings
return settings.TIMEOUT
except ImportError:
return 30 # 默认值
'''
api_server.py
#!/usr/bin/env python3
"""
NagaAgent API服务器
提供RESTful API接口访问NagaAgent功能
"""
import asyncio # 异步编程工具(让程序可以同时做多件事)
import json # JSON数据处理(像翻译器,把数据变成计算机能懂的样子)
import sys # 系统相关功能
import traceback # 错误追踪(程序出错时告诉我们是哪里错了)
import os # 操作系统功能(文件、文件夹操作)
import logging # 日志记录(就像写日记,记录程序做了什么)
import uuid # 生成唯一ID(给每样东西一个唯一的身份证号)
import time # 时间相关功能
import threading # 多线程(让程序可以分身做不同的事)
from contextlib import asynccontextmanager # 异步上下文管理(资源管理工具)
from typing import Dict, List, Optional, AsyncGenerator, Any # 类型提示(告诉计算机变量是什么类型)
# 在导入其他模块前先设置HTTP库日志级别,(让程序保持安静)
logging.getLogger("httpcore.http11").setLevel(logging.WARNING)
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore.connection").setLevel(logging.WARNING)
# 创建logger实例
logger = logging.getLogger(__name__)
# 从nagaagent_core工具包拿工具
from nagaagent_core.api import uvicorn # 网页服务器引擎
from nagaagent_core.api import FastAPI, HTTPException, Request, UploadFile, File, Form # 创建网站API的工具
from nagaagent_core.api import CORSMiddleware # 跨域资源共享(让不同网站能互相访问)
from nagaagent_core.api import StreamingResponse # 流式响应(像水管一样持续传输数据)
from nagaagent_core.api import StaticFiles # 静态文件服务(图片、CSS等)
from pydantic import BaseModel # 数据验证工具(检查数据是否符合要求)
from nagaagent_core.core import aiohttp # 异步HTTP客户端
import shutil # 文件操作工具
from pathlib import Path # 路径处理工具
# 添加项目根目录到Python路径(告诉程序去哪里找东西),就像在手机上添加一个新的收藏位置,以后找东西更方便。
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# 流式文本处理模块(仅用于TTS)
from .message_manager import message_manager # 导入统一的消息管理器
from .llm_service import get_llm_service # 导入LLM服务
# 导入配置系统
try:
from system.config import config, AI_NAME # 使用新的配置系统
from system.config import get_prompt # 导入提示词仓库
except ImportError:
# 如果找不到,再调整一次路径然后导入
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from system.config import config, AI_NAME # 使用新的配置系统
from system.config import get_prompt # 导入提示词仓库
from ui.utils.response_util import extract_message # 导入消息提取工具
# 对话核心功能已集成到apiserver
#1. 两个“转发”函数
# 统一后台意图分析触发函数 - 已整合到message_manager
def _trigger_background_analysis(session_id: str):
"""统一触发后台意图分析 - 委托给message_manager"""
message_manager.trigger_background_analysis(session_id)
# 统一保存对话与日志函数 - 已整合到message_manager
def _save_conversation_and_logs(session_id: str, user_message: str, assistant_response: str):
"""统一保存对话历史与日志 - 委托给message_manager"""
message_manager.save_conversation_and_logs(session_id, user_message, assistant_response)
#这两个函数自己不干活,只是“传话员”。它们把工作交给message_manager去做。
# 回调工厂类已移除 - 功能已整合到streaming_tool_extractor
@asynccontextmanager
async def lifespan(app: FastAPI):
"""应用生命周期管理"""
try:
print("[INFO] 正在初始化API服务器...")
# 程序启动时要做的事
# 对话核心功能已集成到apiserver
print("[SUCCESS] API服务器初始化完成")
yield
except Exception as e:
print(f"[ERROR] API服务器初始化失败: {e}")
traceback.print_exc()
sys.exit(1)
finally: #关店清理(finally部分)
print("[INFO] 正在清理资源...")
# MCP服务现在由mcpserver独立管理,无需清理
# 创建FastAPI应用
app = FastAPI(
title="NagaAgent API",
description="智能对话助手API服务",
version="4.0.0",
lifespan=lifespan # 使用上面定义的生命周期管理
)
# 配置CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # 生产环境建议限制具体域名
allow_credentials=True, # 允许携带凭证
allow_methods=["*"], # 允许所有HTTP方法
allow_headers=["*"], # 允许所有请求头
)
# 挂载静态文件
static_dir = os.path.join(os.path.dirname(__file__), "static")
app.mount("/static", StaticFiles(directory=static_dir), name="static")
# 请求模型
class ChatRequest(BaseModel):
message: str # 用户发送的消息
stream: bool = False # 是否使用流式传输(默认否)
session_id: Optional[str] = None # 会话ID(可选)
use_self_game: bool = False # 是否使用自博弈(游戏相关)
disable_tts: bool = False # V17: 支持禁用服务器端TTS(语音合成)
return_audio: bool = False # V19: 支持返回音频URL供客户端播放
skip_intent_analysis: bool = False # 新增:跳过意图分析
class ChatResponse(BaseModel):
response: str
session_id: Optional[str] = None
status: str = "success"
class SystemInfoResponse(BaseModel): #系统信息
version: str
status: str
available_services: List[str]
api_key_configured: bool
class FileUploadResponse(BaseModel): #文件上传响应
filename: str
file_path: str
file_size: int
file_type: str
upload_time: str
status: str = "success"
message: str = "文件上传成功"
class DocumentProcessRequest(BaseModel): #文档处理请求
file_path: str
action: str = "read" # read, analyze, summarize
session_id: Optional[str] = None
# API路由
@app.get("/", response_model=Dict[str, str]) #访问根路径时返回基本信息。
async def root():
"""API根路径"""
return {
"name": "NagaAgent API",
"version": "4.0.0",
"status": "running",
"docs": "/docs",
}
@app.get("/health")
async def health_check():
"""健康检查"""
return {
"status": "healthy",
"agent_ready": True,
"timestamp": str(asyncio.get_event_loop().time())
}
@app.get("/system/info", response_model=SystemInfoResponse)
async def get_system_info():
"""获取系统信息"""
return SystemInfoResponse(
version="4.0.0",
status="running",
available_services=[], # MCP服务现在由mcpserver独立管理
api_key_configured=bool(config.api.api_key and config.api.api_key != "sk-placeholder-key-not-set")
)
@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
"""普通对话接口 - 仅处理纯文本对话"""
'''
1. 接口定义 - 就像开了一个聊天窗口
app.post("/chat"):创建一个POST请求的接口,地址是/chat
就像在网站上开了一个聊天窗口,用户可以通过这个窗口发送消息
response_model=ChatResponse:告诉程序返回的数据要按照ChatResponse的格式来
async def chat:这是一个异步函数,可以同时处理多个用户的聊天
'''
#安全检查 - 检查用户有没有发送空消息
if not request.message.strip(): #request.message.strip():获取用户发送的消息,并去掉两边的空格
#raise HTTPException:就抛出一个错误,告诉用户"消息内容不能为空"
raise HTTPException(status_code=400, detail="消息内容不能为空") #status_code=400:400是HTTP状态码,表示"错误的请求"
try:
# 分支: 启用博弈论流程(非流式,返回聚合文本)
if request.use_self_game: #if request.use_self_game::检查用户是否想使用"博弈论"模式
# 配置项控制:失败时可跳过回退到普通对话
# #从配置中读取两个设置:1.skip_on_error:博弈论失败时是否跳过(默认是)2.enabled:是否启用博弈论(默认否)
#getattr是Python函数,安全地获取对象的属性,如果属性不存在就返回默认值
skip_on_error = getattr(getattr(config, 'game', None), 'skip_on_error', True) # 兼容无配置情况 #
enabled = getattr(getattr(config, 'game', None), 'enabled', False) # 控制总开关 #
if enabled:
try:
# 延迟导入以避免启动时循环依赖 #
from game.naga_game_system import NagaGameSystem # 博弈系统入口 #
from game.core.models.config import GameConfig # 博弈系统配置 #
# 创建系统并执行用户问题处理 #
system = NagaGameSystem(GameConfig())
system_response = await system.process_user_question(
user_question=request.message,
user_id=request.session_id or "api_user"
)
return ChatResponse(
response=system_response.content,
session_id=request.session_id,
status="success"
)
except Exception as e:
print(f"[WARNING] 博弈论流程失败,将{ '回退到普通对话' if skip_on_error else '返回错误' }: {e}") # 运行时警告 #
if not skip_on_error:
raise HTTPException(status_code=500, detail=f"博弈论流程失败: {str(e)}")
# 否则继续走普通对话流程 #
# 若未启用或被配置跳过,则直接回退到普通对话分支 #
# 获取或创建会话ID
session_id = message_manager.create_session(request.session_id)
# 构建系统提示词(只使用对话风格提示词)
#get_prompt("conversation_style_prompt"):从提示词仓库获取"对话风格提示词"
system_prompt = get_prompt("conversation_style_prompt")
# 使用消息管理器构建完整的对话消息(纯聊天,不触发工具)
messages = message_manager.build_conversation_messages(
session_id=session_id,
system_prompt=system_prompt,
current_message=request.message
)
# 使用整合后的LLM服务
llm_service = get_llm_service()
response_text = await llm_service.chat_with_context(messages, config.api.temperature)
# 处理完成
# 统一保存对话历史与日志
_save_conversation_and_logs(session_id, request.message, response_text)
#调用之前定义的_save_conversation_and_logs函数
#保存用户消息和AI回复到对话历史中
#就像把这次聊天记录到聊天记录本里
# 在用户消息保存到历史后触发后台意图分析(除非明确跳过)
#这是后台悄悄进行的,不会影响当前回复
if not request.skip_intent_analysis:
_trigger_background_analysis(session_id=session_id)
return ChatResponse( #返回响应给用户
response=extract_message(response_text) if response_text else response_text,
session_id=session_id,
status="success"
)
'''
理解:
把AI的回复包装成ChatResponse格式返回
extract_message(response_text):可能对回复做最后的处理(比如提取关键部分)
包含会话ID(这样用户下次可以继续这个对话)
状态设置为"success"(成功)
'''
#错误处理 - 如果一切出错了怎么办
except Exception as e:
print(f"对话处理错误: {e}")
traceback.print_exc()
raise HTTPException(status_code=500, detail=f"处理失败: {str(e)}")
'''
如果上面的代码有任何错误,就执行这里
print(f"对话处理错误: {e}"):在控制台打印错误信息
traceback.print_exc():打印详细的错误堆栈(知道是哪一行出错了)
raise HTTPException:告诉用户"处理失败",并显示具体错误
'''
# =========流程图总结这个函数的工作流程:============
'''
开始
↓
检查消息是否为空 → 如果是 → 返回错误
↓
检查是否使用博弈论模式 → 如果是 → 尝试博弈论处理
↓ ↓
↓ 成功? → 是 → 返回博弈论结果
↓ ↓
↓ 否 → 配置跳过? → 否 → 返回错误
↓ ↓
↓ 是
↓ ↓
进入普通对话模式
↓
获取/创建会话ID
↓
获取对话风格提示词
↓
构建完整对话消息(历史+新消息)
↓
调用AI模型生成回复
↓
保存对话历史
↓
(可选)触发后台意图分析
↓
返回成功响应
↓
如果有任何错误 → 打印错误并返回错误信息
'''
# ======关键函数总结:==========
'''
1.消息管理器相关:
message_manager.create_session() - 创建会话
message_manager.build_conversation_messages() - 构建对话消息
2.配置相关:
get_prompt() - 获取提示词
config.api.temperature - 获取温度参数
3.AI服务相关:
get_llm_service() - 获取AI服务
llm_service.chat_with_context() - 与AI对话
4.工具函数:
extract_message() - 提取消息(可能来自另一个模块)
_save_conversation_and_logs() - 保存对话(调用消息管理器)
_trigger_background_analysis() - 触发意图分析(调用消息管理器)
'''
#流式聊天接口(/chat/stream),这段代码比之前的更复杂一些,因为它涉及到流式传输(像水管一样一点点输送数据)和语音处理。
@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
"""流式对话接口 - 流式文本处理交给streaming_tool_extractor用于TTS"""
'''
理解:
创建一个新的POST接口,地址是/chat/stream
专门处理流式对话(文字一个字一个字地显示,而不是一次性显示完整回复)
还会处理语音合成(TTS - Text To Speech,文字转语音)
比喻:这就像两个水管:
普通聊天接口:一桶水一次性倒给你
流式聊天接口:水管一点点流水给你
'''
#2. 检查空消息,和之前一样,先检查用户是不是发了空消息。
if not request.message.strip():
raise HTTPException(status_code=400, detail="消息内容不能为空")
#3. 核心:创建流式响应生成器
'''
重要概念:
generate_response是一个生成器函数(AsyncGenerator)
它不会一次性返回所有数据,而是用yield一点点产生数据
complete_text:用来累积AI的所有回复文字,后面可能用来生成语音
'''
async def generate_response() -> AsyncGenerator[str, None]:
complete_text = "" # V19: 用于累积完整文本以生成音频
try:
# 获取或创建会话ID,主要逻辑开始
session_id = message_manager.create_session(request.session_id)
# 发送会话ID信息
yield f"data: session_id: {session_id}\n\n"
# 注意:这里不触发后台分析,将在对话保存后触发
'''
理解:
创建会话ID(和之前一样)
yield f"data: session_id: {session_id}\n\n":这是第一次发送数据给客户端
格式是data: session_id: xxx,这是一种特殊的SSE(Server-Sent Events)格式
比喻:就像你先告诉用户:"我们的聊天室编号是123,然后我们开始聊天"
'''
#构建对话消息,和之前的普通聊天接口一样,准备对话内容。
# 构建系统提示词(只使用对话风格提示词)
system_prompt = get_prompt("conversation_style_prompt")
# 使用消息管理器构建完整的对话消息
messages = message_manager.build_conversation_messages(
session_id=session_id,
system_prompt=system_prompt,
current_message=request.message
)
# 语音集成初始化(重要部分!),这部分比较复杂,因为有很多模式和配置:
'''
理解4个条件(都要满足才启用实时TTS):
系统配置启用了语音功能
用户没有要求返回音频URL(return_audio=False)
语音模式不是"混合模式"
用户没有明确禁用TTS
'''
# 初始化语音集成(根据voice_mode和return_audio决定)
# V19: 如果客户端请求返回音频,则在服务器端生成
voice_integration = None
# V19: 混合模式下,如果请求return_audio,则在服务器生成音频
# 修复双音频问题:return_audio时不启用实时TTS,只在最后生成完整音频
should_enable_tts = (
config.system.voice_enabled
and not request.return_audio # 修复:return_audio时不启用实时TTS
and config.voice_realtime.voice_mode != "hybrid"
and not request.disable_tts # 兼容旧版本的disable_tts
)
if should_enable_tts:
try:
from voice.output.voice_integration import get_voice_integration
voice_integration = get_voice_integration()
logger.info(f"[API Server] 实时语音集成已启用 (return_audio={request.return_audio}, voice_mode={config.voice_realtime.voice_mode})")
except Exception as e:
print(f"语音集成初始化失败: {e}")
else: #如果条件满足,就初始化语音集成(让AI说话的功能)
if request.return_audio:
logger.info("[API Server] return_audio模式,将在最后生成完整音频")
elif config.voice_realtime.voice_mode == "hybrid" and not request.return_audio:
logger.info("[API Server] 混合模式下且未请求音频,不处理TTS")
elif request.disable_tts:
logger.info("[API Server] 客户端禁用了TTS (disable_tts=True)")
'''
理解不同的情况:
return_audio=True:用户想要音频文件,最后生成
混合模式:可能客户端自己处理语音
disable_tts=True:用户明确不要语音
'''
# 初始化流式文本切割器(仅用于TTS处理)
# 始终创建tool_extractor以累积文本内容,确保日志保存
tool_extractor = None
try:
from .streaming_tool_extractor import StreamingToolCallExtractor
tool_extractor = StreamingToolCallExtractor()
# 只有在需要实时TTS且不是return_audio模式时,才设置voice_integration
if voice_integration and not request.return_audio:
tool_extractor.set_callbacks(
on_text_chunk=None, # 不需要回调,直接处理TTS
voice_integration=voice_integration
)
except Exception as e:
print(f"流式文本切割器初始化失败: {e}")
'''
理解:
tool_extractor:文本切割器,把连续的文本切成小块
如果需要实时TTS,就把语音集成设置给切割器
这样切割器收到文字时,可以立即让语音集成说出来
'''
# 使用整合后的流式处理,流式调用AI模型(重点!)
llm_service = get_llm_service()\
#llm_service.stream_chat_with_context:这是流式调用AI的方法
#async for chunk in ...:一点点获取AI的回复,每次一小块(chunk)
async for chunk in llm_service.stream_chat_with_context(messages, config.api.temperature):
# V19: 如果需要返回音频,累积文本
if request.return_audio and chunk.startswith("data: "):
try:
import base64
data_str = chunk[6:].strip()
if data_str != '[DONE]':
decoded = base64.b64decode(data_str).decode('utf-8')
complete_text += decoded
except Exception:
pass
'''
理解:
如果用户要求返回音频,就累积完整的文本
chunk格式是data: BASE64编码的文字
需要解码BASE64,然后拼接到complete_text中
'''
# 立即发送到流式文本切割器进行TTS处理(不阻塞文本流)
if tool_extractor and chunk.startswith("data: "):
try:
import base64
data_str = chunk[6:].strip()
if data_str != '[DONE]':
decoded = base64.b64decode(data_str).decode('utf-8')
# 同步处理文本累积,不阻塞文本流
tool_extractor.complete_text += decoded
except Exception as e:
logger.error(f"[API Server] 流式文本切割器处理错误: {e}")
#同时,把文字也传给文本切割器(用于实时TTS),注意:这是并行处理的,不影响文字流给用户
yield chunk
#最重要的部分:立即把这一小块文字发送给用户!,比喻:AI思考一个字 → 发送给用户显示 → 同时传给语音系统准备说话,就像一个人一边说话(文字显示),一边发声(语音合成)
# 处理完成
#音频生成(如果用户要求音频文件)
# V19: 如果请求返回音频,在这里生成并返回音频URL
if request.return_audio and complete_text:
try:
logger.info(f"[API Server V19] 生成音频,文本长度: {len(complete_text)}")
# 使用服务器端的TTS生成音频
from voice.tts_wrapper import generate_speech_safe
import tempfile
import uuid
# 生成音频文件
tts_voice = config.voice_realtime.tts_voice or "zh-CN-XiaoyiNeural"
audio_file = generate_speech_safe(
text=complete_text,
voice=tts_voice,
response_format="mp3",
speed=1.0
)
'''
理解:
文字全部接收完后,用TTS生成整个音频文件
generate_speech_safe:安全生成语音的函数
选择语音(默认是"晓易"中文语音)
'''
# 直接使用voice/output播放音频,不再返回给客户端
try:
from voice.output.voice_integration import get_voice_integration
voice_integration = get_voice_integration()
voice_integration.receive_audio_url(audio_file)
logger.info(f"[API Server V19] 音频已直接播放: {audio_file}")
except Exception as e:
logger.error(f"[API Server V19] 音频播放失败: {e}")
# 如果播放失败,仍然返回给客户端作为备选
yield f"data: audio_url: {audio_file}\n\n"
except Exception as e:
logger.error(f"[API Server V19] 音频生成失败: {e}")
# traceback已经在文件顶部导入,直接使用
print(f"[API Server V19] 详细错误信息:")
traceback.print_exc()
#完成处理
# 完成流式文本切割器处理(非return_audio模式,不阻塞)
if tool_extractor and not request.return_audio:
try:
# 同步处理完成,不阻塞文本流返回
# tool_extractor.finish_processing() 是异步方法,这里不需要调用
pass
except Exception as e:
print(f"流式文本切割器完成处理错误: {e}")
# 完成语音处理
if voice_integration and not request.return_audio: # V19: return_audio模式不需要这里的处理
try:
threading.Thread(
target=voice_integration.finish_processing,
daemon=True
).start()
except Exception as e:
print(f"语音集成完成处理错误: {e}")
#理解:告诉文本切割器和语音集成:"处理完了,可以做收尾工作了",threading.Thread(...).start():在新线程中处理,不阻塞主程序
# 流式处理完成后,获取完整文本用于保存
complete_response = ""
if tool_extractor:
try:
# 获取完整文本内容
complete_response = tool_extractor.get_complete_text()
except Exception as e:
print(f"获取完整响应文本失败: {e}")
elif request.return_audio:
# V19: 如果是return_audio模式,使用累积的文本
complete_response = complete_text
# 统一保存对话历史与日志
_save_conversation_and_logs(session_id, request.message, complete_response)
# 在用户消息保存到历史后触发后台意图分析(除非明确跳过)
if not request.skip_intent_analysis:
_trigger_background_analysis(session_id)
yield "data: [DONE]\n\n"
'''
理解:获取完整的AI回复文字,保存到对话历史,触发意图分析,发送结束标记[DONE],告诉客户端:"我说完了"
'''
# 错误处理
except Exception as e:
print(f"流式对话处理错误: {e}")
# 使用顶部导入的traceback
traceback.print_exc()
yield f"data: 错误: {str(e)}\n\n"
#返回流式响应
return StreamingResponse(
generate_response(),
media_type="text/event-stream", #这是服务器推送事件的格式
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Content-Type": "text/event-stream",
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Headers": "*",
"X-Accel-Buffering": "no" # 禁用nginx缓冲,告诉nginx不要缓存,立即传输
}
)
# =========流程图总结:=============
'''
开始
↓
检查空消息
↓
创建会话ID → 立即发送给客户端
↓
判断语音模式:
- 实时TTS模式?
- 返回音频模式?
- 混合模式?
- 禁用TTS?
↓
初始化文本切割器
↓
开始流式获取AI回复:
↓
AI产生一个字 → 立即发送给客户端显示
↓ ↓
如果需要音频 → 累积文字 如果需要实时TTS → 传给语音系统
↓ ↓
继续下一个字 ←---------------
↓
AI回复完成
↓
如果是音频模式 → 生成音频文件 → 播放或返回URL
↓
完成文本切割器和语音处理
↓
保存对话历史
↓
触发意图分析
↓
发送结束标记
↓
如果有错误 → 发送错误信息
'''
# ==========关键函数和调用:=========
'''
1.消息处理:
message_manager.create_session() - 创建会话
message_manager.build_conversation_messages() - 构建消息
2.AI服务:
get_llm_service() - 获取AI服务
llm_service.stream_chat_with_context() - 流式聊天核心
3.语音处理:
get_voice_integration() - 获取语音集成
generate_speech_safe() - 生成语音文件
voice_integration.receive_audio_url() - 播放音频
4.文本处理:
StreamingToolCallExtractor() - 文本切割器
tool_extractor.set_callbacks() - 设置回调
5.数据保存:
_save_conversation_and_logs() - 保存对话
_trigger_background_analysis() - 触发分析
'''
# 获取记忆统计接口 (/memory/stats),@app.get("/memory/stats"):创建一个GET请求的接口,地址是/memory/stats,用来获取AI的"记忆系统"的统计信息
@app.get("/memory/stats")
async def get_memory_stats():
"""获取记忆统计信息"""
try:
# 记忆系统现在由main.py直接管理
try:
#第一层:尝试导入记忆管理器
# 分层理解:第一层:尝试导入记忆管理器:from summer_memory.memory_manager import memory_manager
# 尝试从summer_memory模块导入memory_manager,如果导入失败,说明记忆系统模块不存在
from summer_memory.memory_manager import memory_manager
#第二层:检查记忆系统是否启用
#如果记忆管理器存在且启用(enabled=True),调用get_memory_stats()方法获取统计信息,返回成功状态和统计信息
if memory_manager and memory_manager.enabled:
stats = memory_manager.get_memory_stats()
return {
"status": "success",
"memory_stats": stats
}
else:
return {
"status": "success",
"memory_stats": {"enabled": False, "message": "记忆系统未启用"}
}
#第三层:如果模块不存在,如果summer_memory模块不存在(ImportError),返回模块未找到的提示
except ImportError:
return {
"status": "success",
"memory_stats": {"enabled": False, "message": "记忆系统模块未找到"}
}
#处理其他所有错误
except Exception as e:
print(f"获取记忆统计错误: {e}")
traceback.print_exc()
raise HTTPException(status_code=500, detail=f"获取记忆统计失败: {str(e)}")
#这个接口就像问AI:"你还记得多少事情?",如果AI有记忆系统:返回记忆统计,如果记忆系统没开:告诉用户没开,如果记忆系统根本没装:告诉用户没安装
# 获取所有会话信息 (/sessions)
'''
理解:
GET请求,获取所有聊天会话的信息
完全委托给message_manager.get_all_sessions_api()处理
如果出错,返回500错误
'''
@app.get("/sessions")
async def get_sessions():
"""获取所有会话信息 - 委托给message_manager"""
try:
return message_manager.get_all_sessions_api()
except Exception as e:
print(f"获取会话信息错误: {e}")
traceback.print_exc()
raise HTTPException(status_code=500, detail=str(e))
#{session_id}:这是路径参数,可以从URL中获取,例如:访问/sessions/abc123,session_id就是"abc123"
@app.get("/sessions/{session_id}")
async def get_session_detail(session_id: str):
"""获取指定会话的详细信息 - 委托给message_manager"""
try:
return message_manager.get_session_detail_api(session_id)
except Exception as e:
if "会话不存在" in str(e): #检查错误信息是否包含"会话不存在"
raise HTTPException(status_code=404, detail=str(e))
print(f"获取会话详情错误: {e}")
traceback.print_exc()
raise HTTPException(status_code=500, detail=str(e))
#@app.delete:创建一个DELETE请求的接口
@app.delete("/sessions/{session_id}")
async def delete_session(session_id: str):
"""删除指定会话 - 委托给message_manager"""
try:
return message_manager.delete_session_api(session_id)
except Exception as e:
if "会话不存在" in str(e):
raise HTTPException(status_code=404, detail=str(e))
print(f"删除会话错误: {e}")
traceback.print_exc()
raise HTTPException(status_code=500, detail=str(e))
#DELETE请求,但没有路径参数,删除所有会话(危险操作!),注意:这个接口没有404错误的处理,因为清空所有会话不会出现"会话不存在"的情况。
@app.delete("/sessions")
async def clear_all_sessions():
"""清空所有会话 - 委托给message_manager"""
try:
return message_manager.clear_all_sessions_api()
except Exception as e:
print(f"清空会话错误: {e}")
traceback.print_exc()
raise HTTPException(status_code=500, detail=str(e))
# 文档处理功能已整合到 ui/controller/tool_document.py
# 新增:日志解析相关API接口
@app.get("/logs/context/statistics")
async def get_log_context_statistics(days: int = 7):
"""获取日志上下文统计信息"""
try:
statistics = message_manager.get_context_statistics(days)
return {
"status": "success",
"statistics": statistics
}
except Exception as e:
print(f"获取日志上下文统计错误: {e}")
traceback.print_exc()
raise HTTPException(status_code=500, detail=f"获取统计信息失败: {str(e)}")
@app.get("/logs/context/load")
async def load_log_context(days: int = 3, max_messages: int = None):
"""加载日志上下文"""
try:
messages = message_manager.load_recent_context(days=days, max_messages=max_messages)
return {
"status": "success",
"messages": messages,
"count": len(messages),
"days": days
}
except Exception as e:
print(f"加载日志上下文错误: {e}")
traceback.print_exc()
raise HTTPException(status_code=500, detail=f"加载上下文失败: {str(e)}")
@app.post("/tool_notification")
async def tool_notification(payload: Dict[str, Any]):
"""接收工具调用状态通知,只显示工具调用状态,不显示结果"""
try:
session_id = payload.get("session_id")
tool_calls = payload.get("tool_calls", [])
message = payload.get("message", "")
if not session_id:
raise HTTPException(400, "缺少session_id")
# 记录工具调用状态(不处理结果,结果由tool_result_callback处理)
for tool_call in tool_calls:
tool_name = tool_call.get("tool_name", "未知工具")
service_name = tool_call.get("service_name", "未知服务")
status = tool_call.get("status", "starting")
logger.info(f"工具调用状态: {tool_name} ({service_name}) - {status}")
# 这里可以添加WebSocket通知UI的逻辑,让UI显示工具调用状态
# 目前先记录日志,UI可以通过其他方式获取工具调用状态
return {
"success": True,
"message": "工具调用状态通知已接收",
"tool_calls": tool_calls,
"display_message": message
}
except Exception as e:
logger.error(f"工具调用通知处理失败: {e}")
raise HTTPException(500, f"处理失败: {str(e)}")
#非常重要
@app.post("/tool_result_callback")
async def tool_result_callback(payload: Dict[str, Any]):
"""接收MCP工具执行结果回调,让主AI基于原始对话和工具结果重新生成回复"""
try:
session_id = payload.get("session_id")
task_id = payload.get("task_id")
result = payload.get("result", {})
success = payload.get("success", False)
if not session_id:
raise HTTPException(400, "缺少session_id")
logger.info(f"[工具回调] 开始处理工具回调,会话: {session_id}, 任务ID: {task_id}")
logger.info(f"[工具回调] 回调内容: {result}")
# 获取工具执行结果
tool_result = result.get('result', '执行成功') if success else result.get('error', '未知错误')
logger.info(f"[工具回调] 工具执行结果: {tool_result}")
# 获取原始对话的最后一条用户消息(触发工具调用的消息)
session_messages = message_manager.get_messages(session_id)
original_user_message = ""
for msg in reversed(session_messages):
if msg.get('role') == 'user':
original_user_message = msg.get('content', '')
break
# 构建包含工具结果的用户消息
enhanced_message = f"{original_user_message}\n\n[工具执行结果]: {tool_result}"
logger.info(f"[工具回调] 构建增强消息: {enhanced_message[:200]}...")
# 构建对话风格提示词和消息
system_prompt = get_prompt("conversation_style_prompt")
messages = message_manager.build_conversation_messages(
session_id=session_id,
system_prompt=system_prompt,
current_message=enhanced_message
)
logger.info(f"[工具回调] 开始生成工具后回复...")
# 使用LLM服务基于原始对话和工具结果重新生成回复
try:
llm_service = get_llm_service()
response_text = await llm_service.chat_with_context(messages, temperature=0.7)
logger.info(f"[工具回调] 工具后回复生成成功,内容: {response_text[:200]}...")
except Exception as e:
logger.error(f"[工具回调] 调用LLM服务失败: {e}")
response_text = f"处理工具结果时出错: {str(e)}"
# 只保存AI回复到历史记录(用户消息已在正常对话流程中保存)
message_manager.add_message(session_id, "assistant", response_text)
logger.info(f"[工具回调] AI回复已保存到历史")
# 保存对话日志到文件
message_manager.save_conversation_log(original_user_message, response_text, dev_mode=False)
logger.info(f"[工具回调] 对话日志已保存")
# 通过UI通知接口将AI回复发送给UI
logger.info(f"[工具回调] 开始发送AI回复到UI...")
await _notify_ui_refresh(session_id, response_text)
logger.info(f"[工具回调] 工具结果处理完成,回复已发送到UI")
return {
"success": True,
"message": "工具结果已通过主AI处理并返回给UI",
"response": response_text,
"task_id": task_id,
"session_id": session_id
}
except Exception as e:
logger.error(f"[工具回调] 工具结果回调处理失败: {e}")
raise HTTPException(500, f"处理失败: {str(e)}")
#完整流程总结:
'''
用户提问 → AI发现需要工具 → 调用工具 → 工具执行
↓
工具完成 → 调用/tool_result_callback → AI重新生成回复 → 显示给用户
'''
@app.post("/tool_result")
async def tool_result(payload: Dict[str, Any]):
"""接收工具执行结果并显示在UI上"""
try:
session_id = payload.get("session_id")
result = payload.get("result", "")
notification_type = payload.get("type", "")
ai_response = payload.get("ai_response", "")
if not session_id:
raise HTTPException(400, "缺少session_id")
logger.info(f"工具执行结果: {result}")
# 如果是工具完成后的AI回复,通过信号机制通知UI线程显示
if notification_type == "tool_completed_with_ai_response" and ai_response:
try:
# 使用Qt信号机制在主线程中安全地更新UI
from ui.controller.tool_chat import chat
# 直接发射信号,确保在主线程中执行
chat.tool_ai_response_received.emit(ai_response)
logger.info(f"[UI] 已通过信号机制通知UI显示AI回复,长度: {len(ai_response)}")
except Exception as e:
logger.error(f"[UI] 调用UI控制器显示AI回复失败: {e}")
'''
重要概念:Qt信号机制:
Qt是一个图形界面框架
emit()是发送信号的函数
这是在多线程环境下安全更新UI的方式
比喻:就像在两个房间之间用对讲机通话:
后台线程:"我这里有AI回复了"
前台UI线程:"收到,我马上显示"
'''
return {
"success": True,
"message": "工具结果已接收",
"result": result,
"session_id": session_id
}
except Exception as e:
logger.error(f"处理工具结果失败: {e}")
raise HTTPException(500, f"处理失败: {str(e)}")
@app.post("/save_tool_conversation")
async def save_tool_conversation(payload: Dict[str, Any]):
"""保存工具对话历史"""
try:
session_id = payload.get("session_id")
user_message = payload.get("user_message", "")
assistant_response = payload.get("assistant_response", "")
if not session_id:
raise HTTPException(400, "缺少session_id")
logger.info(f"[保存工具对话] 开始保存工具对话历史,会话: {session_id}")
# 保存用户消息(工具执行结果)
if user_message:
message_manager.add_message(session_id, "user", user_message)
# 保存AI回复
if assistant_response:
message_manager.add_message(session_id, "assistant", assistant_response)
logger.info(f"[保存工具对话] 工具对话历史已保存,会话: {session_id}")
return {
"success": True,
"message": "工具对话历史已保存",
"session_id": session_id
}
except Exception as e:
logger.error(f"[保存工具对话] 保存工具对话历史失败: {e}")
raise HTTPException(500, f"保存失败: {str(e)}")
#这是一个通用的UI控制接口。
@app.post("/ui_notification")
async def ui_notification(payload: Dict[str, Any]):
"""UI通知接口 - 用于直接控制UI显示"""
try:
session_id = payload.get("session_id")
action = payload.get("action", "")
ai_response = payload.get("ai_response", "")
if not session_id:
raise HTTPException(400, "缺少session_id")
logger.info(f"UI通知: {action}, 会话: {session_id}")
# 处理显示工具AI回复的动作
if action == "show_tool_ai_response" and ai_response:
try:
from ui.controller.tool_chat import chat
# 直接发射信号,确保在主线程中执行
chat.tool_ai_response_received.emit(ai_response)
logger.info(f"[UI通知] 已通过信号机制显示工具AI回复,长度: {len(ai_response)}")
return {
"success": True,
"message": "AI回复已显示"
}
except Exception as e:
logger.error(f"[UI通知] 显示工具AI回复失败: {e}")
raise HTTPException(500, f"显示AI回复失败: {str(e)}")
return {
"success": True,
"message": "UI通知已处理"
}
except Exception as e:
logger.error(f"处理UI通知失败: {e}")
raise HTTPException(500, f"处理失败: {str(e)}")
'''
与/tool_result的区别:
/tool_result:特定用于工具结果
/ui_notification:通用的UI控制,可以做更多事情
'''
# ========工具调用流程总结:=========
#这里有两个不同的工具调用处理流程:
'''
流程一:完整回调流程(推荐)
1. 用户提问
2. AI决定调用工具
3. 调用工具 → 工具执行
4. 工具完成 → 调用/tool_result_callback
5. AI重新生成包含结果的回复
6. 调用/_notify_ui_refresh(或UI通知)显示回复
流程二:简单结果流程
1. 用户提问
2. AI决定调用工具
3. 调用工具 → 工具执行
4. 工具完成 → 调用/tool_result
5. 直接显示结果(不经过AI重新组织)
'''
# =======关键函数和方法:=======
'''
日志相关:
message_manager.get_context_statistics() - 获取统计
message_manager.load_recent_context() - 加载历史
工具调用相关:
message_manager.get_messages() - 获取会话消息
message_manager.add_message() - 添加消息到历史
message_manager.save_conversation_log() - 保存日志
AI相关:
get_llm_service() - 获取AI服务
llm_service.chat_with_context() - AI聊天
UI相关:
chat.tool_ai_response_received.emit() - Qt信号发送
'''
# ==========错误处理模式:==========
#注意这些接口都使用了相同的错误处理模式:
'''
try:
# 主要逻辑
return {"success": True, ...}
except Exception as e:
logger.error(f"错误信息: {e}")
raise HTTPException(status_code=500, detail=str(e))
'''
#整体架构理解:
#这部分的代码展示了AI系统的模块化设计:
'''
用户界面 (UI)
↓
API接口层 (这些接口)
↓
业务逻辑层 (message_manager, llm_service)
↓
工具层 (各种工具:天气、搜索等)
↓
数据层 (日志、历史记录)
'''
#这段代码定义了三个异步函数,它们都是内部使用的工具函数(以_开头表示私有)。这些函数的作用是把AI的回复发送给前端的用户界面(UI)。
#内部辅助函数(与UI通信)
async def _trigger_chat_stream_no_intent(session_id: str, response_text: str):
"""触发聊天流式响应但不触发意图分析 - 发送纯粹的AI回复到UI"""
try:
logger.info(f"[UI发送] 开始发送AI回复到UI,会话: {session_id}")
logger.info(f"[UI发送] 发送内容: {response_text[:200]}...")
# 直接调用现有的流式对话接口,但跳过意图分析
import httpx
# 构建请求数据 - 使用纯粹的AI回复内容,并跳过意图分析
chat_request = {
"message": response_text, # 直接使用AI回复内容,不加标记
"stream": True,
"session_id": session_id,
"use_self_game": False,
"disable_tts": False,
"return_audio": False,
"skip_intent_analysis": True # 关键:跳过意图分析
}
# 调用现有的流式对话接口
from system.config import get_server_port
api_url = f"http://localhost:{get_server_port('api_server')}/chat/stream"
async with httpx.AsyncClient() as client:
async with client.stream("POST", api_url, json=chat_request) as response: #json=chat_request:把请求数据转为JSON格式
if response.status_code == 200:
# 处理流式响应,包括TTS切割
async for chunk in response.aiter_text():
if chunk.strip():
# 这里可以进一步处理流式响应
# 或者直接让UI处理流式响应
pass
logger.info(f"[UI发送] AI回复已成功发送到UI: {session_id}")
logger.info(f"[UI发送] 成功显示到UI")
else:
logger.error(f"[UI发送] 调用流式对话接口失败: {response.status_code}")
except Exception as e:
logger.error(f"[UI发送] 触发聊天流式响应失败: {e}")
#这个函数比第一个更直接,它直接调用UI通知接口。
async def _notify_ui_refresh(session_id: str, response_text: str):
"""通知UI刷新会话历史"""
try:
import httpx
# 通过UI通知接口直接显示AI回复
ui_notification_payload = {
"session_id": session_id,
"action": "show_tool_ai_response",
"ai_response": response_text
}
from system.config import get_server_port
api_url = f"http://localhost:{get_server_port('api_server')}/ui_notification"
async with httpx.AsyncClient(timeout=5.0) as client:
response = await client.post(api_url, json=ui_notification_payload)
if response.status_code == 200:
logger.info(f"[UI通知] AI回复显示通知发送成功: {session_id}")
else:
logger.error(f"[UI通知] AI回复显示通知失败: {response.status_code}")
except Exception as e:
logger.error(f"[UI通知] 通知UI刷新失败: {e}")
#这是第三种方法,通过非流式聊天接口发送。
async def _send_ai_response_directly(session_id: str, response_text: str):
"""直接发送AI回复到UI"""
try:
import httpx
# 使用非流式接口发送AI回复
chat_request = {
"message": f"[工具结果] {response_text}", # 添加标记让UI知道这是工具结果
"stream": False,
"session_id": session_id,
"use_self_game": False,
"disable_tts": False,
"return_audio": False,
"skip_intent_analysis": True
}
from system.config import get_server_port
api_url = f"http://localhost:{get_server_port('api_server')}/chat"
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.post(api_url, json=chat_request)
if response.status_code == 200:
logger.info(f"[直接发送] AI回复已通过非流式接口发送到UI: {session_id}")
else:
logger.error(f"[直接发送] 非流式接口发送失败: {response.status_code}")
except Exception as e:
logger.error(f"[直接发送] 直接发送AI回复失败: {e}")
# 工具执行结果已通过LLM总结并保存到对话历史中
# UI可以通过查询历史获取工具执行结果
# =======@符号饰器(Decorator)==========
#什么是装饰器?
#1. 最简单理解,装饰器就像一个"包装纸",它可以把一个函数"包装"起来,给这个函数增加一些额外的功能,但不改变函数本身的核心逻辑。
#生活比喻:假设你有一个普通的礼物(函数),用漂亮的包装纸包起来(装饰器),礼物本身没变,但现在更好看了(功能更强了)
#2. 代码示例理解
'''
没有装饰器的普通函数:
def say_hello():
print("你好!")
say_hello() # 输出:你好!
使用装饰器的函数:
def add_logging(func): # 装饰器函数
def wrapper():
print("开始记录日志...")
func()
print("日志记录完成。")
return wrapper
@add_logging # 使用装饰器
def say_hello():
print("你好!")
say_hello()
# 输出:
# 开始记录日志...
# 你好!
# 日志记录完成。
say_hello函数的核心功能(打印"你好!")没变,但通过@add_logging装饰器,给它增加了记录日志的功能。
'''
# =========FastAPI中的装饰器============
#在FastAPI框架中,装饰器有特定的用途:定义API路由。
#1. 基础用法
'''
@app.get("/") # 装饰器:把下面的函数变成一个处理GET请求的API
async def root():
return {"message": "Hello"}
理解:
@app.get("/"):这是一个装饰器
它告诉FastAPI:"当有人访问网站的根路径(/)时,就执行下面的root函数"
root函数返回的数据会自动转换成HTTP响应
'''
#2. 各种HTTP方法的装饰器
'''
@app.get("/users") # 获取数据
@app.post("/users") # 创建数据
@app.put("/users/{id}") # 更新数据
@app.delete("/users/{id}") # 删除数据
记忆方法:就像数据库的CRUD操作:
GET:读取(R)
POST:创建(C)
PUT:更新(U)
DELETE:删除(D)
'''
#这个代码中出现的所有装饰器类型
#让我们回顾一下代码中出现的各种装饰器:
#1. 路由装饰器(最多)
'''
@app.get("/") # 根路径
@app.get("/health") # 健康检查
@app.get("/system/info") # 系统信息
@app.post("/chat") # 普通聊天
@app.post("/chat/stream") # 流式聊天
@app.get("/memory/stats") # 记忆统计
@app.get("/sessions") # 获取所有会话
@app.get("/sessions/{session_id}") # 获取单个会话
@app.delete("/sessions/{session_id}") # 删除会话
@app.delete("/sessions") # 清空所有会话
@app.get("/logs/context/statistics") # 日志统计
@app.get("/logs/context/load") # 加载日志
@app.post("/tool_notification") # 工具通知
@app.post("/tool_result_callback") # 工具结果回调
@app.post("/tool_result") # 工具结果
@app.post("/save_tool_conversation") # 保存工具对话
@app.post("/ui_notification") # UI通知
'''
#2. 特殊的生命周期装饰器
'''
@asynccontextmanager # 这是一个特殊的装饰器
async def lifespan(app: FastAPI):
"""应用生命周期管理"""
作用:这个装饰器把这个函数变成一个异步上下文管理器,用于管理应用的启动和关闭。
'''
#装饰器的工作原理(更深入一点)
#1. 装饰器实际上在做什么?
'''
当你写:
@app.get("/hello")
async def hello():
return "World"
实际上相当于:
def hello():
return "World"
hello = app.get("/hello")(hello) # 装饰器的底层实现
'''
# ======2. FastAPI装饰器的魔法 ===========
'''
FastAPI的装饰器做了很多事:
注册路由:把这个函数和URL路径关联起来
解析参数:自动解析请求参数、查询参数、路径参数
验证数据:验证请求数据的格式
生成文档:自动为这个接口生成API文档
处理响应:把返回值转换成HTTP响应
对比(如果没有装饰器):
# 没有装饰器的写法(伪代码)
def hello():
return "World"
# 需要手动注册路由
app.add_route(method="GET", path="/hello", handler=hello)
# 需要手动配置参数解析
# 需要手动配置响应处理
# ...很多其他代码
'''
# ========装饰器参数的解析========
'''
1. 路径参数
@app.get("/users/{user_id}") # {user_id} 是路径参数
async def get_user(user_id: str):
return {"user_id": user_id}
2. 查询参数
@app.get("/items")
async def read_items(skip: int = 0, limit: int = 10):
return {"skip": skip, "limit": limit}
访问:/items?skip=0&limit=10
'''
# 实际案例:分析代码中的一个装饰器
'''
让我们详细分析代码中的这个例子:
@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
# ...函数体...
这个装饰器做了以下事情:
@app.post:这是一个POST请求装饰器
"/chat":路径是/chat
response_model=ChatResponse:
指定响应数据的格式
自动验证返回的数据是否符合ChatResponse的格式
自动生成API文档中的响应示例
'''
#装饰器的执行顺序
'''
当多个装饰器装饰一个函数时,它们从下往上执行:
@app.middleware("http") # 第二个执行
@app.get("/") # 第一个执行
async def root():
return {"message": "Hello"}
但实际上,在FastAPI中,你很少会看到多个装饰器叠加使用。
'''
# 总结装饰器在这个代码中的作用:
'''
装饰器 | 作用 | 类比
@app.get("/xxx") | 创建一个GET接口 | 开一个"只读"窗口
@app.delete("/xxx") | 创建一个DELETE接口 | 开一个"删除"窗口
@asynccontextmanager | 创建异步上下文管理器 | 设置"开门关门"流程
'''
#整体比喻:
'''
这个程序就像一个政府办事大厅
每个@app.get/@app.post装饰器就是开一个办事窗口
窗口上贴着标签(路径):/chat、/health、/sessions等
窗口后面坐着的办事员就是被装饰的函数
群众(客户端)来到对应窗口,办事员就处理他们的需求
'''
#常见问题
'''
Q:为什么有些装饰器有括号,有些没有?
A:@app.get这种有括号,是因为它本身是一个函数,需要传递参数(如路径)。@asynccontextmanager没有括号是因为它是一个装饰器工厂。
Q:装饰器一定要写在函数上面吗?
A:是的,装饰器必须紧挨着函数定义,写在函数定义的前一行。
Q:一个函数可以有多个装饰器吗?
A:可以,但不常见。如果有,执行顺序是从下往上。
'''
__init__.py
"""
NagaAgent API服务器模块
"""
from .api_server import app
__all__ = ['app']
'''
__init__.py 文件虽然代码短,但作用特别关键,是 Python 包(package)的核心标识和导出控制文
核心作用拆解
1.标记包身份
只要目录里有 __init__.py,Python 就会把这个目录识别成一个可导入的包,而不是普通文件夹。
没有它的话,你没法用 import apiserver 这种方式导入这个目录下的代码。
Python 3.3+ 之前,目录必须有 __init__.py 才被认为是包。现在虽然可以是隐式命名空间包,但显式创建仍然是最佳实践。
2.简化导入逻辑
代码里 from .api_server import app 是把 api_server.py 里的 app 对象导入到包的“根层级”。
原本你要写 from apiserver.api_server import app,现在可以直接写 from apiserver import app,更简洁。
没有 __init__.py 时:
# 用户需要知道内部结构
from mypackage.module1 import ClassA
from mypackage.module2 import function1
from mypackage.subpackage.module3 import helper
有 __init__.py 时:
# mypackage/__init__.py
from .module1 import ClassA
from .module2 import function1
from .subpackage.module3 import helper
_init_.py:# 用户使用更简洁
from mypackage import ClassA, function1, helper
3.控制导出范围(all)
__all__ = ['app'] 是定义这个包被from apiserver import * 导入时,对外暴露的内容只有 app。
避免把模块里的其他无关变量/函数也被批量导入,规范包的对外接口
当用户使用 from apiserver import * 时,只会导入 __all__ 中指定的内容
4. 暴露核心功能
实际案例演示:
项目结构:
mypackage/
├── __init__.py # 包的主入口
├── utils.py # 内部工具函数
├── core.py # 核心类
└── submodule/
├── __init__.py # 子包的入口
└── helper.py
内容:
# mypackage/utils.py
def internal_helper():
return "内部使用的工具函数"
def user_helper():
return "提供给用户的工具函数"
===================
# mypackage/core.py
class DataProcessor:
def process(self):
return "处理数据"
class Analyzer:
def analyze(self):
return "分析数据"
================
# mypackage/submodule/helper.py
class SpecialHelper:
def help(self):
return "特殊帮助"
现在配置 __init__.py:
# mypackage/__init__.py
from .core import DataProcessor, Analyzer
from .utils import user_helper
# 版本信息
__version__ = "1.0.0"
__author__ = "Your Name"
# 控制 from mypackage import * 的行为
__all__ = ['DataProcessor', 'Analyzer', 'user_helper']
# 可选:初始化代码
print(f"Initializing {__name__} version {__version__}")
===================
# mypackage/submodule/__init__.py
from .helper import SpecialHelper
用户使用体验:
# 用户代码 - 极其简洁
from mypackage import DataProcessor, user_helper
from mypackage.submodule import SpecialHelper
# 或者全部从主包导入
import mypackage
dp = mypackage.DataProcessor()
result = mypackage.user_helper()
5. 其他重要功能
版本管理
# __init__.py
__version__ = "2.1.0"
__version_info__ = (2, 1, 0)
包级别的配置
# __init__.py
import logging
# 设置包级别的日志
logger = logging.getLogger(__name__)
# 包配置
DEFAULT_CONFIG = {
'timeout': 30,
'retry': 3
}
延迟导入(Lazy Import)
# __init__.py
def __getattr__(name):
"""按需导入,减少启动时间"""
if name == "HeavyClass":
from .heavy_module import HeavyClass
return HeavyClass
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
6. 实际设计模式
模式一:API门面(Facade)
# __init__.py
# 将所有重要的类、函数导出到包级别
from .module1 import APIClient, Connection
from .module2 import process_data, validate_input
from .module3 import Result, Error
__all__ = [
'APIClient', 'Connection',
'process_data', 'validate_input',
'Result', 'Error'
]
模式二:子包重导出
# __init__.py
# 将子包的内容提升到主包级别
from . import subpackage1
from . import subpackage2
# 可以直接导入子包中的特定内容
from .subpackage1.core import CoreClass
from .subpackage2.helpers import helper_func
'''
#简单说,它就像这个 apiserver 包的“门面”:告诉 Python 这是个合法包,同时把核心的 app 暴露出来,让外部用起来更方便


