NagaAgent-main\apiserver
本文最后更新于61 天前,其中的信息可能已经过时,如有错误请发送邮件到184874483@qq.com

llm_service.py

#!/usr/bin/env python3
"""
LLM服务模块
提供统一的LLM调用接口,替代conversation_core.py中的get_response方法
"""

import logging
import sys
import os
from typing import Optional, Dict, Any, List
'''
类型提示(第8行):
Optional:表示这个变量可能有值,也可能是None(空)
Dict:字典类型(键值对)
Any:任意类型
List:列表类型
'''

# 添加项目根目录到Python路径
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from nagaagent_core.core import AsyncOpenAI
from nagaagent_core.api import FastAPI, HTTPException
from system.config import config
'''
自定义模块:
AsyncOpenAI:异步的OpenAI客户端(用来和AI对话)
FastAPI:创建Web API的工具
HTTPException:HTTP异常处理
config:配置文件,存储API密钥等设置
'''

# 配置日志
logger = logging.getLogger("LLMService")

class LLMService:
    """LLM服务类 - 提供统一的LLM调用接口"""
    
    def __init__(self):
        self.async_client: Optional[AsyncOpenAI] = None  #创建了一个变量 async_client,初始值是None
        self._initialize_client()  #调用 _initialize_client() 方法来初始化客户端
    
    def _initialize_client(self):
        """初始化OpenAI客户端"""
        try:
            self.async_client = AsyncOpenAI(
                api_key=config.api.api_key, 
                base_url=config.api.base_url.rstrip('/') + '/'
            )
            logger.info("LLM服务客户端初始化成功")
        except Exception as e:
            logger.error(f"LLM服务客户端初始化失败: {e}")
            self.async_client = None
            '''
            这个方法的逻辑:
               尝试创建OpenAI客户端
               如果成功:记录成功日志
               如果失败:记录错误日志,把客户端设为None
            '''
   
# ====== 4个主要的方法=======  
'''
方法名                      |  作用            | 特点
get_response               | 获取AI的回复      | 最简单的,只发一个问题
chat_with_context          | 带上下文的聊天     | 可以记住之前的对话
stream_chat_with_context   | 流式聊天          | 一个字一个字地返回,像打字一样
is_available               | 检查服务是否可用   | 返回True或False
'''   

    
    async def get_response(self, prompt: str, temperature: float = 0.7) -> str:
        """为其他模块提供API调用接口"""
        #这是一个双重检查:
        if not self.async_client:  # 如果客户端不存在
            self._initialize_client()  # 尝试初始化
            if not self.async_client: # 如果还是不存在
                return f"LLM服务不可用: 客户端初始化失败" # 返回错误信息
        
        try:
            response = await self.async_client.chat.completions.create(
                model=config.api.model,
                messages=[{"role": "user", "content": prompt}],
                temperature=temperature,
                max_tokens=config.api.max_tokens
            )
            return response.choices[0].message.content
        except RuntimeError as e:
            if "handler is closed" in str(e):  # 如果是连接关闭的错误
                logger.debug(f"忽略连接关闭异常,重新创建客户端: {e}")
                # 重新创建客户端并重试
                self._initialize_client()
                if self.async_client:   # 如果重新创建成功
                    # 重新尝试请求
                    response = await self.async_client.chat.completions.create(
                        model=config.api.model,
                        messages=[{"role": "user", "content": prompt}],
                        temperature=temperature,
                        max_tokens=config.api.max_tokens
                    )
                    return response.choices[0].message.content
                else:   # 如果重新创建失败
                    return f"LLM服务不可用: 重连失败"
            else:
                logger.error(f"API调用失败: {e}")
                return f"API调用出错: {str(e)}"
        except Exception as e:
            logger.error(f"API调用失败: {e}")
            return f"API调用出错: {str(e)}"
    
    def is_available(self) -> bool:
        """检查LLM服务是否可用"""
        return self.async_client is not None
    
    async def chat_with_context(self, messages: List[Dict], temperature: float = 0.7) -> str:
        """带上下文的聊天调用"""
        if not self.async_client:
            self._initialize_client()
            if not self.async_client:
                return f"LLM服务不可用: 客户端初始化失败"
        
        try:
            response = await self.async_client.chat.completions.create(
                model=config.api.model,
                messages=messages,
                temperature=temperature,
                max_tokens=config.api.max_tokens
            )
            return response.choices[0].message.content
        except Exception as e:
            logger.error(f"上下文聊天调用失败: {e}")
            return f"聊天调用出错: {str(e)}"
    
    #stream_chat_with_context:方法名,意思是"流式聊天带上下文"
    #messages:参数1,聊天记录列表(包含之前的所有对话)
    #temperature:参数2,温度值(控制AI回答的随机性),默认0.7
    async def stream_chat_with_context(self, messages: List[Dict], temperature: float = 0.7):
        """带上下文的流式聊天调用"""
        #检查客户端是否可用
        if not self.async_client:  #如果客户端不存在
            self._initialize_client()  #尝试初始化客户端
            if not self.async_client:  #再次检查:如果初始化后还是不存在
                yield f"LLM服务不可用: 客户端初始化失败"  #返回错误信息
                return
        
        try:
            import aiohttp  #导入HTTP客户端库
            timeout = aiohttp.ClientTimeout(total=180, connect=60, sock_read=120)
            async with aiohttp.ClientSession(timeout=timeout) as session: #创建一个HTTP会话,async with 会自动管理资源,结束时自动关闭
                async with session.post(  #发送POST请求到AI服务,请求地址是:基础URL/chat/completions
                    f"{config.api.base_url}/chat/completions",
                    headers={                                              #headers={...}:请求头,包含:
                        "Authorization": f"Bearer {config.api.api_key}",   #认证信息(API密钥)
                        "Content-Type": "application/json",                #内容类型是JSON
                        "Accept": "text/event-stream",                     #接受事件流格式(流式传输)
                        "Connection": "keep-alive"                         #保持连接
                    },
                    json={                                                 #请求体,包含:
                        "model": config.api.model,                         #使用哪个AI模型
                        "messages": messages,                              #聊天记录
                        "temperature": temperature,                        #温度值
                        "max_tokens": config.api.max_tokens,               #最大令牌数
                        "stream": True                                     #开启流式传输
                    }
                ) as resp:
                    if resp.status != 200:  #获取HTTP状态码(200表示成功),如果不是200,返回错误信息并结束
                        yield f"LLM API调用失败 (状态码: {resp.status})"
                        return
                    
 # ======处理流式响应==========
'''
代码层级 |             代码行                                               |  作用        |   解释
第1层   |async for chunk in resp.content.iter_chunked(1024):              | 读取数据块    | 每次读取1024字节的数据
第2层   | if not chunk: break                                             | 检查数据块    | 如果数据块为空,停止循环
第3层   | try:                                                            | 尝试解码      | 开始尝试处理数据
第4层   | data = chunk.decode('utf-8')                                    | 解码数据      | 把字节数据转成字符串
第4层   | lines = data.split('\n')                                        | 分割成行      | 按换行符分割
第5层   | for line in lines:                                              | 遍历每一行    | 处理每一行数据
第6层   | line = line.strip()                                             | 去除空格      | 去掉开头结尾的空格
第7层   | if line.startswith('data: '):                                   | 检查数据行     | 只处理以"data: "开头的行
第8层   | data_str = line[6:]                                             | 提取数据      | 去掉前6个字符("data: ")
第9层   | if data_str == '[DONE]': return                                 | 检查结束标志   | 如果是"[DONE]",结束方法
第10层  | try:                                                            | 尝试解析JSON  | 开始解析JSON数据
第11层  | import json                                                     | 导入json模块  | 用于解析JSON
第12层  | data = json.loads(data_str)                                     | 解析JSON     | 把字符串转成JSON对象
第13层  | if 'choices' in data and len(data['choices']) > 0:              | 检查choices  | 确保有choices字段且不为空
第14层  | delta = data['choices'][0].get('delta', {})                     | 获取delta    | 获取第一个choice的delta部分
第15层  | if 'content' in delta:                                          | 检查是否有内容 | delta中是否有content字段
第16层  | import base64                                                   | 导入base64模块| 用于base64编码
第17层  | content = delta['content']                                      | 获取内容      | 获取AI返回的文本内容
第18层  | b64 = base64.b64encode(content.encode('utf-8')).decode('ascii') | base64编码   | 把内容转成base64格式
第19层  | yield f"data: {b64}\n\n"                                        | 返回数据      | 以特定格式返回编码后的内容
'''
                    async for chunk in resp.content.iter_chunked(1024):
                        if not chunk:
                            break
                        try:
                            data = chunk.decode('utf-8')
                            lines = data.split('\n')
                            for line in lines:
                                line = line.strip()
                                if line.startswith('data: '):
                                    data_str = line[6:]
                                    if data_str == '[DONE]':
                                        return
                                    try:
                                        import json
                                        data = json.loads(data_str)
                                        if 'choices' in data and len(data['choices']) > 0:
                                            delta = data['choices'][0].get('delta', {})
                                            if 'content' in delta:
                                                import base64
                                                content = delta['content']
                                                b64 = base64.b64encode(content.encode('utf-8')).decode('ascii')
                                                yield f"data: {b64}\n\n"
                                    #异常处理
                                    except json.JSONDecodeError:
                                        continue  #如果JSON解析失败,跳过这行继续
                        except UnicodeDecodeError:
                            continue   #如果字符串解码失败,跳过这个数据块继续
        #整体异常处理                    
        except Exception as e:   #如果上面所有代码出现任何错误
            logger.error(f"流式聊天调用失败: {e}")  #记录错误日志
            yield f"data: 流式调用出错: {str(e)}\n\n"  #返回错误信息

# 全局LLM服务实例
_llm_service: Optional[LLMService] = None

def get_llm_service() -> LLMService:
    """获取全局LLM服务实例"""
    global _llm_service
    if _llm_service is None:
        _llm_service = LLMService()
    return _llm_service
'''
这里用了一个设计模式叫“单例模式”:
整个程序只有一个LLMService实例
如果还没有创建,就创建一个
如果已经创建了,就直接用那个
'''

# ========Web API接口 ==========
# 创建独立的LLM服务API
llm_app = FastAPI(
    title="LLM Service API",
    description="LLM服务API",
    version="1.0.0"
)

@llm_app.post("/llm/chat")
async def llm_chat(request: Dict[str, Any]):
    """LLM聊天接口 - 为其他模块提供LLM调用服务"""
    try:
        prompt = request.get("prompt", "")
        temperature = request.get("temperature", 0.7)
        
        if not prompt:
            raise HTTPException(status_code=400, detail="prompt参数不能为空")
        
        llm_service = get_llm_service()
        response = await llm_service.get_response(prompt, temperature)
        
        return {
            "status": "success",
            "response": response,
            "temperature": temperature
        }
        
    except Exception as e:
        logger.error(f"LLM聊天接口异常: {e}")
        raise HTTPException(status_code=500, detail=f"LLM服务异常: {str(e)}")

start_server.py

#!/usr/bin/env python3
"""
NagaAgent 服务启动脚本
服务启动脚本,就像是一个"总开关",负责启动整个系统。
支持启动API服务器和LLM服务
"""

import asyncio
import sys
import os
import argparse  #命令行参数解析(让程序可以从命令行接收参数)
from pathlib import Path

# 添加项目根目录到Python路径
project_root = Path(__file__).parent.parent  #.parent.parent:父目录的父目录(项目根目录)
sys.path.insert(0, str(project_root)) #insert(0, ...):插入到列表最前面(优先级最高)

#uvicorn:一个Web服务器,专门运行FastAPI应用,就像Apache或Nginx,但是专门为Python异步应用设计的
from nagaagent_core.api import uvicorn

async def start_api_server():
    """启动API服务器"""
    #从apiserver.api_server模块导入app对象,这个app应该是一个FastAPI应用实例
    from apiserver.api_server import app
    
    # 从环境变量获取配置,回退到config
    #os.getenv("变量名", "默认值"):从环境变量获取值
    host = os.getenv("API_SERVER_HOST", "127.0.0.1")#如果环境变量API_SERVER_HOST存在,就用它的值
    try:
        from system.config import get_server_port #尝试:从system.config导入get_server_port函数
        default_port = get_server_port("api_server") #如果导入成功:调用get_server_port("api_server")获取端口
    except ImportError:  
        default_port = 8000    #如果导入失败(except ImportError):用默认端口8000
    #从环境变量API_SERVER_PORT获取端口,如果不存在,用上面得到的default_port,int(...):把字符串转成整数
    port = int(os.getenv("API_SERVER_PORT", str(default_port))) 
    #获取环境变量API_SERVER_RELOAD,默认值是字符串"False",.lower():转成小写(比如"False"→"false"),== "true":判断是否等于"true"
    #结果是布尔值:True或False
    reload = os.getenv("API_SERVER_RELOAD", "False").lower() == "true"
    
    print(f"启动NagaAgent API服务器...")
    print(f"地址: http://{host}:{port}")
    print(f"文档: http://{host}:{port}/docs")
    print(f"自动重载: {'开启' if reload else '关闭'}")
    '''
    这里用了f-string格式化:
      f"地址: http://{host}:{port}":把host和port变量的值插入到字符串中
      {'开启' if reload else '关闭'}:三元表达式
         如果reload为True,显示"开启"
         如果reload为False,显示"关闭"
    '''
    
    # 启动服务器
    uvicorn.run(
        "apiserver.api_server:app",
        host=host,
        port=port,
        reload=reload,   #是否开启自动重载(修改代码后自动重启)
        log_level="info",
        ws_ping_interval=None,
        ws_ping_timeout=None
    )

async def start_llm_service():
    """启动LLM服务"""
    from apiserver.llm_service import llm_app
    
    # 从环境变量获取配置,回退到config
    host = os.getenv("LLM_SERVICE_HOST", "127.0.0.1")
    try:
        from system.config import get_server_port
        #这里传的参数是"agent_server",不是"llm_service",这是配置文件中的命名
        default_port = get_server_port("agent_server")
    except ImportError:
        default_port = 8001
    port = int(os.getenv("LLM_SERVICE_PORT", str(default_port)))
    reload = os.getenv("LLM_SERVICE_RELOAD", "False").lower() == "true"
    
    print(f"启动LLM服务...")
    print(f"地址: http://{host}:{port}")
    print(f"文档: http://{host}:{port}/docs")
    print(f"自动重载: {'开启' if reload else '关闭'}")
    
    # 启动服务器
    uvicorn.run(
        "apiserver.llm_service:llm_app",
        host=host,
        port=port,
        reload=reload,
        log_level="info",
        ws_ping_interval=None,
        ws_ping_timeout=None
    )

async def main():
    """主函数"""
    #创建参数解析器
    parser = argparse.ArgumentParser(description="NagaAgent 服务启动器")
    parser.add_argument("service", choices=["api", "llm", "both"], 
                       help="要启动的服务: api(API服务器), llm(LLM服务), both(两个都启动)")
    
    #解析参数
    args = parser.parse_args()
    
    #根据参数执行不同的分支
    #启动API服务器
    if args.service == "api":
        await start_api_server()
    #启动LLM服务
    elif args.service == "llm":
        await start_llm_service()
    #启动两个服务
    elif args.service == "both":
        print("启动所有服务...")
        print("注意: 同时启动多个服务需要不同的端口配置")
        try:
            from system.config import get_server_port
            api_port = get_server_port("api_server")
            agent_port = get_server_port("agent_server")
            print(f"API服务器: http://127.0.0.1:{api_port}")
            print(f"LLM服务: http://127.0.0.1:{agent_port}")
        except ImportError:
            print("API服务器: http://127.0.0.1:8000")
            print("LLM服务: http://127.0.0.1:8001")
        
        # 这里可以实现同时启动多个服务的逻辑
        # 目前先启动API服务器
        await start_api_server()

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt: #捕获键盘中断(比如按Ctrl+C)
        print("\n收到停止信号,正在关闭服务...")
    except Exception as e:
        print(f"启动失败: {e}")
        sys.exit(1) 
        
# ========整个流程总结=======
'''
用户运行脚本 → 解析命令行参数 → 根据参数选择启动的服务
       ↓
    api参数 → 启动API服务器
       ↓
    llm参数 → 启动LLM服务  
       ↓
   both参数 → 显示信息,但只启动API服务器(有bug)
'''

# =======为什么在函数内部导入模块?=======
#1. 避免循环导入
'''
想象一下这样的场景:
 文件A需要导入文件B的某个东西
 文件B也需要导入文件A的某个东西
 这样就会形成一个"死循环",Python不知道先加载哪个
 
例子:
# 文件A.py
import B  # 导入B
def func_a():
    return "A"

# 文件B.py  
import A  # 导入A
def func_b():
    return "B"
    
这样两个文件互相导入,Python会报错。

解决方法:在函数内部导入
# 文件B.py
def func_b():
    from A import func_a  # 在函数内部导入
    return "B"
'''
#2. 延迟加载,提高启动速度
'''
对比一下两种方式:
方式一:在顶部全部导入
import heavy_module1  # 这个模块很大,加载要5秒
import heavy_module2  # 这个模块也很大,加载要5秒
import heavy_module3  # 这个模块也很大,加载要5秒

def do_something():
    # 实际只用了heavy_module1
    return heavy_module1.func()
    
方式二:在函数内部按需导入
def do_something():
    import heavy_module1  # 只有调用这个函数时才加载
    return heavy_module1.func()
    
区别:
方式一:程序一启动就要加载所有模块(15秒)
方式二:只有调用函数时才加载需要的模块(5秒)
'''
#3. 条件导入(根据环境选择不同的模块)
'''
def process_image():
    # 根据操作系统选择不同的图像处理库
    import platform
    system = platform.system()
    
    if system == "Windows":
        import windows_image_lib as image_lib
    elif system == "Linux":
        import linux_image_lib as image_lib
    elif system == "Darwin":  # macOS
        import mac_image_lib as image_lib
    else:
        raise Exception("不支持的操作系统")
    
    return image_lib.process()
'''
#4. 避免不必要的依赖
'''
假设你的程序有多个功能,但不是每个用户都需要所有功能:
def export_to_excel():
    # 只有当用户需要导出Excel时才导入pandas
    import pandas as pd
    # ... 导出代码

def export_to_pdf():
    # 只有当用户需要导出PDF时才导入reportlab
    from reportlab.lib.pagesizes import letter
    # ... 导出代码
'''
#具体到这个代码的分析
'''
在代码中:
async def start_api_server():
    """启动API服务器"""
    from apiserver.api_server import app  # ← 在这里导入
为什么这么做?
  1.如果start_api_server()不被调用,apiserver.api_server模块就不会被加载
     节省内存
     加快程序启动
  2.避免可能的循环导入
     apiserver.api_server可能又导入了当前模块的某些东西
     延迟导入可以打破循环
'''
#再看另一个例子:
'''
async def stream_chat_with_context(self, messages: List[Dict], temperature: float = 0.7):
    try:
        import aiohttp  # ← 在这里导入
        timeout = aiohttp.ClientTimeout(total=180, connect=60, sock_read=120)
        
        # 后面还有
        import json  # ← 在try块里面导入
        import base64  # ← 在try块里面导入

为什么这里也这么做?
  1.aiohttp可能不是所有用户都需要的
       只有使用流式聊天功能时才需要
       如果用户从不使用这个功能,就不会安装aiohttp,程序也能正常运行
  2.异常处理中导入json和base64
  try:
      import json
      data = json.loads(data_str)
  except json.JSONDecodeError:
    continue
   这其实是一个小问题:json是Python标准库,应该在最上面导入
   这里可能是为了代码结构清晰,把相关的导入放在一起
'''

streaming_tool_extractor.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
流式文本切割器
负责将LLM流式输出按句切割并发送给语音集成(TTS)。不再检测或处理工具调用。
"""

import re
import json
import logging
import asyncio
import sys
import os
from typing import Callable, Optional, Dict, Any, Union, List #类型提示,帮助IDE和开发者理解代码类型

# 添加项目根目录到Python路径
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
#os.path.abspath(__file__):转为绝对路径,os.path.dirname(...):获取父目录(用两次得到项目根目录),sys.path.insert(0, ...):插入到Python搜索路径的最前面

try:
    from system.config import config, AI_NAME  # 导入配置系统
except ImportError:
    # 如果直接导入失败,尝试从父目录导入
    sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
    from system.config import config, AI_NAME

# 工具调用解析/执行已不再需要

logger = logging.getLogger("StreamingToolCallExtractor")

class CallbackManager:
    """回调函数管理器 - 统一处理同步/异步回调"""
    
    def __init__(self):   
        self.callbacks = {}   #self.callbacks:存储回调函数
        #self.callback_types:存储回调函数的类型(是同步还是异步)
        self.callback_types = {}  # 缓存回调函数类型
    
    def register_callback(self, name: str, callback: Optional[Callable]): #name: str:回调函数的名字,callback: Optional[Callable]:回调函数本身,可以是None
        """注册回调函数"""
        self.callbacks[name] = callback  #把回调函数存到字典里
        if callback: #如果回调函数不是None
            # 缓存函数类型,避免重复检查
            self.callback_types[name] = asyncio.iscoroutinefunction(callback) #如果回调函数不是None
        else:
            self.callback_types[name] = False
    
    async def call_callback(self, name: str, *args, **kwargs):
        #*args:可变位置参数(可以传任意数量的位置参数)
        #**kwargs:可变关键字参数(可以传任意数量的关键字参数)
        """统一调用回调函数"""
        callback = self.callbacks.get(name)
        if not callback:
            return None
            
        try:   #从缓存中获取这个函数是否是异步的,如果是异步的(返回True),就用await调用,如果是同步的(返回False),就直接调用
            if self.callback_types.get(name, False):
                # 异步回调
                return await callback(*args, **kwargs)
            else:
                # 同步回调
                return callback(*args, **kwargs)
        except Exception as e:
            logger.error(f"回调函数 {name} 执行错误: {e}")
            return None
        #为什么要这样区分?,异步函数必须用await调用,否则不会执行,同步函数不能用await调用,否则会报错,这个管理器统一处理了这两种情况

class StreamingToolCallExtractor:
    """流式文本切割器 - 实时按句切割并发送给TTS,支持工具调用处理"""
    
    def __init__(self, mcp_manager=None):
        self.mcp_manager = mcp_manager      #MCP管理器(是其他模块的引用)
        self.text_buffer = ""  # 普通文本缓冲区,(临时存放正在处理的文本)
        self.complete_text = ""  # 完整文本内容,(存放所有处理过的文本)
        self.sentence_endings = r"[。?!;\.\?\!\;]"  # 断句标点
        
        # 使用回调管理器
        self.callback_manager = CallbackManager() #创建回调管理器实例
        
        # 语音集成(可选)(初始为空)
        self.voice_integration = None  
        
        # 工具调用功能已移除
        self.tool_calls_queue = None
    
    #设置回调函数   
    def set_callbacks(self, 
                     on_text_chunk: Optional[Callable] = None,
                     voice_integration=None):
        """设置回调函数"""
        # 注册回调函数(仅文本块)
        self.callback_manager.register_callback("text_chunk", on_text_chunk) #注册一个名为"text_chunk"的回调函数
        self.voice_integration = voice_integration #设置语音集成接口
    
    async def process_text_chunk(self, text_chunk: str):
        """
        处理文本块,实时按句切割并发送给语音集成
        
        处理流程:
        1. 累积完整文本(用于最终保存)
        2. 逐字符检查句子结束符
        3. 遇到结束符时立即切割并发送完整句子到TTS
        4. 保留未完成的句子部分继续累积
        """
        if not text_chunk:
            return None
        
        
        # 调用文本块回调,将文本发送到前端
        results = []  #创建一个空列表results存放结果
        #调用"text_chunk"回调函数,传入两个参数:text_chunk和字符串"chunk"
        result = await self.callback_manager.call_callback("text_chunk", text_chunk, "chunk")  
        if result: #如果有返回值,添加到results列表
            results.append(result)

        # 累积完整文本(用于最终保存到数据库)
        self.complete_text += text_chunk
        #把收到的文本块添加到完整文本中,就像把收到的信件放到文件夹里保存
     
# =====这是最复杂的部分,我们用简单的方式解释=====
#逻辑流程:
'''
收到文本块 → 遍历每个字符 → 添加到缓冲区 → 检查是否是结束符
        ↓
   如果是结束符 → 切割句子 → 发送完整句子给TTS → 保留剩余部分
        ↓
   返回处理结果
   
详细分解:
  1.for char in text_chunk::遍历文本块中的每个字符
  2.self.text_buffer += char:把字符添加到缓冲区
  3.if re.search(self.sentence_endings, char)::检查这个字符是不是句子结束符
       使用正则表达式匹配:[。?!;\.\?\!\;]
       包括中文和英文的句号、问号、感叹号、分号
  4.如果遇到结束符:sentences = re.split(self.sentence_endings, self.text_buffer)
       用结束符分割整个缓冲区
       比如:"你好。今天天气" → ["你好", "今天天气"]
  5.if len(sentences) > 1::如果分割出多个部分
       complete_sentence = sentences[0] + char
       第一个部分 + 结束符 = 完整句子
       "你好" + "。" = "你好。"
 6.if complete_sentence.strip()::如果句子不是空的   
       self._send_to_voice_integration(complete_sentence)
       保留第一个完整句子之后的部分
       比如:"今天天气"留在缓冲区等待更多字符
'''
        # 实时按句切割并发送到TTS
        for char in text_chunk:
            self.text_buffer += char
            # 检查是否遇到句子结束符(。?!;等)
            if re.search(self.sentence_endings, char):
                # 立即切割并发送完整句子到TTS
                sentences = re.split(self.sentence_endings, self.text_buffer)
                if len(sentences) > 1:
                    complete_sentence = sentences[0] + char
                    if complete_sentence.strip():
                        # 立即发送到语音集成进行TTS合成(不阻塞文本流)
                        self._send_to_voice_integration(complete_sentence)
                    # 保留未完成的句子部分,继续累积
                    remaining_sentences = [s for s in sentences[1:] if s.strip()]
                    self.text_buffer = "".join(remaining_sentences)
        return results if results else None
    
    #刷新文本缓冲区,当流式传输结束时,发送缓冲区中剩余的文本
    async def _flush_text_buffer(self):
        """刷新文本缓冲区 - 处理流式结束时的剩余文本"""
        if self.text_buffer:
            # 立即发送剩余的未完成句子到语音集成(不阻塞)
            self._send_to_voice_integration(self.text_buffer)
            
            self.text_buffer = ""
            return None
        return None
    
    #发送到语音集成
    def _send_to_voice_integration(self, text: str):
        """发送文本到语音集成(不阻塞文本流)"""
        if self.voice_integration:
            try:
                # 在独立线程中处理TTS,不阻塞文本流
                import threading  #导入线程模块
                threading.Thread(      #创建并启动一个新线程
                    target=self.voice_integration.receive_text_chunk,     #线程要执行的函数
                    args=(text,),       #传给函数的参数
                    daemon=True         #设置为守护线程(主程序退出时自动结束)
                ).start()
            except Exception as e:
                logger.error(f"发送到语音集成失败: {e}")
    #为什么用线程?TTS(文本转语音)可能需要较长时间,如果用主线程执行,会阻塞后续文本的处理,用新线程执行,可以"边说话边接收新文本"
    # 工具调用相关方法已移除,功能已迁移到background_analyzer
    
    #完成处理,结束处理流程,清理缓冲区
    async def finish_processing(self):
        """完成处理,清理剩余内容"""
        results = []
        
        # 处理剩余的文本
        if self.text_buffer:
            result = await self._flush_text_buffer()
            if result:
                results.append(result)
        
        return results if results else None
    
    def get_complete_text(self) -> str:
        """获取完整文本内容"""
        return self.complete_text
    
    def reset(self):
        """重置提取器状态"""
        self.text_buffer = ""
        self.complete_text = ""
    
    async def process_streaming_response(self, llm_service, messages: List[Dict], 
                                       temperature: float = 0.7, voice_integration=None):
        '''
        参数1:llm_service - LLM服务实例
        参数2:messages - 聊天消息列表
        参数3:temperature - 温度参数(默认0.7)
        参数4:voice_integration - 语音集成接口
        '''
        """处理流式响应,整合LLM调用和TTS处理,(核心集成方法)"""
        if voice_integration:
            self.voice_integration = voice_integration
        
        async for chunk in llm_service.stream_chat_with_context(messages, temperature):
            if chunk.startswith("data: "):
                # 解码base64内容
                try:
                    import base64
                    data_str = chunk[6:].strip()  #去掉前6个字符"data: "
                    if data_str == '[DONE]':
                        break   #如果是"[DONE]"表示流结束
                    decoded = base64.b64decode(data_str).decode('utf-8')   #否则解码base64,处理文本
                    await self.process_text_chunk(decoded)
                except Exception as e:
                    logger.error(f"处理流式响应块失败: {e}")
                    continue
            else:
                # 直接处理文本内容
                await self.process_text_chunk(chunk)
        
        # 完成处理
        await self.finish_processing()
        return self.get_complete_text()
       '''
       开始处理流式响应
            ↓
       遍历每个数据块
            ↓
            ↓---是base64格式---解码---处理文本
            ↓
            ↓---是普通文本---直接处理文本
            ↓
       切割句子,发送给TTS
            ↓
       累积到完整文本
            ↓
       流结束→清理缓冲区→返回完整文本
       '''
# ======为什么有异步函数还要用多线程?=========
#Python并发编程中的一个重要话题
#简单回答:异步和多线程解决不同的问题,有时候需要组合使用。
'''
详细对比:
特性      |   异步 (asyncio)         | 多线程 (threading)
核心思想   | 单线程并发,任务切换        | 多线程并行,同时运行
适合场景   | I/O密集型任务(网络、文件)  | I/O密集型,某些CPU密集型
资源消耗   | 轻量(一个线程)           | 较重(每个线程有独立栈)
编程难度   | 中等(需要理解async/await)| 较高(需要处理线程安全)
Python限制| 受GIL影响较小             | 受GIL限制(CPU密集型不高效)

为什么在这个代码中要同时使用?
看这个代码片段:
def _send_to_voice_integration(self, text: str):
    if self.voice_integration:
        try:
            # 在独立线程中处理TTS,不阻塞文本流
            import threading
            threading.Thread(
                target=self.voice_integration.receive_text_chunk,
                args=(text,),
                daemon=True
            ).start()
        except Exception as e:
            logger.error(f"发送到语音集成失败: {e}")
 
可能的原因:   
原因1:TTS库可能是同步的,(很多语音合成库是同步的)  
# 假设TTS库是这样的(不支持异步)
class 同步TTS库:
    def receive_text_chunk(self, text):
        # 这是一个同步方法,可能需要几秒钟
        time.sleep(3)  # 模拟耗时操作
        print(f"播放: {text}")  
如果在异步函数中直接调用同步的TTS:    
async def process_text_chunk(self, text):
    # 如果直接调用同步TTS
    self.voice_integration.receive_text_chunk(text)  # 这会阻塞3秒!
    # 在这3秒内,整个异步事件循环都被阻塞了
    
原因2:避免阻塞事件循环
异步编程有一个黄金法则:不要阻塞事件循环    
\# 不好的做法:在异步函数中调用耗时同步函数
async def bad_example():
    await 异步操作1()
    耗时同步函数()  # 这会阻塞整个事件循环!
    await 异步操作2()  # 要等上面完成才能执行

原因3:TTS是CPU密集型(生成语音需要大量计算)
 
# 好的做法:把耗时同步函数放到线程中
async def good_example():
    await 异步操作1()
    await asyncio.to_thread(耗时同步函数)  # 在后台线程执行
    await 异步操作2()  # 可以立即执行,不用等待
    
异步和多线程的配合在这个具体代码中的应用:
async def process_streaming_response(self, ...):
    # 异步接收AI的流式响应
    async for chunk in llm_service.stream_chat_with_context(...):
        # 异步处理文本
        await self.process_text_chunk(chunk)
        # 文本处理是异步的,但TTS是同步的
        # 所以TTS放在线程中,不阻塞文本接收
'''
# ========GIL是什么?(全局解释器锁)========
'''
简单理解:
想象Python解释器是一个只能一个人进去的厕所。
 厕所 = Python解释器
 锁 = GIL(门锁)
 规则:每次只能有一个人进去上厕所(执行Python代码)
 
为什么要有这个锁?
因为Python设计得比较早......6

后果是即使你有多个线程,但因为有GIL,一次还是只能有一行Python代码。
当然也是有例外的:当线程在等待时(比如等待网络数据),它会暂时把锁让出来,让别人进去。
'''
#多线程受GIL限制,异步需要Python 3.5+
#怎么绕过GIL?
'''
用多进程(Multiprocessing)
 每个进程有自己的Python解释器
 每个进程有自己的GIL
 可以真正并行
用异步(Asyncio)
 一个线程内切换任务
 适合I/O密集型
用其他语言写的扩展
 比如用C写的numpy,在执行C代码时释放GIL
'''
# =========什么是I/O密集型和CPU密集型?==========
'''
I/O密集型(Input/Output):
大部分时间在等待,比如:
从网络下载文件
从数据库读取数据
从硬盘读取文件
等待用户输入
特点:CPU大部分时间在等,没事做

CPU密集型:
大部分时间在计算,比如:
视频编码/解码
图像处理
科学计算
加密解密
特点:CPU一直很忙
'''
# =====协程(Coroutine)======
#从零开始解释什么是协程?
#第一步:先理解函数
#普通函数就像一个一次性任务清单:
'''
def 做早饭():
    1. 煎蛋
    2. 烤面包
    3. 倒牛奶
    return "早餐完成"  # 结束了,不能中途暂停
'''
#特点:开始执行 → 一直执行到结束 → 返回结果,不能中途暂停,不能恢复

#第二步:理解生成器(Generator)
#协程的前身是生成器。生成器就像一个可暂停的任务清单:
'''
def 做复杂的早饭():
    print("开始煎蛋")
    yield "煎蛋完成"  # 暂停在这里,返回结果
    
    print("开始烤面包")
    yield "烤面包完成"  # 暂停在这里,返回结果
    
    print("开始倒牛奶")
    yield "早餐完成"  # 最后暂停

# 使用
早餐机 = 做复杂的早饭()
print(next(早餐机))  # 输出:开始煎蛋 \n 煎蛋完成
print(next(早餐机))  # 输出:开始烤面包 \n 烤面包完成
print(next(早餐机))  # 输出:开始倒牛奶 \n 早餐完成
'''
#生成器的特点:可以暂停(yield),可以恢复(next()),可以传值进来(通过send())

#第三步:协程是升级版的生成器
#协程就像一个可以双向通信的智能任务清单:
'''
async def 智能做早饭():
    材料 = await 获取材料()  # 等待别人送材料来
    print(f"用{材料}煎蛋")
    
    温度 = await 检查温度()  # 等待温度计读数
    print(f"在{温度}度下烤面包")
    
    return "早餐完成"
'''
#协程的特点:可以暂停(await),可以恢复(自动),可以和其他协程协作(通过事件循环)

# =======Python中的协程进化史=========
'''
Python 2.x: 只有生成器(yield)      ← 可以暂停,但不好用
Python 3.3: yield from              ← 改进生成器
Python 3.4: @asyncio.coroutine      ← 最早的协程装饰器
Python 3.5: async/await             ← 现在的方式(最简单)
'''
#协程的具体工作原理
#协程的三个状态:准备中(CREATED):协程对象创建了,但还没开始,运行中(RUNNING):正在执行,暂停中(SUSPENDED):遇到await暂停了

#协程的生命周期:
'''
创建协程 → 放入事件循环 → 开始执行 → 遇到await暂停
                                      ↓
                               事件循环执行其他任务
                                      ↓
                               await的条件满足了
                                      ↓
                               恢复执行 → 完成/异常
'''

#协程的核心:事件循环(Event Loop)
'''
事件循环就像一个餐厅经理:
  管理所有协程(厨师)
  哪个协程可以执行了(哪个锅可以炒了)
  哪个协程需要等待(哪个菜还在切)
'''
#事件循环的工作流程:
'''
开始
  ↓
检查协程队列
  ↓
有就绪的协程吗? → 有 → 执行它
  ↓ 没有
等待事件发生(I/O完成、定时器到点)
  ↓
事件发生 → 对应的协程就绪
  ↓
继续检查...
'''
# =======协程怎么那么像异步啊?=========
#协程就是异步,或者说协程是实现异步的一种具体技术.......6
#异步 是一个 目标(想要同时做多件事),协程 是 实现这个目标的工具(具体怎么做)\
#就像:我要从北京到上海(异步:想更快到达),我可以选择坐高铁、飞机、开车(协程:具体的方法)
#更准确的层次关系:
'''
编程模式层面:
   异步编程(Asynchronous Programming)
     ↓
具体实现层面:
   Python:协程(Coroutine)← 这是Python实现异步的方式
   JavaScript:Promise/async-await ← 这是JS实现异步的方式  
   Go:Goroutine ← 这是Go实现异步的方式
'''
#为什么会有这种混淆?
#因为不同语言用不同的词,但说的是一回事:由篇幅限制,这里就不再过多赘述了。但核心思想都一样:让程序在等待时可以做其他事。
#所以到这需要对前面的观点进行一些修复,画一个更准确的关系图:
'''
异步编程(概念)
    │
    ├── 实现方式1:多线程/多进程
    │
    ├── 实现方式2:事件驱动
    │
    └── 实现方式3:协程(Coroutine)← Python的选择
            │
            ├── 实现方式:async/await语法
            ├── 运行时:asyncio事件循环  
            └── 特点:轻量级、单线程内并发
'''
#为什么Python选择协程?
'''
历史原因:
早期Python:有GIL限制,多线程效果不好
Python 3.4:引入asyncio库,用协程实现异步
Python 3.5:引入async/await语法,让协程更容易写
'''
#所以换一个角度理解:异步 是 "什么"(What):我想要我的程序不要傻等,我想要同时处理多个请求,我想要高效利用CPU时间
#协程 是 "怎么实现"(How):Python说:我们用async/await语法,我们写协程函数,我们用asyncio事件循环来调度
#到时候我们进行最后一步澄清:协程 ≠ 异步,更准确地说:协程是实现异步的一种具体技术手段。
#就像:飞机 ≠ 快速出行(飞机是实现快速出行的一种方式),协程 ≠ 异步编程(协程是实现异步编程的一种方式)

message_manager.py

#!/usr/bin/env python3
"""
统一的消息管理模块
支持多会话、多agent的消息存储和拼接
"""

# 异步编程工具(让程序可以同时做多件事情)
import asyncio

# 生成唯一ID的工具(像身份证号码生成器)
import uuid

# 日志记录工具(像日记本,记录程序做了什么)
import logging

# 正则表达式工具(文本搜索和匹配工具)
import re

# JSON数据处理工具(处理像{"name":"小明"}这样的数据格式)
import json

# 系统工具(访问命令行参数等)
import sys

# 时间工具
import time

# 类型提示(只是给程序员看的,告诉别人参数应该是什么类型)
from typing import Dict, List, Optional, Any

# 日期时间工具
from datetime import datetime, timedelta

# 路径处理工具(更方便地处理文件路径)
from pathlib import Path

logger = logging.getLogger(__name__)


# 工具函数
def now():
    """获取当前时间戳"""
    return time.strftime('%H:%M:%S:') + str(int(time.time() * 1000) % 10000)


def setup_logging():
    """统一配置日志系统"""
    try:
        # 尝试从配置文件读取设置
        from system.config import config
        log_level = getattr(logging, config.system.log_level.upper(), logging.INFO)
        # 配置日志的基本设置        
        logging.basicConfig(
            level=log_level,  # 日志级别
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',  # 日志格式
            handlers=[logging.StreamHandler(sys.stderr)]  # 输出到错误流
        )

        # 设置第三方库日志级别
        for logger_name in ["httpcore.connection", "httpcore.http11", "httpx", "openai._base_client", "asyncio"]:
            logging.getLogger(logger_name).setLevel(logging.WARNING)
    except ImportError:
        # 如果无法导入配置,使用默认设置
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[logging.StreamHandler(sys.stderr)]
        )


class MessageManager:
    """统一的消息管理器"""

    def __init__(self):
        self.sessions: Dict[str, Dict] = {}
        # 分析状态跟踪,防止重复执行
        self.analysis_in_progress: Dict[str, bool] = {}
        # 从配置文件读取最大历史轮数,默认为10轮
        try:
            from system.config import config
            self.max_history_rounds = config.api.max_history_rounds  # 最大对话轮数
            self.max_messages_per_session = self.max_history_rounds * 2  # 每轮对话包含用户和助手各一条消息
            self.persistent_context = config.api.persistent_context  # 是否保存对话历史
            self.context_load_days = config.api.context_load_days  # 加载多少天的历史
            self.log_dir = config.system.log_dir  # 日志保存目录
            self.ai_name = config.system.ai_name  # AI的名字
        except ImportError:
            # 如果找不到配置,就用默认值
            self.max_history_rounds = 10
            self.max_messages_per_session = 20  # 默认20条消息
            self.persistent_context = True
            self.context_load_days = 3
            self.log_dir = Path("logs")
            self.ai_name = "娜迦"
            logger.warning("无法导入配置,使用默认历史轮数设置")

    def generate_session_id(self) -> str:
        """生成唯一的会话ID"""
        return str(uuid.uuid4())

    def create_session(self, session_id: Optional[str] = None) -> str:
        """获取或创建会话"""
        if not session_id:
            session_id = self.generate_session_id()

        # 检查会话是否已存在
        if session_id in self.sessions:
            logger.debug(f"使用现有会话: {session_id}")
            # 更新最后活动时间
            self.sessions[session_id]["last_activity"] = asyncio.get_event_loop().time()
            return session_id

        # 初始化新会话
        self.sessions[session_id] = {
            "created_at": asyncio.get_event_loop().time(),
            "messages": [],  # 消息列表(空的)
            "agent_type": "default",  # 可以扩展支持不同agent类型
            "last_activity": asyncio.get_event_loop().time()  # 最后活动时间
        }
        '''
        逻辑判断:
          如果 没有提供session_id 就 生成一个新的
          如果 session_id已经在sessions字典里 就 更新最后活动时间并返回
          否则 创建新房间,记录创建时间、空的消息列表等
          如果 persistent_context是True 就 尝试加载历史对话
        '''

        # 如果启用持久化上下文,尝试加载历史对话
        if self.persistent_context:
            self._load_persistent_context_for_session(session_id)

        logger.info(f"创建新会话: {session_id}")
        return session_id

    def _load_persistent_context_for_session(self, session_id: str):
        """为指定会话加载持久化上下文"""
        try:
            # 加载历史对话
            recent_messages = self.load_recent_context(
                days=self.context_load_days,
                max_messages=self.max_messages_per_session
            )

            if recent_messages:
                self.sessions[session_id]["messages"] = recent_messages
                logger.info(f"会话 {session_id} 加载了 {len(recent_messages)} 条历史对话")
            else:
                logger.debug(f"会话 {session_id} 未找到历史对话记录")

        except Exception as e:
            logger.warning(f"为会话 {session_id} 加载持久化上下文失败: {e}")

    def get_session(self, session_id: str) -> Optional[Dict]:
        """获取会话信息"""
        return self.sessions.get(session_id)

    def add_message(self, session_id: str, role: str, content: str) -> bool:
        """向会话添加消息"""
        if session_id not in self.sessions:
            logger.warning(f"会话不存在: {session_id}")
            return False

        session = self.sessions[session_id]
        session["messages"].append({"role": role, "content": content})  # 添加消息
        session["last_activity"] = asyncio.get_event_loop().time()  # 更新活动时间

        # 限制消息数量,如果消息太多了,就删除最早的消息(只保留最近的)
        if len(session["messages"]) > self.max_messages_per_session:
            session["messages"] = session["messages"][-self.max_messages_per_session:]

        logger.debug(f"会话 {session_id} 添加消息: {role} - {content[:50]}...")
        return True

    def get_messages(self, session_id: str) -> List[Dict]:
        """获取会话的所有消息"""
        session = self.sessions.get(session_id)  # :在消息字典里查找这个消息,如果找到消息:返回session["messages"](这个字典里的所有对话)
        return session["messages"] if session else []  # 如果没找到,返回[](空列表)

    def get_recent_messages(self, session_id: str, count: Optional[int] = None) -> List[Dict]:
        """获取会话的最近消息"""
        if count is None:  # # 如果没有指定要多少条
            count = self.max_messages_per_session  # 就用默认的最大消息数
        messages = self.get_messages(session_id)  # 先获取所有消息
        # session_id:房间号,count:想要多少条最近的消息(可选,不填就用默认值)
        return messages[-count:] if messages else []

    # 这是最重要的功能之一
    def build_conversation_messages(self, session_id: str, system_prompt: str,
                                    current_message: str, include_history: bool = True) -> List[Dict]:
        """构建完整的对话消息列表"""
        messages = []  # 准备一个空的消息包

        # 添加当前时间信息到系统提示词
        from datetime import datetime
        current_time = datetime.now()  # 获取当前时间
        # 制作时间信息字符串
        time_info = f"\n\n【当前时间信息】\n当前日期:{current_time.strftime('%Y年%m月%d日')}\n当前时间:{current_time.strftime('%H:%M:%S')}\n当前星期:{current_time.strftime('%A')}\n"
        enhanced_system_prompt = system_prompt + time_info  # 把时间信息加到系统提示词后面

        # 添加系统提示词(这是第一条消息)
        messages.append({"role": "system", "content": enhanced_system_prompt})

        # 添加历史对话 (如果需要的话)
        if include_history:  # 默认是True,包含历史
            recent_messages = self.get_recent_messages(session_id)  # 获取最近的历史对话
            messages.extend(recent_messages)  # 把历史对话加到消息包里

        # 添加当前用户消息 (这是最后一条消息)
        messages.append({"role": "user", "content": current_message})

        return messages  # 返回完整的消息包

    def build_conversation_messages_from_memory(self, memory_messages: List[Dict], system_prompt: str,
                                                current_message: str, max_history_rounds: int = None) -> List[Dict]:
        """
        从内存消息列表构建对话消息(用于UI界面)

        Args:
            memory_messages: 内存中的消息列表
            system_prompt: 系统提示词
            current_message: 当前用户消息
            max_history_rounds: 最大历史轮数,默认使用配置值

        Returns:
            List[Dict]: 完整的对话消息列表
        """
        messages = []

        # 添加当前时间信息到系统提示词
        from datetime import datetime
        current_time = datetime.now()
        time_info = f"\n\n【当前时间信息】\n当前日期:{current_time.strftime('%Y年%m月%d日')}\n当前时间:{current_time.strftime('%H:%M:%S')}\n当前星期:{current_time.strftime('%A')}\n"
        enhanced_system_prompt = system_prompt + time_info

        # 添加系统提示词
        messages.append({"role": "system", "content": enhanced_system_prompt})

        # 计算最大消息数量
        if max_history_rounds is None:
            max_history_rounds = self.max_history_rounds

        max_messages = max_history_rounds * 2  # 每轮对话包含用户和助手各一条消息

        # 添加历史对话(限制数量)
        if memory_messages:
            recent_messages = memory_messages[-max_messages:]
            messages.extend(recent_messages)

        # 添加当前用户消息
        messages.append({"role": "user", "content": current_message})

        return messages

    def get_session_info(self, session_id: str) -> Optional[Dict]:
        """获取会话详细信息"""
        session = self.sessions.get(session_id)
        if not session:
            return None

        return {
            "session_id": session_id,
            "created_at": session["created_at"],
            "last_activity": session["last_activity"],
            "message_count": len(session["messages"]),
            "conversation_rounds": len(session["messages"]) // 2,  # 有趣的计算:len(session["messages"]) // 2:用整数除法计算轮数
            "agent_type": session["agent_type"],
            "max_history_rounds": self.max_history_rounds,  # 添加最大历史轮数信息
            "last_message": session["messages"][-1]["content"][:100] + "..." if session["messages"] else "无对话历史"
        }

    def get_all_sessions_info(self) -> Dict[str, Dict]:
        """获取所有会话信息"""
        sessions_info = {}  # 创建一个空字典来存放所有房间信息
        # 遍历self.sessions字典中的每个房间
        for session_id, session in self.sessions.items():
            # 对每个房间,调用get_session_info获取详细信息
            sessions_info[session_id] = self.get_session_info(session_id)
        return sessions_info  # 返回包含所有房间信息的字典

    def delete_session(self, session_id: str) -> bool:
        """删除指定会话"""
        if session_id in self.sessions:  # 如果房间存在
            del self.sessions[session_id]
            删除这个房间
            logger.info(f"删除会话: {session_id}")
            return True
        return False

    def clear_all_sessions(self) -> int:
        """清空所有会话"""
        count = len(self.sessions)  # 先统计有多少个房间
        self.sessions.clear()  # 清空整个字典
        logger.info(f"清空所有会话,共 {count} 个")  # 记录清空了多少个
        return count  # 返回清空的数量

    # 这是非常重要的功能,防止内存被不活动的房间占满:
    def cleanup_old_sessions(self, max_age_hours: int = 24) -> int:
        """清理过期会话"""
        current_time = asyncio.get_event_loop().time()  # 获取当前时间
        expired_sessions = []  # 创建一个列表来存放过期的房间号

        # 第一遍:找出所有过期的房间
        for session_id, session in self.sessions.items():
            # 计算房间多久没活动了(当前时间 - 最后活动时间)
            # 如果超过max_age_hours小时(默认24小时),就标记为过期
            if current_time - session["last_activity"] > max_age_hours * 3600:
                expired_sessions.append(session_id)  # 添加到过期列表

        # 第二遍:删除所有过期的房间
        for session_id in expired_sessions:
            del self.sessions[session_id]  # 从字典中删除

        # 如果有清理过期的房间,就记录日志
        if expired_sessions:
            logger.info(f"清理了 {len(expired_sessions)} 个过期会话")

        return len(expired_sessions)  # 返回清理了多少个

    def set_agent_type(self, session_id: str, agent_type: str) -> bool:
        """设置会话的agent类型"""
        if session_id in self.sessions:  # 如果房间存在
            self.sessions[session_id]["agent_type"] = agent_type  # 设置类型
            return True  # 设置成功
        return False  # 房间不存在,设置失败

    def get_agent_type(self, session_id: str) -> Optional[str]:
        """获取会话的agent类型"""
        session = self.sessions.get(session_id)  # 先找到房间
        return session["agent_type"] if session else None  # 如果有房间就返回类型,否则返回None

    # ========== 日志解析功能 ==========

    def _parse_log_line(self, line: str) -> Optional[tuple]:
        """
        解析单行日志内容

        Args:
            line: 日志行内容

        Returns:
            tuple: (role, content) 或 None
        """
        line = line.strip()  # 1. 去掉首尾空白字符
        if not line:  # 2. 如果是空行
            return None  # 返回None

        # 3.匹配格式:[时间] 用户: 内容 或 [时间] AI名称: 内容
        pattern = r'^\[(\d{2}:\d{2}:\d{2})\] (用户|' + re.escape(self.ai_name) + r'): (.+)$'
        match = re.match(pattern, line)  # 4. 尝试匹配

        if match:  # 5. 如果匹配成功
            time_str, speaker, content = match.groups()  # 6. 提取三部分
            if speaker == "用户":  # 7. 判断说话者
                role = "user"  # 用户说的话
            else:
                role = "assistant"  # AI说的话
            return (role, content.strip())  # 8. 返回角色和内容

        return None  # 9. 匹配失败,返回None

    def _is_message_start_line(self, line: str) -> bool:
        """
        判断是否为消息开始行

        Args:
            line: 日志行内容

        Returns:
            bool: 是否为消息开始行
        """
        line = line.strip()  # strip():去掉字符串开头和结尾的空白字符(空格、制表符、换行符)
        if not line:  # not line:如果line是空字符串(长度为0)
            return False  # 空行就直接返回None,不处理

        # 匹配格式:[时间] 用户: 或 [时间] AI名称:
        pattern = r'^\[(\d{2}:\d{2}:\d{2})\] (用户|' + re.escape(self.ai_name) + r'):'
        return bool(re.match(pattern, line))

    def parse_log_file(self, log_file_path: str) -> List[Dict]:
        """
        解析单个日志文件,提取对话内容
        按照日志记录代码的格式:每轮对话包含用户消息和AI回复,用50个-分隔

        Args:
            log_file_path: 日志文件路径

        Returns:
            List[Dict]: 对话消息列表,格式为[{"role": "user/assistant", "content": "内容"}]
        """
        messages = []  # 1. 准备空列表存放结果

        try:
            # 2. 打开并读取文件
            with open(log_file_path, 'r', encoding='utf-8') as f:
                content = f.read()

            # 以50个-分割对话轮次(按照日志记录代码的格式)
            conversation_blocks = content.split('-' * 50)

            for block in conversation_blocks:
                block = block.strip()  # 去掉空白
                if not block:  # 跳过空块
                    continue

                # 解析每个对话块中的消息
                block_messages = self._parse_conversation_block(block)
                messages.extend(block_messages)  # 6. 添加到结果列表

        except FileNotFoundError:
            logger.debug(f"日志文件不存在: {log_file_path}")
        except Exception as e:
            logger.error(f"解析日志文件失败 {log_file_path}: {e}")

        return messages  # 7. 返回所有消息

    def _parse_conversation_block(self, block: str) -> List[Dict]:
        """
        解析单个对话块,提取其中的所有消息
        每块包含用户消息和AI回复,支持多行内容

        Args:
            block: 对话块内容

        Returns:
            List[Dict]: 消息列表
        """
        messages = []  # 1. 存放解析出的消息
        lines = block.split('\n')  # 2. 按换行符分割成多行
        current_message = None  # 3. 当前正在处理的消息(None表示没有)
        current_content_lines = []  # 4. 当前消息的内容行列表

        # 5. 逐行处理
        for line in lines:
            line = line.rstrip('\n\r')  # 6.移除行尾换行符,但保留内容中的换行

            # 7.检查是否为消息开始行
            if self._is_message_start_line(line):
                # 8.保存前一个消息
                if current_message is not None and current_content_lines:
                    content = '\n'.join(current_content_lines)  # 合并内容行
                    messages.append({
                        "role": current_message["role"],
                        "content": content
                    })

                # 9.开始新消息
                result = self._parse_log_line(line)  # 解析这一行
                if result:  # 如果解析成功
                    role, content = result
                    current_message = {"role": role}  # 记录角色
                    current_content_lines = [content] if content else []  # 初始化内容
                else:  # 解析失败
                    current_message = None
                    current_content_lines = []

            # 10.如果当前有活跃消息,且不是消息开始行,则作为内容行处理
            elif current_message is not None:
                # 跳过分隔线和空行
                if line.strip() and not line.strip().startswith('---') and not line.strip().startswith('--'):
                    current_content_lines.append(line  # 添加到当前消息内容

                    # 11.保存最后一个消息(循环结束后)
                    if current_message is not None and current_content_lines:
                        content = '\n'.join(current_content_lines)
                    messages.append({
                        "role": current_message["role"],
                        "content": content
                    })

        return messages

    def get_log_files_by_date(self, days: int = 3) -> List[str]:
        """
        获取最近几天的日志文件路径

        Args:
            days: 要获取的天数

        Returns:
            List[str]: 日志文件路径列表,按日期倒序排列
        """
        log_files = []
        today = datetime.now()

        for i in range(days):
            date = today - timedelta(days=i)
            date_str = date.strftime('%Y-%m-%d')
            log_file = self.log_dir / f"{date_str}.log"

            if log_file.exists():
                log_files.append(str(log_file))

        # 按日期倒序排列(最新的在前)
        log_files.reverse()
        return log_files

    def load_recent_context(self, days: int = 3, max_messages: int = None) -> List[Dict]:
        """
        加载最近几天的对话上下文

        Args:
            days: 要加载的天数
            max_messages: 最大消息数量限制

        Returns:
            List[Dict]: 对话消息列表
        """
        all_messages = []
        log_files = self.get_log_files_by_date(days)

        logger.info(f"开始加载最近 {days} 天的日志文件: {log_files}")

        for log_file in log_files:
            messages = self.parse_log_file(log_file)
            all_messages.extend(messages)
            logger.debug(f"从 {log_file} 加载了 {len(messages)} 条消息")

        # 限制消息数量
        if max_messages and len(all_messages) > max_messages:
            all_messages = all_messages[-max_messages:]  # 7. 只保留最近的消息
            logger.info(f"限制消息数量为 {max_messages} 条")

        logger.info(f"总共加载了 {len(all_messages)} 条历史对话")
        return all_messages

    def get_context_statistics(self, days: int = 7) -> Dict:
        """
        获取上下文统计信息

        Args:
            days: 统计天数

        Returns:
            Dict: 统计信息
        """
        log_files = self.get_log_files_by_date(days)
        total_messages = 0  # 2. 初始化计数器
        user_messages = 0
        assistant_messages = 0

        # 3. 遍历每个文件
        for log_file in log_files:
            messages = self.parse_log_file(log_file)  # 4. 解析文件
            total_messages += len(messages)  # 5. 累加总消息数

            # 6. 统计用户和AI消息数量
            for msg in messages:
                if msg["role"] == "user":
                    user_messages += 1
                else:
                    assistant_messages += 1

        return {
            "total_files": len(log_files),  # 文件数量
            "total_messages": total_messages,  # 总消息数
            "user_messages": user_messages,  # 用户消息数
            "assistant_messages": assistant_messages,  # AI消息数
            "days_covered": days  # 统计天数
        }

    def save_conversation_log(self, user_message: str, assistant_message: str, dev_mode: bool = False):
        """
        保存对话日志到文件

        Args:
            user_message: 用户消息
            assistant_message: 助手回复
            dev_mode: 是否为开发者模式(开发者模式不保存日志)
        """
        if dev_mode:
            return  # 开发者模式不写日志

        try:
            from datetime import datetime
            import os

            # 获取当前时间
            now = datetime.now()
            date_str = now.strftime('%Y-%m-%d')
            time_str = now.strftime('%H:%M:%S')

            # 确保日志目录存在
            log_dir = str(self.log_dir)
            if not os.path.exists(log_dir):
                os.makedirs(log_dir, exist_ok=True)  # 创建目录(如果不存在)
                logger.info(f"已创建日志目录: {log_dir}")

            # 保存对话日志
            log_file = os.path.join(log_dir, f"{date_str}.log")
            with open(log_file, 'a', encoding='utf-8') as f:
                f.write(f"[{time_str}] 用户: {user_message}\n")  # 用户消息
                f.write(f"[{time_str}] {self.ai_name}: {assistant_message}\n")  # AI回复
                f.write("-" * 50 + "\n")  # 分隔线

            logger.debug(f"已保存对话日志到: {log_file}")

        except Exception as e:
            logger.error(f"保存对话日志失败: {e}")

    def save_conversation_and_logs(self, session_id: str, user_message: str, assistant_response: str):
        """统一保存对话历史与日志 - 整合重复逻辑"""
        try:
            # 保存对话历史到消息管理器
            self.add_message(session_id, "user", user_message)
            self.add_message(session_id, "assistant", assistant_response)

            # 保存对话日志到文件
            self.save_conversation_log(
                user_message,
                assistant_response,
                dev_mode=False  # 开发者模式已禁用
            )
        except Exception as e:
            logger.error(f"保存对话与日志失败: {e}")

    def trigger_background_analysis(self, session_id: str):
        """统一触发后台意图分析 - 整合重复逻辑"""
        try:
            # 检查是否已经有分析在进行中
            # 使用analysis_in_progress字典防止重复分析:
            if self.analysis_in_progress.get(session_id):  # 检查是否已在分析中
                logger.info(f"[博弈论] 会话 {session_id} 已有意图分析在进行中,跳过重复触发")
                return

            # 标记分析开始
            self.analysis_in_progress[session_id] = True

            import asyncio
            from system.background_analyzer import get_background_analyzer
            from system.config import config
            background_analyzer = get_background_analyzer()

            # 根据配置获取意图分析轮数,默认3轮
            intent_rounds = getattr(config.api, 'intent_analysis_rounds', 3)
            max_messages = intent_rounds * 2  # 每轮包含用户和助手各一条消息

            recent_messages = self.get_recent_messages(session_id, count=max_messages)
            logger.info(f"[博弈论] 分析最近 {intent_rounds} 轮对话,共 {len(recent_messages)} 条消息")

            # 异步执行分析任务
            async def _execute_analysis():
                try:
                    await background_analyzer.analyze_intent_async(recent_messages, session_id)
                finally:
                    # 无论成功与否,都清除分析状态
                    self.analysis_in_progress[session_id] = False
                    logger.info(f"[博弈论] 会话 {session_id} 意图分析完成,状态已清除")

            asyncio.create_task(_execute_analysis())
        except Exception as e:
            # 发生异常时也要清除分析状态
            self.analysis_in_progress[session_id] = False
            logger.error(f"后台意图分析触发失败: {e}")

    def get_all_sessions_api(self):
        """获取所有会话信息 - API接口"""
        try:
            # 清理过期会话
            self.cleanup_old_sessions()

            # 获取所有会话信息
            sessions_info = self.get_all_sessions_info()

            return {
                "status": "success",  # 状态
                "sessions": sessions_info,  # 会话信息
                "total_sessions": len(sessions_info)  # 会话总数
            }
        except Exception as e:
            logger.error(f"获取会话信息错误: {e}")
            raise Exception(f"获取会话信息失败: {str(e)}")

    def get_session_detail_api(self, session_id: str):
        """获取指定会话的详细信息 - API接口"""
        try:
            session_info = self.get_session_info(session_id)
            if not session_info:
                raise Exception("会话不存在")

            return {
                "status": "success",
                "session_id": session_id,
                "session_info": session_info,
                "messages": self.get_messages(session_id),
                "conversation_rounds": session_info["conversation_rounds"]
            }
        except Exception as e:
            logger.error(f"获取会话详情错误: {e}")
            raise Exception(f"获取会话详情失败: {str(e)}")

    def delete_session_api(self, session_id: str):
        """删除指定会话 - API接口"""
        try:
            success = self.delete_session(session_id)
            if success:
                return {
                    "status": "success",
                    "message": f"会话 {session_id} 已删除"
                }
            else:
                raise Exception("会话不存在")
        except Exception as e:
            logger.error(f"删除会话错误: {e}")
            raise Exception(f"删除会话失败: {str(e)}")

    def clear_all_sessions_api(self):
        """清空所有会话 - API接口"""
        try:
            count = self.clear_all_sessions()
            return {
                "status": "success",
                "message": f"已清空 {count} 个会话"
            }
        except Exception as e:
            logger.error(f"清空会话错误: {e}")
            raise Exception(f"清空会话失败: {str(e)}")


# 全局消息管理器实例
message_manager = MessageManager()

# =====MessageManager功能地图======
'''
MessageManager
├── 初始化配置
├── 会话管理
│   ├── create_session() - 创建会话
│   ├── get_session() - 获取会话
│   ├── delete_session() - 删除会话
│   ├── clear_all_sessions() - 清空所有
│   └── cleanup_old_sessions() - 清理过期
├── 消息管理
│   ├── add_message() - 添加消息
│   ├── get_messages() - 获取所有消息
│   ├── get_recent_messages() - 获取最近消息
│   ├── build_conversation_messages() - 构建对话包
│   └── build_conversation_messages_from_memory() - 从内存构建
├── 日志管理
│   ├── parse_log_file() - 解析日志文件
│   ├── load_recent_context() - 加载历史上下文
│   ├── save_conversation_log() - 保存日志
│   └── save_conversation_and_logs() - 统一保存
├── 统计分析
│   ├── get_session_info() - 会话信息
│   ├── get_all_sessions_info() - 所有会话信息
│   ├── get_context_statistics() - 上下文统计
│   └── get_agent_type()/set_agent_type() - 代理类型
├── 后台处理
│   └── trigger_background_analysis() - 触发后台分析
├── API接口
│   ├── get_all_sessions_api() - 所有会话API
│   ├── get_session_detail_api() - 会话详情API
│   ├── delete_session_api() - 删除会话API
│   └── clear_all_sessions_api() - 清空所有API
└── 辅助方法(以_开头)
    ├── _parse_log_line() - 解析日志行
    ├── _is_message_start_line() - 判断消息开始
    └── _parse_conversation_block() - 解析对话块
'''

# ========常见的编程范式/模式总结=========
# 1. 初始化-处理-清理模式
# 这个模式在代码中反复出现:
'''
def 某个函数():
    # 1. 初始化阶段
    结果容器 = []  # 或 {}
    临时变量 = None

    # 2. 处理阶段
    for 每个项目 in 数据源:
        # 处理逻辑
        if 条件:
            结果容器.append(处理结果)

    # 3. 清理/返回阶段
    return 结果容器

在代码中的体现:
  parse_log_file():初始化messages列表,逐块处理,返回结果
  load_recent_context():初始化all_messages,逐个文件处理,返回结果
  _parse_conversation_block():初始化各种变量,逐行处理,返回结果

为什么这样设计?
  清晰:三个阶段分明
  可维护:每个阶段职责明确
  可测试:可以单独测试每个阶段
'''
# 2. 标记-处理-清除模式(状态管理)
'''
def 有状态处理的函数():
    # 1. 标记开始
    标记字典[键] = True

    try:
        # 2. 核心处理
        复杂操作()
    finally:
        # 3. 清除标记(无论如何都执行)
        标记字典[键] = False

在代码中的体现:
def trigger_background_analysis(self, session_id: str):
    # 标记开始
    if self.analysis_in_progress.get(session_id):
        return  # 防止重复
    self.analysis_in_progress[session_id] = True

    try:
        # 核心处理
        await background_analyzer.analyze_intent_async(...)
    finally:
        # 清除标记
        self.analysis_in_progress[session_id] = False

为什么这样设计?
  防重入:防止同一任务被重复执行
  资源安全:确保标记被正确清理
  异常安全:即使出错也能清理状态
'''
# 3. 配置-默认值模式
'''
def 某个需要配置的函数():
    try:
        # 1. 尝试获取外部配置
        from 配置模块 import 配置
        参数 = 配置.某个值
    except ImportError:
        # 2. 失败时使用默认值
        参数 = 默认值

在代码中的体现:
def __init__(self):
    try:
        from system.config import config
        self.max_history_rounds = config.api.max_history_rounds
    except ImportError:
        self.max_history_rounds = 10  # 默认值

为什么这样设计?
  灵活性:可以通过配置文件调整行为
  鲁棒性:配置缺失时程序仍能运行
  可部署性:不同环境可以用不同配置
'''
# 4. 包装器模式(API封装)
'''
def 原始函数(参数):
    # 原始逻辑
    return 结果

def 包装后的API函数(参数):
    try:
        # 调用原始函数
        结果 = 原始函数(参数)

        # 包装成标准格式
        return {
            "status": "success",
            "data": 结果
        }
    except Exception as e:
        # 统一错误处理
        logger.error(...)
        return {
            "status": "error",
            "message": str(e)
        }

在代码中的体现:
def delete_session(self, session_id: str) -> bool:
    # 原始逻辑
    if session_id in self.sessions:
        del self.sessions[session_id]
        return True
    return False

def delete_session_api(self, session_id: str):
    # 包装后的API
    try:
        success = self.delete_session(session_id)
        if success:
            return {"status": "success", "message": f"会话已删除"}
        else:
            raise Exception("会话不存在")
    except Exception as e:
        logger.error(...)
        raise Exception(...)

为什么这样设计?
  接口统一:所有API返回相同结构
  错误处理集中:统一记录日志和错误格式
  业务逻辑分离:核心逻辑和接口逻辑分离
'''
# 5. 遍历-过滤-收集模式
'''
def 处理集合(数据集合):
    结果 = []

    for 项目 in 数据集合:
        # 过滤条件
        if not 满足条件(项目):
            continue  # 跳过

        # 处理逻辑
        处理后的项目 = 处理函数(项目)

        # 收集结果
        结果.append(处理后的项目)

    return 结果

在代码中的体现:
def cleanup_old_sessions(self, max_age_hours: int = 24):
    expired_sessions = []  # 收集结果

    for session_id, session in self.sessions.items():  # 遍历
        # 过滤条件
        if current_time - session["last_activity"] > max_age_hours * 3600:
            # 收集符合条件的
            expired_sessions.append(session_id)

    # 后续处理收集到的结果
    for session_id in expired_sessions:
        del self.sessions[session_id]

    return len(expired_sessions)
'''
# 6. 状态机模式
'''
def 状态机处理函数(输入数据):
    当前状态 = None
    当前数据 = []

    for 行 in 输入数据:
        if 是状态开始标记(行):
            # 保存之前的状态数据
            if 当前状态 and 当前数据:
                保存状态数据(当前状态, 当前数据)

            # 进入新状态
            当前状态 = 解析状态(行)
            当前数据 = []
        elif 当前状态 is not None:
            # 在当前状态下收集数据
            当前数据.append(行)

    # 处理最后的状态
    if 当前状态 and 当前数据:
        保存状态数据(当前状态, 当前数据)

在代码中的体现:
def _parse_conversation_block(self, block: str):
    current_message = None  # 当前状态
    current_content_lines = []  # 当前数据

    for line in lines:
        if self._is_message_start_line(line):  # 状态切换
            # 保存前一个状态
            if current_message is not None and current_content_lines:
                保存消息()

            # 进入新状态
            current_message = {"role": role}
            current_content_lines = []
        elif current_message is not None:  # 状态内处理
            current_content_lines.append(line)

    # 处理最后一个状态
    if current_message is not None and current_content_lines:
        保存消息()

为什么这样设计?
  处理复杂结构:适合处理有层次、有状态的数据
  内存高效:一次遍历完成处理
  逻辑清晰:状态转换明确
'''
# ========这些模式背后的编程思想=========
'''
1. 分离关注点(Separation of Concerns)
  初始化、处理、清理各司其职
  业务逻辑和错误处理分离
  数据获取和数据使用分离
2. 防御式编程(Defensive Programming)
  总是检查输入有效性
  总是处理异常情况
  总是清理资源
3. 一次做好一件事(Do One Thing Well)
  每个函数只负责一个主要任务
  复杂任务分解为简单步骤
  通过组合小函数完成大功能
4. 不要重复自己(DRY - Don't Repeat Yourself)
  相似的逻辑抽象成函数
  通用的模式形成规范
  配置集中管理
'''
# 如何在代码中识别这些模式?
'''
看变量名:
  result = [] → 收集模式
  current_xxx = None → 状态机模式
  is_xxx_in_progress → 标记模式
看代码结构:
  try-except包裹 → 异常安全模式
  函数开头有配置读取 → 配置模式
  返回标准字典结构 → API包装模式
看注释:
  注释中提到的"状态"、"标记"、"收集"等关键词
'''
# 在其他代码中寻找模式
'''
# 写一个简单的遍历-过滤-收集模式
def get_even_numbers(numbers):
    """获取所有偶数"""
    result = []  # 初始化

    for num in numbers:  # 遍历
        if num % 2 == 0:  # 过滤
            result.append(num)  # 收集

    return result  # 返回

# 写一个简单的配置-默认值模式
def get_timeout():
    try:
        import settings
        return settings.TIMEOUT
    except ImportError:
        return 30  # 默认值
'''

api_server.py

#!/usr/bin/env python3
"""
NagaAgent API服务器
提供RESTful API接口访问NagaAgent功能
"""

import asyncio  # 异步编程工具(让程序可以同时做多件事)
import json    # JSON数据处理(像翻译器,把数据变成计算机能懂的样子)
import sys     # 系统相关功能
import traceback  # 错误追踪(程序出错时告诉我们是哪里错了)
import os      # 操作系统功能(文件、文件夹操作)
import logging  # 日志记录(就像写日记,记录程序做了什么)
import uuid    # 生成唯一ID(给每样东西一个唯一的身份证号)
import time    # 时间相关功能
import threading  # 多线程(让程序可以分身做不同的事)
from contextlib import asynccontextmanager  # 异步上下文管理(资源管理工具)
from typing import Dict, List, Optional, AsyncGenerator, Any  # 类型提示(告诉计算机变量是什么类型)

# 在导入其他模块前先设置HTTP库日志级别,(让程序保持安静)
logging.getLogger("httpcore.http11").setLevel(logging.WARNING)
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore.connection").setLevel(logging.WARNING)

# 创建logger实例
logger = logging.getLogger(__name__)

# 从nagaagent_core工具包拿工具
from nagaagent_core.api import uvicorn  # 网页服务器引擎
from nagaagent_core.api import FastAPI, HTTPException, Request, UploadFile, File, Form # 创建网站API的工具
from nagaagent_core.api import CORSMiddleware   # 跨域资源共享(让不同网站能互相访问)
from nagaagent_core.api import StreamingResponse    # 流式响应(像水管一样持续传输数据)
from nagaagent_core.api import StaticFiles      # 静态文件服务(图片、CSS等)
from pydantic import BaseModel                # 数据验证工具(检查数据是否符合要求)
from nagaagent_core.core import aiohttp      # 异步HTTP客户端
import shutil                # 文件操作工具
from pathlib import Path     # 路径处理工具

# 添加项目根目录到Python路径(告诉程序去哪里找东西),就像在手机上添加一个新的收藏位置,以后找东西更方便。
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

# 流式文本处理模块(仅用于TTS)
from .message_manager import message_manager  # 导入统一的消息管理器

from .llm_service import get_llm_service  # 导入LLM服务

# 导入配置系统
try:
    from system.config import config, AI_NAME  # 使用新的配置系统
    from system.config import get_prompt  # 导入提示词仓库
except ImportError:
    # 如果找不到,再调整一次路径然后导入
    import sys
    import os
    sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
    from system.config import config, AI_NAME  # 使用新的配置系统
    from system.config import get_prompt  # 导入提示词仓库
from ui.utils.response_util import extract_message  # 导入消息提取工具

# 对话核心功能已集成到apiserver

#1. 两个“转发”函数
# 统一后台意图分析触发函数 - 已整合到message_manager
def _trigger_background_analysis(session_id: str):
    """统一触发后台意图分析 - 委托给message_manager"""
    message_manager.trigger_background_analysis(session_id)

# 统一保存对话与日志函数 - 已整合到message_manager
def _save_conversation_and_logs(session_id: str, user_message: str, assistant_response: str):
    """统一保存对话历史与日志 - 委托给message_manager"""
    message_manager.save_conversation_and_logs(session_id, user_message, assistant_response)
#这两个函数自己不干活,只是“传话员”。它们把工作交给message_manager去做。
# 回调工厂类已移除 - 功能已整合到streaming_tool_extractor


@asynccontextmanager
async def lifespan(app: FastAPI):
    """应用生命周期管理"""
    try:
        print("[INFO] 正在初始化API服务器...")
        # 程序启动时要做的事
        # 对话核心功能已集成到apiserver
        print("[SUCCESS] API服务器初始化完成")
        yield
    except Exception as e:
        print(f"[ERROR] API服务器初始化失败: {e}")
        traceback.print_exc()
        sys.exit(1)
    finally:  #关店清理(finally部分)
        print("[INFO] 正在清理资源...")
        # MCP服务现在由mcpserver独立管理,无需清理

# 创建FastAPI应用
app = FastAPI(
    title="NagaAgent API",
    description="智能对话助手API服务",
    version="4.0.0",
    lifespan=lifespan   # 使用上面定义的生命周期管理
)

# 配置CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # 生产环境建议限制具体域名
    allow_credentials=True,    # 允许携带凭证
    allow_methods=["*"],       # 允许所有HTTP方法
    allow_headers=["*"],       # 允许所有请求头
)

# 挂载静态文件
static_dir = os.path.join(os.path.dirname(__file__), "static")
app.mount("/static", StaticFiles(directory=static_dir), name="static")

# 请求模型
class ChatRequest(BaseModel):
    message: str                         # 用户发送的消息
    stream: bool = False                 # 是否使用流式传输(默认否)
    session_id: Optional[str] = None     # 会话ID(可选)
    use_self_game: bool = False          # 是否使用自博弈(游戏相关)
    disable_tts: bool = False  # V17: 支持禁用服务器端TTS(语音合成)
    return_audio: bool = False  # V19: 支持返回音频URL供客户端播放
    skip_intent_analysis: bool = False  # 新增:跳过意图分析

class ChatResponse(BaseModel):
    response: str
    session_id: Optional[str] = None
    status: str = "success"



class SystemInfoResponse(BaseModel):   #系统信息
    version: str
    status: str
    available_services: List[str]
    api_key_configured: bool

class FileUploadResponse(BaseModel):  #文件上传响应
    filename: str
    file_path: str
    file_size: int
    file_type: str
    upload_time: str
    status: str = "success"
    message: str = "文件上传成功"

class DocumentProcessRequest(BaseModel):  #文档处理请求
    file_path: str
    action: str = "read"  # read, analyze, summarize
    session_id: Optional[str] = None


# API路由
@app.get("/", response_model=Dict[str, str])  #访问根路径时返回基本信息。
async def root():
    """API根路径"""
    return {
        "name": "NagaAgent API",
        "version": "4.0.0",
        "status": "running",
        "docs": "/docs",
    }

@app.get("/health")
async def health_check():
    """健康检查"""
    return {
        "status": "healthy",
        "agent_ready": True,
        "timestamp": str(asyncio.get_event_loop().time())
    }

@app.get("/system/info", response_model=SystemInfoResponse)
async def get_system_info():
    """获取系统信息"""
    
    return SystemInfoResponse(
        version="4.0.0",
        status="running",
        available_services=[],  # MCP服务现在由mcpserver独立管理
        api_key_configured=bool(config.api.api_key and config.api.api_key != "sk-placeholder-key-not-set")
    )

@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    """普通对话接口 - 仅处理纯文本对话"""
'''
1. 接口定义 - 就像开了一个聊天窗口
app.post("/chat"):创建一个POST请求的接口,地址是/chat
就像在网站上开了一个聊天窗口,用户可以通过这个窗口发送消息
response_model=ChatResponse:告诉程序返回的数据要按照ChatResponse的格式来
async def chat:这是一个异步函数,可以同时处理多个用户的聊天
'''

    #安全检查 - 检查用户有没有发送空消息
    if not request.message.strip():   #request.message.strip():获取用户发送的消息,并去掉两边的空格
        #raise HTTPException:就抛出一个错误,告诉用户"消息内容不能为空"
        raise HTTPException(status_code=400, detail="消息内容不能为空")  #status_code=400:400是HTTP状态码,表示"错误的请求"
    
    try:
        # 分支: 启用博弈论流程(非流式,返回聚合文本)
        if request.use_self_game:  #if request.use_self_game::检查用户是否想使用"博弈论"模式
            # 配置项控制:失败时可跳过回退到普通对话
            # #从配置中读取两个设置:1.skip_on_error:博弈论失败时是否跳过(默认是)2.enabled:是否启用博弈论(默认否)
            #getattr是Python函数,安全地获取对象的属性,如果属性不存在就返回默认值
            skip_on_error = getattr(getattr(config, 'game', None), 'skip_on_error', True)  # 兼容无配置情况 #
            enabled = getattr(getattr(config, 'game', None), 'enabled', False)  # 控制总开关 #
            if enabled:
                try:
                    # 延迟导入以避免启动时循环依赖 #
                    from game.naga_game_system import NagaGameSystem  # 博弈系统入口 #
                    from game.core.models.config import GameConfig  # 博弈系统配置 #
                    # 创建系统并执行用户问题处理 #
                    system = NagaGameSystem(GameConfig())
                    system_response = await system.process_user_question(
                        user_question=request.message,
                        user_id=request.session_id or "api_user"
                    )
                    return ChatResponse(
                        response=system_response.content,
                        session_id=request.session_id,
                        status="success"
                    )
                except Exception as e:
                    print(f"[WARNING] 博弈论流程失败,将{ '回退到普通对话' if skip_on_error else '返回错误' }: {e}")  # 运行时警告 #
                    if not skip_on_error:
                        raise HTTPException(status_code=500, detail=f"博弈论流程失败: {str(e)}")
                    # 否则继续走普通对话流程 #
            # 若未启用或被配置跳过,则直接回退到普通对话分支 #

        # 获取或创建会话ID
        session_id = message_manager.create_session(request.session_id)
        
        # 构建系统提示词(只使用对话风格提示词)
        #get_prompt("conversation_style_prompt"):从提示词仓库获取"对话风格提示词"
        system_prompt = get_prompt("conversation_style_prompt")
        
        # 使用消息管理器构建完整的对话消息(纯聊天,不触发工具)
        messages = message_manager.build_conversation_messages(
            session_id=session_id,
            system_prompt=system_prompt,
            current_message=request.message
        )
        
        # 使用整合后的LLM服务
        llm_service = get_llm_service()
        response_text = await llm_service.chat_with_context(messages, config.api.temperature)
        
        # 处理完成
        # 统一保存对话历史与日志
        _save_conversation_and_logs(session_id, request.message, response_text)
        #调用之前定义的_save_conversation_and_logs函数
        #保存用户消息和AI回复到对话历史中
        #就像把这次聊天记录到聊天记录本里

        # 在用户消息保存到历史后触发后台意图分析(除非明确跳过)
        #这是后台悄悄进行的,不会影响当前回复
        if not request.skip_intent_analysis:
            _trigger_background_analysis(session_id=session_id)

        return ChatResponse(   #返回响应给用户
            response=extract_message(response_text) if response_text else response_text,
            session_id=session_id,
            status="success"
        )
        '''
        理解:
        把AI的回复包装成ChatResponse格式返回
        extract_message(response_text):可能对回复做最后的处理(比如提取关键部分)
        包含会话ID(这样用户下次可以继续这个对话)
        状态设置为"success"(成功)
        '''
    #错误处理 - 如果一切出错了怎么办
    except Exception as e:
        print(f"对话处理错误: {e}")
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"处理失败: {str(e)}")
    '''
    如果上面的代码有任何错误,就执行这里
    print(f"对话处理错误: {e}"):在控制台打印错误信息
    traceback.print_exc():打印详细的错误堆栈(知道是哪一行出错了)
    raise HTTPException:告诉用户"处理失败",并显示具体错误
    '''
# =========流程图总结这个函数的工作流程:============
'''
开始
  ↓
检查消息是否为空 → 如果是 → 返回错误
  ↓
检查是否使用博弈论模式 → 如果是 → 尝试博弈论处理
  ↓                                     ↓
  ↓                     成功? → 是 → 返回博弈论结果
  ↓                                     ↓
  ↓                     否 → 配置跳过? → 否 → 返回错误
  ↓                                     ↓
  ↓                                    是
  ↓                                     ↓
进入普通对话模式
  ↓
获取/创建会话ID
  ↓
获取对话风格提示词
  ↓
构建完整对话消息(历史+新消息)
  ↓
调用AI模型生成回复
  ↓
保存对话历史
  ↓
(可选)触发后台意图分析
  ↓
返回成功响应
  ↓
如果有任何错误 → 打印错误并返回错误信息
'''

# ======关键函数总结:==========
'''
1.消息管理器相关:
    message_manager.create_session() - 创建会话
    message_manager.build_conversation_messages() - 构建对话消息
2.配置相关:
   get_prompt() - 获取提示词
   config.api.temperature - 获取温度参数
3.AI服务相关:
  get_llm_service() - 获取AI服务
  llm_service.chat_with_context() - 与AI对话
4.工具函数:
  extract_message() - 提取消息(可能来自另一个模块)
  _save_conversation_and_logs() - 保存对话(调用消息管理器)
  _trigger_background_analysis() - 触发意图分析(调用消息管理器)
'''

#流式聊天接口(/chat/stream),这段代码比之前的更复杂一些,因为它涉及到流式传输(像水管一样一点点输送数据)和语音处理。
@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    """流式对话接口 - 流式文本处理交给streaming_tool_extractor用于TTS"""
'''
理解:
  创建一个新的POST接口,地址是/chat/stream
  专门处理流式对话(文字一个字一个字地显示,而不是一次性显示完整回复)
  还会处理语音合成(TTS - Text To Speech,文字转语音)
比喻:这就像两个水管:
  普通聊天接口:一桶水一次性倒给你
  流式聊天接口:水管一点点流水给你
'''

    #2. 检查空消息,和之前一样,先检查用户是不是发了空消息。
    if not request.message.strip():
        raise HTTPException(status_code=400, detail="消息内容不能为空")

    #3. 核心:创建流式响应生成器
    '''
    重要概念:
       generate_response是一个生成器函数(AsyncGenerator)
       它不会一次性返回所有数据,而是用yield一点点产生数据
       complete_text:用来累积AI的所有回复文字,后面可能用来生成语音
    '''
    async def generate_response() -> AsyncGenerator[str, None]:
        complete_text = ""  # V19: 用于累积完整文本以生成音频
        try:
            # 获取或创建会话ID,主要逻辑开始
            session_id = message_manager.create_session(request.session_id)
            
            # 发送会话ID信息
            yield f"data: session_id: {session_id}\n\n"
            
            # 注意:这里不触发后台分析,将在对话保存后触发
            '''
            理解:
              创建会话ID(和之前一样)
              yield f"data: session_id: {session_id}\n\n":这是第一次发送数据给客户端
              格式是data: session_id: xxx,这是一种特殊的SSE(Server-Sent Events)格式
            比喻:就像你先告诉用户:"我们的聊天室编号是123,然后我们开始聊天"
            '''

            #构建对话消息,和之前的普通聊天接口一样,准备对话内容。
            # 构建系统提示词(只使用对话风格提示词)
            system_prompt = get_prompt("conversation_style_prompt")
            
            # 使用消息管理器构建完整的对话消息
            messages = message_manager.build_conversation_messages(
                session_id=session_id,
                system_prompt=system_prompt,
                current_message=request.message
            )

            # 语音集成初始化(重要部分!),这部分比较复杂,因为有很多模式和配置:
            '''
            理解4个条件(都要满足才启用实时TTS):
               系统配置启用了语音功能
               用户没有要求返回音频URL(return_audio=False)
               语音模式不是"混合模式"
               用户没有明确禁用TTS
            '''
            # 初始化语音集成(根据voice_mode和return_audio决定)
            # V19: 如果客户端请求返回音频,则在服务器端生成
            voice_integration = None

            # V19: 混合模式下,如果请求return_audio,则在服务器生成音频
            # 修复双音频问题:return_audio时不启用实时TTS,只在最后生成完整音频
            should_enable_tts = (
                config.system.voice_enabled
                and not request.return_audio  # 修复:return_audio时不启用实时TTS
                and config.voice_realtime.voice_mode != "hybrid"
                and not request.disable_tts  # 兼容旧版本的disable_tts
            )

            if should_enable_tts:
                try:
                    from voice.output.voice_integration import get_voice_integration
                    voice_integration = get_voice_integration()
                    logger.info(f"[API Server] 实时语音集成已启用 (return_audio={request.return_audio}, voice_mode={config.voice_realtime.voice_mode})")
                except Exception as e:
                    print(f"语音集成初始化失败: {e}")
            else:   #如果条件满足,就初始化语音集成(让AI说话的功能)
                if request.return_audio:
                    logger.info("[API Server] return_audio模式,将在最后生成完整音频")
                elif config.voice_realtime.voice_mode == "hybrid" and not request.return_audio:
                    logger.info("[API Server] 混合模式下且未请求音频,不处理TTS")
                elif request.disable_tts:
                    logger.info("[API Server] 客户端禁用了TTS (disable_tts=True)")
                    '''
                    理解不同的情况:
                      return_audio=True:用户想要音频文件,最后生成
                      混合模式:可能客户端自己处理语音
                      disable_tts=True:用户明确不要语音
                    '''

            # 初始化流式文本切割器(仅用于TTS处理)
            # 始终创建tool_extractor以累积文本内容,确保日志保存
            tool_extractor = None
            try:
                from .streaming_tool_extractor import StreamingToolCallExtractor
                tool_extractor = StreamingToolCallExtractor()
                # 只有在需要实时TTS且不是return_audio模式时,才设置voice_integration
                if voice_integration and not request.return_audio:
                    tool_extractor.set_callbacks(
                        on_text_chunk=None,  # 不需要回调,直接处理TTS
                        voice_integration=voice_integration
                    )
            except Exception as e:
                print(f"流式文本切割器初始化失败: {e}")
                '''
                理解:
                  tool_extractor:文本切割器,把连续的文本切成小块
                  如果需要实时TTS,就把语音集成设置给切割器
                  这样切割器收到文字时,可以立即让语音集成说出来
                '''
            
            # 使用整合后的流式处理,流式调用AI模型(重点!)
            llm_service = get_llm_service()\
            #llm_service.stream_chat_with_context:这是流式调用AI的方法
            #async for chunk in ...:一点点获取AI的回复,每次一小块(chunk)
            async for chunk in llm_service.stream_chat_with_context(messages, config.api.temperature):
                # V19: 如果需要返回音频,累积文本
                if request.return_audio and chunk.startswith("data: "):
                    try:
                        import base64
                        data_str = chunk[6:].strip()
                        if data_str != '[DONE]':
                            decoded = base64.b64decode(data_str).decode('utf-8')
                            complete_text += decoded
                    except Exception:
                        pass
                    '''
                    理解:
                       如果用户要求返回音频,就累积完整的文本
                       chunk格式是data: BASE64编码的文字
                       需要解码BASE64,然后拼接到complete_text中
                    '''
                
                # 立即发送到流式文本切割器进行TTS处理(不阻塞文本流)
                if tool_extractor and chunk.startswith("data: "):
                    try:
                        import base64
                        data_str = chunk[6:].strip()
                        if data_str != '[DONE]':
                            decoded = base64.b64decode(data_str).decode('utf-8')
                            # 同步处理文本累积,不阻塞文本流
                            tool_extractor.complete_text += decoded
                    except Exception as e:
                        logger.error(f"[API Server] 流式文本切割器处理错误: {e}")
                        #同时,把文字也传给文本切割器(用于实时TTS),注意:这是并行处理的,不影响文字流给用户
                
                yield chunk
                #最重要的部分:立即把这一小块文字发送给用户!,比喻:AI思考一个字 → 发送给用户显示 → 同时传给语音系统准备说话,就像一个人一边说话(文字显示),一边发声(语音合成)
            
            # 处理完成
            #音频生成(如果用户要求音频文件)
            # V19: 如果请求返回音频,在这里生成并返回音频URL
            if request.return_audio and complete_text:
                try:
                    logger.info(f"[API Server V19] 生成音频,文本长度: {len(complete_text)}")

                    # 使用服务器端的TTS生成音频
                    from voice.tts_wrapper import generate_speech_safe
                    import tempfile
                    import uuid

                    # 生成音频文件
                    tts_voice = config.voice_realtime.tts_voice or "zh-CN-XiaoyiNeural"
                    audio_file = generate_speech_safe(
                        text=complete_text,
                        voice=tts_voice,
                        response_format="mp3",
                        speed=1.0
                    )
                    '''
                    理解:
                      文字全部接收完后,用TTS生成整个音频文件
                      generate_speech_safe:安全生成语音的函数
                      选择语音(默认是"晓易"中文语音)
                    '''

                    # 直接使用voice/output播放音频,不再返回给客户端
                    try:
                        from voice.output.voice_integration import get_voice_integration
                        voice_integration = get_voice_integration()
                        voice_integration.receive_audio_url(audio_file)
                        logger.info(f"[API Server V19] 音频已直接播放: {audio_file}")
                    except Exception as e:
                        logger.error(f"[API Server V19] 音频播放失败: {e}")
                        # 如果播放失败,仍然返回给客户端作为备选
                        yield f"data: audio_url: {audio_file}\n\n"

                except Exception as e:
                    logger.error(f"[API Server V19] 音频生成失败: {e}")
                    # traceback已经在文件顶部导入,直接使用
                    print(f"[API Server V19] 详细错误信息:")
                    traceback.print_exc()

            #完成处理
            # 完成流式文本切割器处理(非return_audio模式,不阻塞)
            if tool_extractor and not request.return_audio:
                try:
                    # 同步处理完成,不阻塞文本流返回
                    # tool_extractor.finish_processing() 是异步方法,这里不需要调用
                    pass
                except Exception as e:
                    print(f"流式文本切割器完成处理错误: {e}")
            
            # 完成语音处理
            if voice_integration and not request.return_audio:  # V19: return_audio模式不需要这里的处理
                try:
                    threading.Thread(
                        target=voice_integration.finish_processing,
                        daemon=True
                    ).start()
                except Exception as e:
                    print(f"语音集成完成处理错误: {e}")
                    #理解:告诉文本切割器和语音集成:"处理完了,可以做收尾工作了",threading.Thread(...).start():在新线程中处理,不阻塞主程序

            # 流式处理完成后,获取完整文本用于保存
            complete_response = ""
            if tool_extractor:
                try:
                    # 获取完整文本内容
                    complete_response = tool_extractor.get_complete_text()
                except Exception as e:
                    print(f"获取完整响应文本失败: {e}")
            elif request.return_audio:
                # V19: 如果是return_audio模式,使用累积的文本
                complete_response = complete_text
            
            # 统一保存对话历史与日志
            _save_conversation_and_logs(session_id, request.message, complete_response)

            # 在用户消息保存到历史后触发后台意图分析(除非明确跳过)
            if not request.skip_intent_analysis:
                _trigger_background_analysis(session_id)

            yield "data: [DONE]\n\n"
            '''
            理解:获取完整的AI回复文字,保存到对话历史,触发意图分析,发送结束标记[DONE],告诉客户端:"我说完了"
            '''

        # 错误处理
        except Exception as e:
            print(f"流式对话处理错误: {e}")
            # 使用顶部导入的traceback
            traceback.print_exc()
            yield f"data: 错误: {str(e)}\n\n"

    #返回流式响应
    return StreamingResponse(
        generate_response(),
        media_type="text/event-stream",  #这是服务器推送事件的格式
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "Content-Type": "text/event-stream",
            "Access-Control-Allow-Origin": "*",
            "Access-Control-Allow-Headers": "*",
            "X-Accel-Buffering": "no"  # 禁用nginx缓冲,告诉nginx不要缓存,立即传输
        }
    )
# =========流程图总结:=============
'''
开始
  ↓
检查空消息
  ↓
创建会话ID → 立即发送给客户端
  ↓
判断语音模式:
  - 实时TTS模式?
  - 返回音频模式?
  - 混合模式?
  - 禁用TTS?
  ↓
初始化文本切割器
  ↓
开始流式获取AI回复:
  ↓
AI产生一个字 → 立即发送给客户端显示
  ↓              ↓
如果需要音频 → 累积文字     如果需要实时TTS → 传给语音系统
  ↓              ↓
继续下一个字 ←---------------
  ↓
AI回复完成
  ↓
如果是音频模式 → 生成音频文件 → 播放或返回URL
  ↓
完成文本切割器和语音处理
  ↓
保存对话历史
  ↓
触发意图分析
  ↓
发送结束标记
  ↓
如果有错误 → 发送错误信息
'''

# ==========关键函数和调用:=========
'''
1.消息处理:
  message_manager.create_session() - 创建会话
  message_manager.build_conversation_messages() - 构建消息
2.AI服务:
  get_llm_service() - 获取AI服务
  llm_service.stream_chat_with_context() - 流式聊天核心
3.语音处理:
 get_voice_integration() - 获取语音集成
 generate_speech_safe() - 生成语音文件
 voice_integration.receive_audio_url() - 播放音频
4.文本处理:
 StreamingToolCallExtractor() - 文本切割器
 tool_extractor.set_callbacks() - 设置回调
5.数据保存:
 _save_conversation_and_logs() - 保存对话
 _trigger_background_analysis() - 触发分析
'''

# 获取记忆统计接口 (/memory/stats),@app.get("/memory/stats"):创建一个GET请求的接口,地址是/memory/stats,用来获取AI的"记忆系统"的统计信息
@app.get("/memory/stats")
async def get_memory_stats():
    """获取记忆统计信息"""
    
    try:
        # 记忆系统现在由main.py直接管理
        try:
            #第一层:尝试导入记忆管理器
            # 分层理解:第一层:尝试导入记忆管理器:from summer_memory.memory_manager import memory_manager
            # 尝试从summer_memory模块导入memory_manager,如果导入失败,说明记忆系统模块不存在
            from summer_memory.memory_manager import memory_manager
            #第二层:检查记忆系统是否启用
            #如果记忆管理器存在且启用(enabled=True),调用get_memory_stats()方法获取统计信息,返回成功状态和统计信息
            if memory_manager and memory_manager.enabled:
                stats = memory_manager.get_memory_stats()
                return {
                    "status": "success",
                    "memory_stats": stats
                }
            else:
                return {
                    "status": "success",
                    "memory_stats": {"enabled": False, "message": "记忆系统未启用"}
                }
        #第三层:如果模块不存在,如果summer_memory模块不存在(ImportError),返回模块未找到的提示
        except ImportError:
            return {
                "status": "success",
                "memory_stats": {"enabled": False, "message": "记忆系统模块未找到"}
            }
    #处理其他所有错误
    except Exception as e:
        print(f"获取记忆统计错误: {e}")
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"获取记忆统计失败: {str(e)}")
    #这个接口就像问AI:"你还记得多少事情?",如果AI有记忆系统:返回记忆统计,如果记忆系统没开:告诉用户没开,如果记忆系统根本没装:告诉用户没安装

# 获取所有会话信息 (/sessions)
'''
理解:
  GET请求,获取所有聊天会话的信息
  完全委托给message_manager.get_all_sessions_api()处理
  如果出错,返回500错误
'''
@app.get("/sessions")
async def get_sessions():
    """获取所有会话信息 - 委托给message_manager"""
    try:
        return message_manager.get_all_sessions_api()
    except Exception as e:
        print(f"获取会话信息错误: {e}")
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))

#{session_id}:这是路径参数,可以从URL中获取,例如:访问/sessions/abc123,session_id就是"abc123"
@app.get("/sessions/{session_id}")
async def get_session_detail(session_id: str):
    """获取指定会话的详细信息 - 委托给message_manager"""
    try:
        return message_manager.get_session_detail_api(session_id)
    except Exception as e:
        if "会话不存在" in str(e):   #检查错误信息是否包含"会话不存在"
            raise HTTPException(status_code=404, detail=str(e))
        print(f"获取会话详情错误: {e}")
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))

#@app.delete:创建一个DELETE请求的接口
@app.delete("/sessions/{session_id}")
async def delete_session(session_id: str):
    """删除指定会话 - 委托给message_manager"""
    try:
        return message_manager.delete_session_api(session_id)
    except Exception as e:
        if "会话不存在" in str(e):
            raise HTTPException(status_code=404, detail=str(e))
        print(f"删除会话错误: {e}")
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))

#DELETE请求,但没有路径参数,删除所有会话(危险操作!),注意:这个接口没有404错误的处理,因为清空所有会话不会出现"会话不存在"的情况。
@app.delete("/sessions")
async def clear_all_sessions():
    """清空所有会话 - 委托给message_manager"""
    try:
        return message_manager.clear_all_sessions_api()
    except Exception as e:
        print(f"清空会话错误: {e}")
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))

# 文档处理功能已整合到 ui/controller/tool_document.py

# 新增:日志解析相关API接口
@app.get("/logs/context/statistics")
async def get_log_context_statistics(days: int = 7):
    """获取日志上下文统计信息"""
    try:
        statistics = message_manager.get_context_statistics(days)
        return {
            "status": "success",
            "statistics": statistics
        }
    except Exception as e:
        print(f"获取日志上下文统计错误: {e}")
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"获取统计信息失败: {str(e)}")


@app.get("/logs/context/load")
async def load_log_context(days: int = 3, max_messages: int = None):
    """加载日志上下文"""
    try:
        messages = message_manager.load_recent_context(days=days, max_messages=max_messages)
        return {
            "status": "success",
            "messages": messages,
            "count": len(messages),
            "days": days
        }
    except Exception as e:
        print(f"加载日志上下文错误: {e}")
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"加载上下文失败: {str(e)}")

@app.post("/tool_notification")
async def tool_notification(payload: Dict[str, Any]):
    """接收工具调用状态通知,只显示工具调用状态,不显示结果"""
    try:
        session_id = payload.get("session_id")
        tool_calls = payload.get("tool_calls", [])
        message = payload.get("message", "")

        if not session_id:
            raise HTTPException(400, "缺少session_id")

        # 记录工具调用状态(不处理结果,结果由tool_result_callback处理)
        for tool_call in tool_calls:
            tool_name = tool_call.get("tool_name", "未知工具")
            service_name = tool_call.get("service_name", "未知服务")
            status = tool_call.get("status", "starting")
            logger.info(f"工具调用状态: {tool_name} ({service_name}) - {status}")

        # 这里可以添加WebSocket通知UI的逻辑,让UI显示工具调用状态
        # 目前先记录日志,UI可以通过其他方式获取工具调用状态

        return {
            "success": True,
            "message": "工具调用状态通知已接收",
            "tool_calls": tool_calls,
            "display_message": message
        }

    except Exception as e:
        logger.error(f"工具调用通知处理失败: {e}")
        raise HTTPException(500, f"处理失败: {str(e)}")

#非常重要
@app.post("/tool_result_callback")
async def tool_result_callback(payload: Dict[str, Any]):
    """接收MCP工具执行结果回调,让主AI基于原始对话和工具结果重新生成回复"""
    try:
        session_id = payload.get("session_id")
        task_id = payload.get("task_id")
        result = payload.get("result", {})
        success = payload.get("success", False)

        if not session_id:
            raise HTTPException(400, "缺少session_id")

        logger.info(f"[工具回调] 开始处理工具回调,会话: {session_id}, 任务ID: {task_id}")
        logger.info(f"[工具回调] 回调内容: {result}")

        # 获取工具执行结果
        tool_result = result.get('result', '执行成功') if success else result.get('error', '未知错误')
        logger.info(f"[工具回调] 工具执行结果: {tool_result}")

        # 获取原始对话的最后一条用户消息(触发工具调用的消息)
        session_messages = message_manager.get_messages(session_id)
        original_user_message = ""
        for msg in reversed(session_messages):
            if msg.get('role') == 'user':
                original_user_message = msg.get('content', '')
                break

        # 构建包含工具结果的用户消息
        enhanced_message = f"{original_user_message}\n\n[工具执行结果]: {tool_result}"
        logger.info(f"[工具回调] 构建增强消息: {enhanced_message[:200]}...")

        # 构建对话风格提示词和消息
        system_prompt = get_prompt("conversation_style_prompt")
        messages = message_manager.build_conversation_messages(
            session_id=session_id,
            system_prompt=system_prompt,
            current_message=enhanced_message
        )

        logger.info(f"[工具回调] 开始生成工具后回复...")

        # 使用LLM服务基于原始对话和工具结果重新生成回复
        try:
            llm_service = get_llm_service()
            response_text = await llm_service.chat_with_context(messages, temperature=0.7)
            logger.info(f"[工具回调] 工具后回复生成成功,内容: {response_text[:200]}...")
        except Exception as e:
            logger.error(f"[工具回调] 调用LLM服务失败: {e}")
            response_text = f"处理工具结果时出错: {str(e)}"

        # 只保存AI回复到历史记录(用户消息已在正常对话流程中保存)
        message_manager.add_message(session_id, "assistant", response_text)
        logger.info(f"[工具回调] AI回复已保存到历史")

        # 保存对话日志到文件
        message_manager.save_conversation_log(original_user_message, response_text, dev_mode=False)
        logger.info(f"[工具回调] 对话日志已保存")

        # 通过UI通知接口将AI回复发送给UI
        logger.info(f"[工具回调] 开始发送AI回复到UI...")
        await _notify_ui_refresh(session_id, response_text)

        logger.info(f"[工具回调] 工具结果处理完成,回复已发送到UI")

        return {
            "success": True,
            "message": "工具结果已通过主AI处理并返回给UI",
            "response": response_text,
            "task_id": task_id,
            "session_id": session_id
        }

    except Exception as e:
        logger.error(f"[工具回调] 工具结果回调处理失败: {e}")
        raise HTTPException(500, f"处理失败: {str(e)}")
#完整流程总结:
'''
用户提问 → AI发现需要工具 → 调用工具 → 工具执行
                                      ↓
工具完成 → 调用/tool_result_callback → AI重新生成回复 → 显示给用户
'''

@app.post("/tool_result")
async def tool_result(payload: Dict[str, Any]):
    """接收工具执行结果并显示在UI上"""
    try:
        session_id = payload.get("session_id")
        result = payload.get("result", "")
        notification_type = payload.get("type", "")
        ai_response = payload.get("ai_response", "")

        if not session_id:
            raise HTTPException(400, "缺少session_id")

        logger.info(f"工具执行结果: {result}")

        # 如果是工具完成后的AI回复,通过信号机制通知UI线程显示
        if notification_type == "tool_completed_with_ai_response" and ai_response:
            try:
                # 使用Qt信号机制在主线程中安全地更新UI
                from ui.controller.tool_chat import chat

                # 直接发射信号,确保在主线程中执行
                chat.tool_ai_response_received.emit(ai_response)
                logger.info(f"[UI] 已通过信号机制通知UI显示AI回复,长度: {len(ai_response)}")
            except Exception as e:
                logger.error(f"[UI] 调用UI控制器显示AI回复失败: {e}")
'''
重要概念:Qt信号机制:
  Qt是一个图形界面框架
  emit()是发送信号的函数
  这是在多线程环境下安全更新UI的方式
  
比喻:就像在两个房间之间用对讲机通话:
  后台线程:"我这里有AI回复了"
  前台UI线程:"收到,我马上显示"
'''

        return {
            "success": True,
            "message": "工具结果已接收",
            "result": result,
            "session_id": session_id
        }

    except Exception as e:
        logger.error(f"处理工具结果失败: {e}")
        raise HTTPException(500, f"处理失败: {str(e)}")


@app.post("/save_tool_conversation")
async def save_tool_conversation(payload: Dict[str, Any]):
    """保存工具对话历史"""
    try:
        session_id = payload.get("session_id")
        user_message = payload.get("user_message", "")
        assistant_response = payload.get("assistant_response", "")

        if not session_id:
            raise HTTPException(400, "缺少session_id")

        logger.info(f"[保存工具对话] 开始保存工具对话历史,会话: {session_id}")

        # 保存用户消息(工具执行结果)
        if user_message:
            message_manager.add_message(session_id, "user", user_message)

        # 保存AI回复
        if assistant_response:
            message_manager.add_message(session_id, "assistant", assistant_response)

        logger.info(f"[保存工具对话] 工具对话历史已保存,会话: {session_id}")

        return {
            "success": True,
            "message": "工具对话历史已保存",
            "session_id": session_id
        }

    except Exception as e:
        logger.error(f"[保存工具对话] 保存工具对话历史失败: {e}")
        raise HTTPException(500, f"保存失败: {str(e)}")

#这是一个通用的UI控制接口。
@app.post("/ui_notification")
async def ui_notification(payload: Dict[str, Any]):
    """UI通知接口 - 用于直接控制UI显示"""
    try:
        session_id = payload.get("session_id")
        action = payload.get("action", "")
        ai_response = payload.get("ai_response", "")

        if not session_id:
            raise HTTPException(400, "缺少session_id")

        logger.info(f"UI通知: {action}, 会话: {session_id}")

        # 处理显示工具AI回复的动作
        if action == "show_tool_ai_response" and ai_response:
            try:
                from ui.controller.tool_chat import chat

                # 直接发射信号,确保在主线程中执行
                chat.tool_ai_response_received.emit(ai_response)
                logger.info(f"[UI通知] 已通过信号机制显示工具AI回复,长度: {len(ai_response)}")
                return {
                    "success": True,
                    "message": "AI回复已显示"
                }
            except Exception as e:
                logger.error(f"[UI通知] 显示工具AI回复失败: {e}")
                raise HTTPException(500, f"显示AI回复失败: {str(e)}")

        return {
            "success": True,
            "message": "UI通知已处理"
        }

    except Exception as e:
        logger.error(f"处理UI通知失败: {e}")
        raise HTTPException(500, f"处理失败: {str(e)}")
'''
与/tool_result的区别:
  /tool_result:特定用于工具结果
  /ui_notification:通用的UI控制,可以做更多事情
'''
# ========工具调用流程总结:=========
#这里有两个不同的工具调用处理流程:
'''
流程一:完整回调流程(推荐)
1. 用户提问
2. AI决定调用工具
3. 调用工具 → 工具执行
4. 工具完成 → 调用/tool_result_callback
5. AI重新生成包含结果的回复
6. 调用/_notify_ui_refresh(或UI通知)显示回复

流程二:简单结果流程
1. 用户提问
2. AI决定调用工具
3. 调用工具 → 工具执行
4. 工具完成 → 调用/tool_result
5. 直接显示结果(不经过AI重新组织)
'''
# =======关键函数和方法:=======
'''
日志相关:
  message_manager.get_context_statistics() - 获取统计
  message_manager.load_recent_context() - 加载历史
工具调用相关:
  message_manager.get_messages() - 获取会话消息
  message_manager.add_message() - 添加消息到历史
  message_manager.save_conversation_log() - 保存日志
AI相关:
  get_llm_service() - 获取AI服务
  llm_service.chat_with_context() - AI聊天
UI相关:
  chat.tool_ai_response_received.emit() - Qt信号发送
'''
# ==========错误处理模式:==========
#注意这些接口都使用了相同的错误处理模式:
'''
try:
    # 主要逻辑
    return {"success": True, ...}
except Exception as e:
    logger.error(f"错误信息: {e}")
    raise HTTPException(status_code=500, detail=str(e))
'''
#整体架构理解:
#这部分的代码展示了AI系统的模块化设计:
'''
用户界面 (UI)
    ↓
API接口层 (这些接口)
    ↓
业务逻辑层 (message_manager, llm_service)
    ↓
工具层 (各种工具:天气、搜索等)
    ↓
数据层 (日志、历史记录)
'''

#这段代码定义了三个异步函数,它们都是内部使用的工具函数(以_开头表示私有)。这些函数的作用是把AI的回复发送给前端的用户界面(UI)。

#内部辅助函数(与UI通信)
async def _trigger_chat_stream_no_intent(session_id: str, response_text: str):
    """触发聊天流式响应但不触发意图分析 - 发送纯粹的AI回复到UI"""
    try:
        logger.info(f"[UI发送] 开始发送AI回复到UI,会话: {session_id}")
        logger.info(f"[UI发送] 发送内容: {response_text[:200]}...")

        # 直接调用现有的流式对话接口,但跳过意图分析
        import httpx

        # 构建请求数据 - 使用纯粹的AI回复内容,并跳过意图分析
        chat_request = {
            "message": response_text,  # 直接使用AI回复内容,不加标记
            "stream": True,
            "session_id": session_id,
            "use_self_game": False,
            "disable_tts": False,
            "return_audio": False,
            "skip_intent_analysis": True  # 关键:跳过意图分析
        }

        # 调用现有的流式对话接口
        from system.config import get_server_port
        api_url = f"http://localhost:{get_server_port('api_server')}/chat/stream"

        async with httpx.AsyncClient() as client:
            async with client.stream("POST", api_url, json=chat_request) as response:   #json=chat_request:把请求数据转为JSON格式
                if response.status_code == 200:
                    # 处理流式响应,包括TTS切割
                    async for chunk in response.aiter_text():
                        if chunk.strip():
                            # 这里可以进一步处理流式响应
                            # 或者直接让UI处理流式响应
                            pass

                    logger.info(f"[UI发送] AI回复已成功发送到UI: {session_id}")
                    logger.info(f"[UI发送] 成功显示到UI")
                else:
                    logger.error(f"[UI发送] 调用流式对话接口失败: {response.status_code}")

    except Exception as e:
        logger.error(f"[UI发送] 触发聊天流式响应失败: {e}")

#这个函数比第一个更直接,它直接调用UI通知接口。
async def _notify_ui_refresh(session_id: str, response_text: str):
    """通知UI刷新会话历史"""
    try:
        import httpx

        # 通过UI通知接口直接显示AI回复
        ui_notification_payload = {
            "session_id": session_id,
            "action": "show_tool_ai_response",
            "ai_response": response_text
        }

        from system.config import get_server_port
        api_url = f"http://localhost:{get_server_port('api_server')}/ui_notification"

        async with httpx.AsyncClient(timeout=5.0) as client:
            response = await client.post(api_url, json=ui_notification_payload)
            if response.status_code == 200:
                logger.info(f"[UI通知] AI回复显示通知发送成功: {session_id}")
            else:
                logger.error(f"[UI通知] AI回复显示通知失败: {response.status_code}")

    except Exception as e:
        logger.error(f"[UI通知] 通知UI刷新失败: {e}")



#这是第三种方法,通过非流式聊天接口发送。
async def _send_ai_response_directly(session_id: str, response_text: str):
    """直接发送AI回复到UI"""
    try:
        import httpx

        # 使用非流式接口发送AI回复
        chat_request = {
            "message": f"[工具结果] {response_text}",  # 添加标记让UI知道这是工具结果
            "stream": False,
            "session_id": session_id,
            "use_self_game": False,
            "disable_tts": False,
            "return_audio": False,
            "skip_intent_analysis": True
        }

        from system.config import get_server_port
        api_url = f"http://localhost:{get_server_port('api_server')}/chat"

        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.post(api_url, json=chat_request)
            if response.status_code == 200:
                logger.info(f"[直接发送] AI回复已通过非流式接口发送到UI: {session_id}")
            else:
                logger.error(f"[直接发送] 非流式接口发送失败: {response.status_code}")

    except Exception as e:
        logger.error(f"[直接发送] 直接发送AI回复失败: {e}")


# 工具执行结果已通过LLM总结并保存到对话历史中
# UI可以通过查询历史获取工具执行结果

# =======@符号饰器(Decorator)==========
#什么是装饰器?
#1. 最简单理解,装饰器就像一个"包装纸",它可以把一个函数"包装"起来,给这个函数增加一些额外的功能,但不改变函数本身的核心逻辑。
#生活比喻:假设你有一个普通的礼物(函数),用漂亮的包装纸包起来(装饰器),礼物本身没变,但现在更好看了(功能更强了)

#2. 代码示例理解
'''
没有装饰器的普通函数:
def say_hello():
    print("你好!")

say_hello()  # 输出:你好!

使用装饰器的函数:
def add_logging(func):  # 装饰器函数
    def wrapper():
        print("开始记录日志...")
        func()
        print("日志记录完成。")
    return wrapper

@add_logging  # 使用装饰器
def say_hello():
    print("你好!")

say_hello()
# 输出:
# 开始记录日志...
# 你好!
# 日志记录完成。

say_hello函数的核心功能(打印"你好!")没变,但通过@add_logging装饰器,给它增加了记录日志的功能。
'''

# =========FastAPI中的装饰器============
#在FastAPI框架中,装饰器有特定的用途:定义API路由。
#1. 基础用法
'''
@app.get("/")  # 装饰器:把下面的函数变成一个处理GET请求的API
async def root():
    return {"message": "Hello"}
    
理解:
@app.get("/"):这是一个装饰器
它告诉FastAPI:"当有人访问网站的根路径(/)时,就执行下面的root函数"
root函数返回的数据会自动转换成HTTP响应
'''
#2. 各种HTTP方法的装饰器
'''
@app.get("/users")        # 获取数据
@app.post("/users")       # 创建数据
@app.put("/users/{id}")   # 更新数据
@app.delete("/users/{id}") # 删除数据

记忆方法:就像数据库的CRUD操作:
GET:读取(R)

POST:创建(C)

PUT:更新(U)

DELETE:删除(D)
'''
#这个代码中出现的所有装饰器类型
#让我们回顾一下代码中出现的各种装饰器:
#1. 路由装饰器(最多)
'''
@app.get("/")                     # 根路径
@app.get("/health")               # 健康检查
@app.get("/system/info")          # 系统信息
@app.post("/chat")                # 普通聊天
@app.post("/chat/stream")         # 流式聊天
@app.get("/memory/stats")         # 记忆统计
@app.get("/sessions")             # 获取所有会话
@app.get("/sessions/{session_id}") # 获取单个会话
@app.delete("/sessions/{session_id}") # 删除会话
@app.delete("/sessions")          # 清空所有会话
@app.get("/logs/context/statistics") # 日志统计
@app.get("/logs/context/load")    # 加载日志
@app.post("/tool_notification")   # 工具通知
@app.post("/tool_result_callback") # 工具结果回调
@app.post("/tool_result")         # 工具结果
@app.post("/save_tool_conversation") # 保存工具对话
@app.post("/ui_notification")     # UI通知
'''
#2. 特殊的生命周期装饰器
'''
@asynccontextmanager  # 这是一个特殊的装饰器
async def lifespan(app: FastAPI):
    """应用生命周期管理"""
    
作用:这个装饰器把这个函数变成一个异步上下文管理器,用于管理应用的启动和关闭。    
'''

#装饰器的工作原理(更深入一点)
#1. 装饰器实际上在做什么?
'''
当你写:
@app.get("/hello")
async def hello():
    return "World"
    
实际上相当于:
def hello():
    return "World"

hello = app.get("/hello")(hello)  # 装饰器的底层实现
'''

# ======2. FastAPI装饰器的魔法 ===========
'''
FastAPI的装饰器做了很多事:
 注册路由:把这个函数和URL路径关联起来
 解析参数:自动解析请求参数、查询参数、路径参数
 验证数据:验证请求数据的格式
 生成文档:自动为这个接口生成API文档
 处理响应:把返回值转换成HTTP响应
 
对比(如果没有装饰器):
# 没有装饰器的写法(伪代码)
def hello():
    return "World"

# 需要手动注册路由
app.add_route(method="GET", path="/hello", handler=hello)
# 需要手动配置参数解析
# 需要手动配置响应处理
# ...很多其他代码
'''
# ========装饰器参数的解析========
'''
1. 路径参数
@app.get("/users/{user_id}")  # {user_id} 是路径参数
async def get_user(user_id: str):
    return {"user_id": user_id}
    
2. 查询参数
@app.get("/items")
async def read_items(skip: int = 0, limit: int = 10):
    return {"skip": skip, "limit": limit}
访问:/items?skip=0&limit=10
'''
# 实际案例:分析代码中的一个装饰器
'''
让我们详细分析代码中的这个例子:
@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    # ...函数体...
    
这个装饰器做了以下事情:
  @app.post:这是一个POST请求装饰器
  "/chat":路径是/chat
  response_model=ChatResponse:
  指定响应数据的格式
  自动验证返回的数据是否符合ChatResponse的格式
  自动生成API文档中的响应示例
'''
#装饰器的执行顺序
'''
当多个装饰器装饰一个函数时,它们从下往上执行:
@app.middleware("http")  # 第二个执行
@app.get("/")           # 第一个执行
async def root():
    return {"message": "Hello"}
但实际上,在FastAPI中,你很少会看到多个装饰器叠加使用。
'''

# 总结装饰器在这个代码中的作用:
'''
装饰器	             |   作用	         | 类比
@app.get("/xxx")	 | 创建一个GET接口	 | 开一个"只读"窗口
@app.delete("/xxx")	 | 创建一个DELETE接口	 | 开一个"删除"窗口
@asynccontextmanager | 创建异步上下文管理器	 | 设置"开门关门"流程
'''
#整体比喻:
'''
这个程序就像一个政府办事大厅
每个@app.get/@app.post装饰器就是开一个办事窗口
窗口上贴着标签(路径):/chat、/health、/sessions等
窗口后面坐着的办事员就是被装饰的函数
群众(客户端)来到对应窗口,办事员就处理他们的需求
'''
#常见问题
'''
Q:为什么有些装饰器有括号,有些没有?
A:@app.get这种有括号,是因为它本身是一个函数,需要传递参数(如路径)。@asynccontextmanager没有括号是因为它是一个装饰器工厂。

Q:装饰器一定要写在函数上面吗?
A:是的,装饰器必须紧挨着函数定义,写在函数定义的前一行。

Q:一个函数可以有多个装饰器吗?
A:可以,但不常见。如果有,执行顺序是从下往上。
'''

__init__.py

"""
NagaAgent API服务器模块
"""

from .api_server import app

__all__ = ['app'] 

'''
__init__.py 文件虽然代码短,但作用特别关键,是 Python 包(package)的核心标识和导出控制文

核心作用拆解
1.标记包身份
只要目录里有 __init__.py,Python 就会把这个目录识别成一个可导入的包,而不是普通文件夹。
没有它的话,你没法用 import apiserver 这种方式导入这个目录下的代码。
Python 3.3+ 之前,目录必须有 __init__.py 才被认为是包。现在虽然可以是隐式命名空间包,但显式创建仍然是最佳实践。

2.简化导入逻辑
代码里 from .api_server import app 是把 api_server.py 里的 app 对象导入到包的“根层级”。
原本你要写 from apiserver.api_server import app,现在可以直接写 from apiserver import app,更简洁。
没有 __init__.py 时:
# 用户需要知道内部结构
from mypackage.module1 import ClassA
from mypackage.module2 import function1
from mypackage.subpackage.module3 import helper
有 __init__.py 时:
# mypackage/__init__.py
from .module1 import ClassA
from .module2 import function1
from .subpackage.module3 import helper
_init_.py:# 用户使用更简洁
from mypackage import ClassA, function1, helper

3.控制导出范围(all)
__all__ = ['app'] 是定义这个包被from apiserver import * 导入时,对外暴露的内容只有 app。
避免把模块里的其他无关变量/函数也被批量导入,规范包的对外接口
当用户使用 from apiserver import * 时,只会导入 __all__ 中指定的内容

4. 暴露核心功能
实际案例演示:
项目结构:
mypackage/
    ├── __init__.py          # 包的主入口
    ├── utils.py            # 内部工具函数
    ├── core.py             # 核心类
    └── submodule/
        ├── __init__.py     # 子包的入口
        └── helper.py
 内容:  
 # mypackage/utils.py
def internal_helper():
    return "内部使用的工具函数"

def user_helper():
    return "提供给用户的工具函数"
 =================== 
# mypackage/core.py
class DataProcessor:
    def process(self):
        return "处理数据"
    
class Analyzer:
    def analyze(self):
        return "分析数据"      
 ================       
# mypackage/submodule/helper.py
class SpecialHelper:
    def help(self):
        return "特殊帮助"
        
现在配置 __init__.py: 
# mypackage/__init__.py
from .core import DataProcessor, Analyzer
from .utils import user_helper

# 版本信息
__version__ = "1.0.0"
__author__ = "Your Name"

# 控制 from mypackage import * 的行为
__all__ = ['DataProcessor', 'Analyzer', 'user_helper']

# 可选:初始化代码
print(f"Initializing {__name__} version {__version__}")
===================
# mypackage/submodule/__init__.py
from .helper import SpecialHelper

用户使用体验:
# 用户代码 - 极其简洁
from mypackage import DataProcessor, user_helper
from mypackage.submodule import SpecialHelper

# 或者全部从主包导入
import mypackage

dp = mypackage.DataProcessor()
result = mypackage.user_helper()

5. 其他重要功能
版本管理
# __init__.py
__version__ = "2.1.0"
__version_info__ = (2, 1, 0)

包级别的配置  
# __init__.py
import logging

# 设置包级别的日志
logger = logging.getLogger(__name__)

# 包配置
DEFAULT_CONFIG = {
    'timeout': 30,
    'retry': 3
}   

延迟导入(Lazy Import)  
# __init__.py
def __getattr__(name):
    """按需导入,减少启动时间"""
    if name == "HeavyClass":
        from .heavy_module import HeavyClass
        return HeavyClass
    raise AttributeError(f"module {__name__!r} has no attribute {name!r}") 
    
6. 实际设计模式
模式一:API门面(Facade)
# __init__.py
# 将所有重要的类、函数导出到包级别
from .module1 import APIClient, Connection
from .module2 import process_data, validate_input
from .module3 import Result, Error

__all__ = [
    'APIClient', 'Connection',
    'process_data', 'validate_input',
    'Result', 'Error'
]
模式二:子包重导出
# __init__.py
# 将子包的内容提升到主包级别
from . import subpackage1
from . import subpackage2

# 可以直接导入子包中的特定内容
from .subpackage1.core import CoreClass
from .subpackage2.helpers import helper_func    
'''
#简单说,它就像这个 apiserver 包的“门面”:告诉 Python 这是个合法包,同时把核心的 app 暴露出来,让外部用起来更方便

文末附加内容
暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇