NagaAgent/agentserver
本文最后更新于63 天前,其中的信息可能已经过时,如有错误请发送邮件到184874483@qq.com

1

agent_server.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
第一行:告诉电脑“请用python3来运行这个文件”
第二行:告诉电脑“这个文件用的是UTF-8编码”(这样就能显示中文了)
NagaAgent独立服务 - 基于博弈论的电脑控制智能体
提供意图识别和电脑控制任务执行功能
"""

import asyncio   # 处理“异步”任务(可以同时做多件事)
import uuid      # 生成唯一的ID号码(像身份证号一样)
import logging    # 记录程序运行日志(就像写日记)
from typing import Dict, Any, Optional, List   # 定义数据类型
from datetime import datetime

from fastapi import FastAPI, HTTPException  # 创建Web服务的工具
from fastapi.responses import JSONResponse  # 返回JSON格式数据
from contextlib import asynccontextmanager  # 管理程序启动和关闭

from system.config import config            # 配置文件
from system.background_analyzer import get_background_analyzer   # 意图分析器
from agentserver.agent_computer_control import ComputerControlAgent   # 电脑控制核心
from agentserver.task_scheduler import get_task_scheduler, TaskStep   # 任务安排器
from agentserver.toolkit_manager import toolkit_manager         # 工具管理器

# 配置日志
#格式解释:时间 - 名字 - 等级 - 消息,2024-01-01 10:00:00 - __main__ - INFO - 服务启动成功
logger = logging.getLogger(__name__)     # 创建一个“日记本”
logger.setLevel(logging.INFO)            # 设置记录级别(INFO级)
if not logger.handlers:                  # 如果还没有写日记的工具,就创建一个
    handler = logging.StreamHandler()    # 创建一个往控制台输出的工具
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')   # 日记格式
    handler.setFormatter(formatter)      # 应用格式
    logger.addHandler(handler)           # 把工具给日记本

@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI应用生命周期"""
    # startup
    try:
        # 初始化意图分析器(理解你想做什么)
        Modules.analyzer = get_background_analyzer()
        # 初始化电脑控制智能体 (实际控制电脑)
        Modules.computer_control = ComputerControlAgent()
        # 初始化任务调度器 (安排任务顺序)
        Modules.task_scheduler = get_task_scheduler()
        
        # 设置LLM配置用于智能压缩,设置AI大脑配置
        if hasattr(config, 'api') and config.api:
            llm_config = {
                "model": config.api.model,       # AI模型名字
                "api_key": config.api.api_key,   # 钥匙
                "api_base": config.api.base_url  # 服务器地址
            }
            Modules.task_scheduler.set_llm_config(llm_config)
        
        logger.info("NagaAgent电脑控制服务初始化完成")
    except Exception as e:
        logger.error(f"服务初始化失败: {e}")
        raise

    # 运行期
    yield   # 程序正常运行中

    # shutdown
    try:
        logger.info("NagaAgent电脑控制服务已关闭")
    except Exception as e:
        logger.error(f"服务关闭失败: {e}")

#创建Web服务,创建一个叫app的Web服务器,名字:NagaAgent Computer Control Server,版本:1.0.0,使用刚才定义的lifespan管理开关机
app = FastAPI(title="NagaAgent Computer Control Server", version="1.0.0", lifespan=lifespan)

class Modules:
    """全局模块管理器"""
    analyzer = None             # 意图分析器(理解你要什么)
    computer_control = None     # 电脑控制器(实际干活)
    task_scheduler = None       # 任务安排器(安排顺序)

#作用:获取现在的时间,格式化成标准字符串,例子:"2024-01-01T10:00:00.123456"
def _now_iso() -> str:
    """获取当前时间ISO格式"""
    return datetime.now().isoformat()

#处理电脑控制任务(核心功能),这个函数是核心,它:
#1.接收一个指令(比如“打开记事本”)2.调用电脑控制智能体执行3.返回执行结果
async def _process_computer_control_task(instruction: str, session_id: Optional[str] = None) -> Dict[str, Any]:
    """处理电脑控制任务"""
    try:    # 1. 记录开始执行
        logger.info(f"开始处理电脑控制任务: {instruction}")
        
        #  2. 直接调用电脑控制智能体
        result = await Modules.computer_control.handle_handoff({
            "action": "automate_task",   # 动作:自动执行任务
            "target": instruction,       # 目标:用户给的指令
            "parameters": {}             # 额外参数(这里为空)
        })

        # 3. 记录完成,返回成功结果
        logger.info(f"电脑控制任务完成: {instruction}")
        return {
            "success": True,            # 成功了吗?True=成功
            "result": result,           # 具体结果
            "task_type": "computer_control",     # 任务类型
            "instruction": instruction   # 原指令
        }
        
    except Exception as e:       # 如果出错了
        # 4. 记录错误,返回失败结果
        logger.error(f"电脑控制任务失败: {e}")
        return {
            "success": False,        # 成功了吗?False=失败
            "error": str(e),         # 错误信息
            "task_type": "computer_control",
            "instruction": instruction
        }

# 异步执行多个任务,这个函数处理多个任务:
#agent_calls: 任务列表(多个要执行的任务),session_id: 会话ID(区分不同用户)
#analysis_session_id: 分析会话ID,request_id: 请求ID(每个请求唯一编号),callback_url: 回调地址(完成后通知谁)
async def _execute_agent_tasks_async(agent_calls: List[Dict[str, Any]], session_id: str, 
                                   analysis_session_id: str, request_id: str, callback_url: Optional[str] = None):
    """异步执行Agent任务 - 应用与MCP服务器相同的会话管理逻辑"""
    try:
        logger.info(f"[异步执行] 开始执行 {len(agent_calls)} 个Agent任务")
        
        # 处理每个Agent任务
        results = []
        # 1. 循环处理每个任务
        for i, agent_call in enumerate(agent_calls):  # 取出任务信息
            try:
                instruction = agent_call.get("instruction", "")      # 指令
                tool_name = agent_call.get("tool_name", "未知工具")   # 工具名
                service_name = agent_call.get("service_name", "未知服务")   # 服务名
                
                logger.info(f"[异步执行] 执行任务 {i+1}/{len(agent_calls)}: {tool_name} - {instruction}")
                
                # 添加任务步骤到调度器
                # 2. 记录步骤到任务调度器(就像写待办事项)
                await Modules.task_scheduler.add_task_step(request_id, TaskStep(
                    step_id=f"step_{i+1}",
                    task_id=request_id,
                    purpose=f"执行Agent任务: {tool_name}",
                    content=instruction,
                    output="",
                    analysis=None,
                    success=True
                ))
                
                #3.执行电脑控制任务
                result = await _process_computer_control_task(instruction, session_id)
                results.append({
                    "agent_call": agent_call,
                    "result": result,
                    "step_index": i
                })
                
                # 4.更新任务步骤结果
                await Modules.task_scheduler.add_task_step(request_id, TaskStep(
                    step_id=f"step_{i+1}_result",
                    task_id=request_id,
                    purpose=f"任务结果: {tool_name}",
                    content=f"执行结果: {result.get('success', False)}",
                    output=str(result.get('result', '')),
                    analysis={"analysis": f"任务类型: {result.get('task_type', 'unknown')}, 工具: {tool_name}, 服务: {service_name}"},
                    success=result.get('success', False),
                    error=result.get('error')
                ))

                
                logger.info(f"[异步执行] 任务 {i+1} 完成: {result.get('success', False)}")
                
            except Exception as e:
                logger.error(f"[异步执行] 任务 {i+1} 执行失败: {e}")
                results.append({
                    "agent_call": agent_call,
                    "result": {"success": False, "error": str(e)},
                    "step_index": i
                })
                '''循环过程:
                    任务1: "打开浏览器" → 执行 → 记录结果
                    任务2: "搜索Python教程" → 执行 → 记录结果
                    任务3: "保存网页" → 执行 → 记录结果
                '''
        
        # 发送回调通知(如果提供了回调URL)
        if callback_url:
            await _send_callback_notification(callback_url, request_id, session_id, analysis_session_id, results)
        
        logger.info(f"[异步执行] 所有Agent任务执行完成: {len(results)} 个任务")
        
    except Exception as e:
        logger.error(f"[异步执行] Agent任务执行失败: {e}")
        # 发送错误回调
        if callback_url:
            await _send_callback_notification(callback_url, request_id, session_id, analysis_session_id, [], str(e))

# 发送回调通知,这个函数在任务完成后通知别人:
'''
发送的内容:
{
  "request_id": "123",          // 请求ID
  "session_id": "abc",          // 会话ID
  "analysis_session_id": "xyz", // 分析会话ID
  "success": true,             // 是否成功
  "error": null,               // 错误信息(如果没有就是null)
  "results": [...],            // 所有任务结果
  "completed_at": "2024-01-01T10:00:00"  // 完成时间
}
'''
async def _send_callback_notification(callback_url: str, request_id: str, session_id: str, 
                                    analysis_session_id: str, results: List[Dict[str, Any]], error: Optional[str] = None):
    """发送回调通知 - 应用与MCP服务器相同的回调机制"""
    try:
        import httpx
        
        callback_payload = {
            "request_id": request_id,
            "session_id": session_id,
            "analysis_session_id": analysis_session_id,
            "success": error is None,
            "error": error,
            "results": results,
            "completed_at": _now_iso()
        }
        
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.post(callback_url, json=callback_payload)
            if response.status_code == 200:
                logger.info(f"[回调通知] Agent任务结果回调成功: {request_id}")
            else:
                logger.error(f"[回调通知] Agent任务结果回调失败: {response.status_code}")
                
    except Exception as e:
        logger.error(f"[回调通知] 发送Agent任务回调失败: {e}")

# ============ API端点 ============

@app.get("/health")
async def health_check():
    """健康检查"""
    return {
        "status": "healthy",    # 状态:健康
        "timestamp": _now_iso(),   # 当前时间
        "modules": {          # 各个工具是否准备好
            "analyzer": Modules.analyzer is not None,   # 分析器准备好了吗?
            "computer_control": Modules.computer_control is not None   # 电脑控制器准备好了吗?
        }
    }
'''
访问方式:
网址:http://服务器地址/health
方法:GET(查看)
返回示例:
{
  "status": "healthy",
  "timestamp": "2024-01-01T10:00:00",
  "modules": {
    "analyzer": true,      // true表示准备好了
    "computer_control": true
  }
}
'''

@app.post("/schedule")
async def schedule_agent_tasks(payload: Dict[str, Any]):
    """统一的任务调度端点(主要入口) - 应用与MCP服务器相同的会话管理逻辑"""
    # 1. 先检查工具是否准备好
    if not Modules.computer_control or not Modules.task_scheduler:
        raise HTTPException(503, "电脑控制智能体或任务调度器未就绪")
    
    # 提取新的请求格式参数
    query = payload.get("query", "")   # 查询内容
    agent_calls = payload.get("agent_calls", [])   # 要执行的任务列表
    session_id = payload.get("session_id")         # 会话ID(哪个顾客)
    analysis_session_id = payload.get("analysis_session_id")   # 分析会话ID
    request_id = payload.get("request_id", str(uuid.uuid4()))  # 请求ID,如果没有就生成一个
    callback_url = payload.get("callback_url")     # 完成后通知的地址
    
    try:
        logger.info(f"[统一调度] 接收Agent任务调度请求: {query}")
        logger.info(f"[统一调度] 会话ID: {session_id}, 分析会话ID: {analysis_session_id}, 请求ID: {request_id}")
        
        if not agent_calls:   # 如果任务列表是空的
            return {
                "success": True,
                "status": "no_tasks",    # 状态:没有任务
                "message": "未发现可执行的Agent任务",
                "task_id": request_id,
                "accepted_at": _now_iso(),
                "session_id": session_id,
                "analysis_session_id": analysis_session_id
            }

        logger.info(f"[统一调度] 会话 {session_id} 发现 {len(agent_calls)} 个Agent任务")

        # 1.有任务,创建任务调度器任务
        task_id = await Modules.task_scheduler.create_task(
            task_id=request_id,      # 任务ID
            purpose=f"执行Agent任务: {query}",   # 任务目的
            session_id=session_id,   # 会话ID
            analysis_session_id=analysis_session_id    # 分析会话ID
        )

        # 2.异步执行任务(不阻塞响应)
        asyncio.create_task(_execute_agent_tasks_async(   #asyncio.create_task():创建后台任务,不阻塞当前请求
            agent_calls, session_id, analysis_session_id, request_id, callback_url
        ))

        #3.立即返回结果
        return {
            "success": True,
            "status": "scheduled",   # 状态:已安排
            "task_id": request_id,
            "message": f"已调度 {len(agent_calls)} 个Agent任务",
            "accepted_at": _now_iso(),
            "session_id": session_id,
            "analysis_session_id": analysis_session_id
        }
        
    except Exception as e:
        logger.error(f"[统一调度] Agent任务调度失败: {e}")
        raise HTTPException(500, f"调度失败: {e}")

@app.post("/analyze_and_execute")
async def analyze_and_execute(payload: Dict[str, Any]):
    """意图分析和电脑控制任务执行 - 保持向后兼容
    为什么需要这个?
    为了兼容旧版本的程序,新版本用/schedule,旧版本还用这个
    """
    if not Modules.analyzer or not Modules.computer_control:
        raise HTTPException(503, "分析器或电脑控制智能体未就绪")
    
    messages = (payload or {}).get("messages", [])
    if not isinstance(messages, list):
        raise HTTPException(400, "messages必须是{role, content}格式的列表")
    
    session_id = (payload or {}).get("session_id")
    
    try:
        # 直接执行电脑控制任务,不进行意图分析
        # 意图分析已在API服务器中完成,这里只负责执行具体的Agent任务

        # 从消息中提取任务指令
        tasks = []
        for msg in messages:
            if msg.get("role") == "user":   # 如果是用户消息
                content = msg.get("content", "")
                if "执行Agent任务:" in content:   # 如果包含特定关键词
                    # 提取任务指令
                    instruction = content.replace("执行Agent任务:", "").strip()   # 提取指令
                    tasks.append({
                        "instruction": instruction
                    })
                    '''
                    示例消息格式:
                    {
                      "messages": [
                          {"role": "user", "content": "执行Agent任务: 打开记事本"},
                          {"role": "user", "content": "执行Agent任务: 输入Hello World"}
                      ]
                    }
                    '''
'''与/schedule的区别:
/schedule:异步执行,立即返回
/analyze_and_execute:同步执行,等所有任务完成才返回
就像快餐和堂食的区别'''

        if not tasks:
            return {
                "success": True,
                "status": "no_tasks",
                "message": "未发现可执行的电脑控制任务",
                "accepted_at": _now_iso(),
                "session_id": session_id
            }

        logger.info(f"会话 {session_id} 发现 {len(tasks)} 个电脑控制任务")

        # 处理每个任务
        results = []
        for task_instruction in tasks:
            result = await _process_computer_control_task(task_instruction, session_id)
            results.append(result)

        return {
            "success": True,
            "status": "completed",
            "tasks_processed": len(tasks),
            "results": results,
            "accepted_at": _now_iso(),
            "session_id": session_id
        }
        
    except Exception as e:
        logger.error(f"意图分析和任务执行失败: {e}")
        raise HTTPException(500, f"处理失败: {e}")

@app.get("/computer_control/availability")
async def get_computer_control_availability():
    """获取电脑控制可用性(检查工具)"""
    try:
        if not Modules.computer_control:    # 如果电脑控制器不存在
            return {"ready": False, "reasons": ["电脑控制智能体未初始化"]}
        
        # 检查电脑控制能力
        capabilities = Modules.computer_control.get_capabilities()
        return {
            "ready": capabilities.get("enabled", False),   # 是否可用
            "capabilities": capabilities,                  # 具体能力
            "timestamp": _now_iso()
        }
'''
{
  "ready": true,
  "capabilities": {
    "enabled": true,
    "tools": ["鼠标控制", "键盘输入", "截图"],
    "platform": "Windows"
  },
  "timestamp": "2024-01-01T10:00:00"
}
'''
    except Exception as e:
        logger.error(f"检查电脑控制可用性失败: {e}")
        return {"ready": False, "reasons": [f"检查失败: {e}"]}

@app.post("/computer_control/execute")
async def execute_computer_control_task(payload: Dict[str, Any]):
    """直接执行电脑控制任务(单点执行)"""
    if not Modules.computer_control:
        raise HTTPException(503, "电脑控制智能体未就绪")
    
    instruction = payload.get("instruction", "")
    if not instruction:    # 如果指令为空
        raise HTTPException(400, "instruction不能为空")
    
    try:
        result = await _process_computer_control_task(instruction)
        return {
            "success": result.get("success", False),
            "result": result.get("result"),
            "error": result.get("error"),
            "instruction": instruction
        }
    except Exception as e:
        logger.error(f"执行电脑控制任务失败: {e}")
        raise HTTPException(500, f"执行失败: {e}")
'''
使用场景:
  只执行一个简单任务
  需要立即知道结果
  不需要复杂调度
示例请求:  
{
  "instruction": "打开浏览器"
}
示例响应:
{
  "success": true,
  "result": "已打开Chrome浏览器",
  "error": null,
  "instruction": "打开浏览器"
}
总结所有API端点 
端点                              | 方法   |   用途           |  特点
/health                          | GET    |  健康检查        |  快速检查服务状态
/schedule                        | POST   |  统一任务调度     |  主要入口,异步执行,支持多个任务
/analyze_and_execute             | POST   |  意图分析和执行   |   兼容旧版本,同步执行
/computer_control/availability   | GET    |  获取可用性      |   检查电脑控制能力
/computer_control/execute        | POST	  |  直接执行任务     |  单任务,同步执行
'''
# ============ 任务记忆管理API ============

@app.get("/tasks")
async def get_tasks(session_id: Optional[str] = None):
    """获取任务列表(看看现在在忙什么)"""
    # 1. 检查工具是否准备好
    if not Modules.task_scheduler:
        raise HTTPException(503, "任务调度器未就绪")
    
    try:
        # 2. 从调度器获取正在运行的任务
        running_tasks = await Modules.task_scheduler.get_running_tasks()
        return {
            "success": True,
            "running_tasks": running_tasks, # 正在运行的任务
            "session_id": session_id
        }
    except Exception as e:
        logger.error(f"获取任务列表失败: {e}")
        raise HTTPException(500, f"获取失败: {e}")
'''使用示例:
GET http://localhost:8000/tasks
可能返回:
{
  "success": true,
  "running_tasks": [
    {
      "task_id": "task001",
      "purpose": "打开记事本并记录",
      "status": "running",
      "started_at": "2024-01-01T10:00:00"
    },
    {
      "task_id": "task002", 
      "purpose": "保存文件",
      "status": "completed",
      "started_at": "2024-01-01T09:30:00"
    }
  ]
}
'''

@app.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
    """获取指定任务状态(查看具体任务)"""
    if not Modules.task_scheduler:
        raise HTTPException(503, "任务调度器未就绪")
    
    try:  # 获取任务状态
        task_status = await Modules.task_scheduler.get_task_status(task_id)
        # 如果没找到,返回404错误
        if not task_status:
            raise HTTPException(404, f"任务 {task_id} 不存在")
        
        return {
            "success": True,
            "task": task_status
        }
    except HTTPException:    # 如果是我们主动抛出的错误,直接传递
        raise
    except Exception as e:   # 其他未知错误
        logger.error(f"获取任务状态失败: {e}")
        raise HTTPException(500, f"获取失败: {e}")
'''
路径参数:{task_id}
从URL中提取任务ID
例子:/tasks/abc123 → task_id = "abc123"
错误处理两种方式:
1.HTTPException:主动抛出的已知错误(如404)
2.Exception:未知的意外错误
'''

@app.get("/tasks/{task_id}/memory")
async def get_task_memory(task_id: str, include_key_facts: bool = True):
    """获取任务记忆摘要(看看任务经历了什么)"""
    # 查询参数:include_key_facts,默认为true
    # 例子:/tasks/abc123/memory?include_key_facts=false
 '''
 查询参数 vs 路径参数:
# 路径参数(必须的)
@app.get("/tasks/{task_id}/memory")
# 查询参数(可选的)
async def get_task_memory(task_id: str, include_key_facts: bool = True):
使用示例:GET http://localhost:8000/tasks/abc123/memory?include_key_facts=true
 '''
    if not Modules.task_scheduler:
        raise HTTPException(503, "任务调度器未就绪")
    
    try:
        memory_summary = await Modules.task_scheduler.get_task_memory_summary(task_id, include_key_facts)
        return {
            "success": True,
            "task_id": task_id,
            "memory_summary": memory_summary
        }
    except Exception as e:
        logger.error(f"获取任务记忆失败: {e}")
        raise HTTPException(500, f"获取失败: {e}")

@app.get("/memory/global")
async def get_global_memory():
    """获取全局记忆摘要(看看整个系统的记忆)"""
    if not Modules.task_scheduler:
        raise HTTPException(503, "任务调度器未就绪")
    
    try:
        # 获取全局摘要
        global_summary = await Modules.task_scheduler.get_global_memory_summary()
        # 获取失败尝试的摘要
        failed_attempts = await Modules.task_scheduler.get_failed_attempts_summary()
'''为什么需要全局记忆?
看看整个系统执行了多少任务
哪些任务经常失败
总结经验,改进未来执行
'''

        return {
            "success": True,
            "global_summary": global_summary,
            "failed_attempts": failed_attempts
        }
    except Exception as e:
        logger.error(f"获取全局记忆失败: {e}")
        raise HTTPException(500, f"获取失败: {e}")

@app.post("/tasks/{task_id}/steps")
async def add_task_step(task_id: str, payload: Dict[str, Any]):
    """添加任务步骤(手动添加步骤记录)"""
    if not Modules.task_scheduler:
        raise HTTPException(503, "任务调度器未就绪")
    
    try:  # 创建一个TaskStep对象
        step = TaskStep(
            step_id=payload.get("step_id", str(uuid.uuid4())),   # 步骤ID,没有就生成
            task_id=task_id,                                     # 属于哪个任务
            purpose=payload.get("purpose", "执行步骤"),            # 步骤目的
            content=payload.get("content", ""),                  # 步骤内容
            output=payload.get("output", ""),                    # 步骤输出
            analysis=payload.get("analysis"),                    # 分析结果
            success=payload.get("success", True),                # 是否成功
            error=payload.get("error")                           # 错误信息
        )

        # 添加到调度器
        await Modules.task_scheduler.add_task_step(task_id, step)
        
        return {
            "success": True,
            "message": "步骤添加成功",
            "step_id": step.step_id
        }
    except Exception as e:
        logger.error(f"添加任务步骤失败: {e}")
        raise HTTPException(500, f"添加失败: {e}")

#DELETE方法:用于删除资源,这里是删除任务的记忆
@app.delete("/tasks/{task_id}/memory")
async def clear_task_memory(task_id: str):
    """清除任务记忆(删除单个任务的记忆)"""
    if not Modules.task_scheduler:
        raise HTTPException(503, "任务调度器未就绪")
    
    try:# 调用调度器清除记忆
        success = await Modules.task_scheduler.clear_task_memory(task_id)
        if not success:  # 调用调度器清除记忆
            raise HTTPException(404, f"任务 {task_id} 不存在")
        
        return {
            "success": True,
            "message": f"任务 {task_id} 的记忆已清除"
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"清除任务记忆失败: {e}")
        raise HTTPException(500, f"清除失败: {e}")

@app.delete("/memory/global")
async def clear_global_memory():
    """清除全局记忆(清空所有记忆)"""
    if not Modules.task_scheduler:
        raise HTTPException(503, "任务调度器未就绪")
    
    try:
        await Modules.task_scheduler.clear_all_memory()
        return {
            "success": True,
            "message": "全局记忆已清除"
        }
    except Exception as e:
        logger.error(f"清除全局记忆失败: {e}")
        raise HTTPException(500, f"清除失败: {e}")
#危险操作:会删除所有任务的记忆,就像格式化硬盘一样。
# ============ 会话级别的记忆管理API ============

@app.get("/sessions")
async def get_all_sessions():
    """获取所有会话的摘要信息(看看有哪些对话)"""
    if not Modules.task_scheduler:
        raise HTTPException(503, "任务调度器未就绪")
    
    try: # 获取所有会话
        sessions = await Modules.task_scheduler.get_all_sessions()
        return {
            "success": True,
            "sessions": sessions,            # 会话列表
            "total_sessions": len(sessions)  # 会话总数
        }
    except Exception as e:
        logger.error(f"获取会话列表失败: {e}")
        raise HTTPException(500, f"获取失败: {e}")

@app.get("/sessions/{session_id}/memory")
async def get_session_memory_summary(session_id: str):
    """获取会话记忆摘要(查看一次对话的整体情况)"""
    if not Modules.task_scheduler:
        raise HTTPException(503, "任务调度器未就绪")
    
    try:    #(查看一次对话的整体情况)
        summary = await Modules.task_scheduler.get_session_memory_summary(session_id)
        # 如果摘要中包含error,说明会话不存在
        if "error" in summary:
            raise HTTPException(404, summary["error"])
        
        return {
            "success": True,
            "session_id": session_id,
            "memory_summary": summary
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"获取会话记忆摘要失败: {e}")
        raise HTTPException(500, f"获取失败: {e}")

@app.get("/sessions/{session_id}/compressed_memories")
async def get_session_compressed_memories(session_id: str):
    """获取会话的压缩记忆(精简版记忆)"""
    if not Modules.task_scheduler:
        raise HTTPException(503, "任务调度器未就绪")
    
    try:  # 获取压缩后的记忆
        memories = await Modules.task_scheduler.get_session_compressed_memories(session_id)
        return {
            "success": True,
            "session_id": session_id,
            "compressed_memories": memories,    # 压缩记忆列表
            "count": len(memories)              # 有多少条压缩记忆
        }
    except Exception as e:
        logger.error(f"获取会话压缩记忆失败: {e}")
        raise HTTPException(500, f"获取失败: {e}")
'''什么是压缩记忆?
把很多细节压缩成关键信息,例子:把"打开记事本→输入文字→保存文件"压缩成"创建了一个文本文件"
'''

@app.get("/sessions/{session_id}/key_facts")
async def get_session_key_facts(session_id: str):
    """获取会话的关键事实(最重要的信息)"""
    if not Modules.task_scheduler:
        raise HTTPException(503, "任务调度器未就绪")
    
    try:   # 获取关键事实
        key_facts = await Modules.task_scheduler.get_session_key_facts(session_id)
        return {
            "success": True,
            "session_id": session_id,
            "key_facts": key_facts,    # 关键事实列表
            "count": len(key_facts)    # 有多少个关键事实
        }
    except Exception as e:
        logger.error(f"获取会话关键事实失败: {e}")
        raise HTTPException(500, f"获取失败: {e}")
'''
关键事实示例:
{
  "key_facts": [
    "用户需要创建一个会议记录文档",
    "用户偏好使用记事本而不是Word",
    "用户经常在下午3点开会"
  ]
}
'''

@app.get("/sessions/{session_id}/failed_attempts")
async def get_session_failed_attempts(session_id: str):
    """获取会话的失败尝试(看看哪里出错了)"""
    if not Modules.task_scheduler:
        raise HTTPException(503, "任务调度器未就绪")
    
    try:    # 获取失败尝试记录
        failed_attempts = await Modules.task_scheduler.get_session_failed_attempts(session_id)
        return {
            "success": True,
            "session_id": session_id,
            "failed_attempts": failed_attempts,   # 失败记录
            "count": len(failed_attempts)        # 失败次数
        }
    except Exception as e:
        logger.error(f"获取会话失败尝试失败: {e}")
        raise HTTPException(500, f"获取失败: {e}")
#学习价值:从失败中学习,避免重复错误

@app.get("/sessions/{session_id}/tasks")
async def get_session_tasks(session_id: str):
    """获取会话的所有任务(看看这次对话做了哪些事)"""
    if not Modules.task_scheduler:
        raise HTTPException(503, "任务调度器未就绪")
    
    try:   # 获取会话的所有任务
        tasks = await Modules.task_scheduler.get_session_tasks(session_id)
        return {
            "success": True,
            "session_id": session_id,
            "tasks": tasks,                # 任务列表
            "count": len(tasks)            # 任务数量
        }
    except Exception as e:
        logger.error(f"获取会话任务失败: {e}")
        raise HTTPException(500, f"获取失败: {e}")

@app.delete("/sessions/{session_id}/memory")
async def clear_session_memory(session_id: str):
    """清除指定会话的记忆(删除一次对话的所有记忆)"""
    if not Modules.task_scheduler:
        raise HTTPException(503, "任务调度器未就绪")
    
    try:     # 清除会话记忆
        success = await Modules.task_scheduler.clear_session_memory(session_id)
        if not success:    # 如果返回false,说明会话不存在
            raise HTTPException(404, f"会话 {session_id} 不存在")
        
        return {
            "success": True,
            "message": f"会话 {session_id} 的记忆已清除"
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"清除会话记忆失败: {e}")
        raise HTTPException(500, f"清除失败: {e}")

# ============ 文件编辑工具包API ============

@app.get("/tools")
async def list_tools():
    """列出所有可用的工具 (看看有什么工具)"""
    try:  # 从工具管理器中获取所有工具
        tools = toolkit_manager.get_all_tools()
        return {
            "success": True,
            "tools": tools,        # 工具列表
            "count": len(tools)    # 工具总数
        }
    except Exception as e:
        logger.error(f"获取工具列表失败: {e}")
        raise HTTPException(500, f"获取失败: {e}")
'''
什么是工具?
工具 = 一个具体的功能,比如:"打开浏览器"、"复制文件"、"发送邮件"
示例返回:
{
  "success": true,
  "tools": [
    {"name": "open_browser", "description": "打开浏览器"},
    {"name": "copy_file", "description": "复制文件"},
    {"name": "send_email", "description": "发送邮件"}
  ],
  "count": 3
}
'''

@app.get("/toolkits")
async def list_toolkits():
    """列出所有可用的工具包(看看有哪些工具箱)"""
    try:    # 获取所有工具包
        toolkits = toolkit_manager.list_toolkits()
        return {
            "success": True,
            "toolkits": toolkits,
            "count": len(toolkits)
        }
    except Exception as e:
        logger.error(f"获取工具包列表失败: {e}")
        raise HTTPException(500, f"获取失败: {e}")
'''
工具包 vs 工具:
工具包:一组相关工具的集合(像工具箱),工具:单个具体的功能(像扳手、锤子)
示例:
工具包:file_edit(文件编辑工具包)
├── 工具1:read_file(读文件)
├── 工具2:write_file(写文件)
└── 工具3:edit_file(编辑文件)
'''

@app.get("/toolkits/{toolkit_name}")
async def get_toolkit_info(toolkit_name: str):
    """获取工具包详细信息(打开工具箱看看里面有什么)"""
    try:      # 获取指定工具包的信息
        info = toolkit_manager.get_toolkit_info(toolkit_name)
        if not info:    # 如果没找到,返回404
            raise HTTPException(404, f"工具包不存在: {toolkit_name}")
        
        return {
            "success": True,
            "toolkit": info
        }
    except HTTPException:      # 已知错误(如404)
        raise
    except Exception as e:     # 未知错误
        logger.error(f"获取工具包信息失败: {e}")
        raise HTTPException(500, f"获取失败: {e}")
'''
路径参数:{toolkit_name}
从URL中提取工具包名称,例子:/toolkits/file_edit → toolkit_name = "file_edit"
'''

@app.post("/tools/{toolkit_name}/{tool_name}")
async def call_tool(toolkit_name: str, tool_name: str, arguments: Dict[str, Any]):
    """调用工具(使用具体工具干活)"""
    try:    # 调用工具:指定工具包、工具名和参数
        result = await toolkit_manager.call_tool(toolkit_name, tool_name, arguments)
        return {
            "success": True,
            "toolkit": toolkit_name,
            "tool": tool_name,
            "result": result
        }
    except Exception as e:
        logger.error(f"调用工具失败 {toolkit_name}.{tool_name}: {e}")
        raise HTTPException(500, f"调用失败: {e}")
'''
URL结构分析:
POST /tools/{工具箱}/{工具名}
例子:POST /tools/file_edit/read_file
参数传递:
{
  "path": "/home/user/test.txt"
}
完整调用示例:
POST http://localhost:8000/tools/file_edit/read_file
Content-Type: application/json

{
  "path": "/home/user/test.txt"
}
'''

# ============ 文件编辑专用API ============

@app.post("/file/edit")
async def edit_file(request: Dict[str, str]):
    """编辑文件 - 使用SEARCH/REPLACE格式"""
    try:
        path = request.get("path")     # 文件路径
        diff = request.get("diff")     # 差异内容(要修改的部分)

        # 检查必要参数
        if not path or not diff:
            raise HTTPException(400, "缺少必要参数: path 和 diff")

        # 调用文件编辑工具
        result = await toolkit_manager.call_tool("file_edit", "edit_file", {
            "path": path,
            "diff": diff
        })
        
        return {
            "success": True,
            "result": result
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"编辑文件失败: {e}")
        raise HTTPException(500, f"编辑失败: {e}")
'''
什么是SEARCH/REPLACE格式?
这是一种表示文件修改的方式:
{
  "path": "test.py",
  "diff": "<<<<<<< SEARCH\nprint('Hello')\n=======\nprint('Hello World')\n>>>>>>> REPLACE"
}
解释:
SEARCH部分:找到文件中匹配的内容
REPLACE部分:替换成新内容
就像在Word里用查找替换功能
'''

@app.post("/file/write")
async def write_file(request: Dict[str, str]):
    """写入文件(创建或覆盖文件)"""
    try:
        path = request.get("path")              # 文件路径
        content = request.get("content")        # 文件内容

        # 检查参数,注意content可以是空字符串,所以用content is None判断
        if not path or content is None:
            raise HTTPException(400, "缺少必要参数: path 和 content")

        # 调用写文件工具
        result = await toolkit_manager.call_tool("file_edit", "write_file", {
            "path": path,
            "file_text": content    # 注意参数名是file_text
        })
        
        return {
            "success": True,
            "result": result
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"写入文件失败: {e}")
        raise HTTPException(500, f"写入失败: {e}")
'''
注意细节:
if not path or content is None:
    # 这里用content is None而不是not content
    # 因为content可以是空字符串"",这也是有效的
使用场景:1.创建新文件
        2.覆盖已有文件
        3.清空文件内容(传入空字符串)
'''

@app.get("/file/read")
async def read_file(path: str):
    """读取文件"""
    try:         # 调用读文件工具
        result = await toolkit_manager.call_tool("file_edit", "read_file", {
            "path": path
        })
        
        return {
            "success": True,
            "content": result            # 文件内容
        }
    except Exception as e:
        logger.error(f"读取文件失败: {e}")
        raise HTTPException(500, f"读取失败: {e}")
'''
查询参数:
path:通过URL参数传递
例子:/file/read?path=/home/user/test.txt
返回示例:
{
  "success": true,
  "content": "Hello World!\nThis is a test file."
}
'''

@app.get("/file/list")
async def list_files(directory: str = "."):
    """列出目录文件(查看文件夹内容)"""
    try:
        result = await toolkit_manager.call_tool("file_edit", "list_files", {
            "directory": directory      # 目录路径,默认为当前目录"."
        })
'''
默认值:
directory: str = "."
# 如果不传directory参数,默认是当前目录"."
使用示例:
GET /file/list?directory=/home/user
返回示例:
{
  "success": true,
  "result": {
    "files": ["test.txt", "photo.jpg"],
    "directories": ["Documents", "Downloads"],
    "total": 4
  }
}
'''

        return {
            "success": True,
            "result": result
        }
    except Exception as e:
        logger.error(f"列出文件失败: {e}")
        raise HTTPException(500, f"列出失败: {e}")

#程序启动,启动FastAPI应用
if __name__ == "__main__":
'''
if __name__ == "__main__":
这是一个魔法开关
只有当直接运行这个文件时,下面的代码才会执行
如果被别人导入,就不会执行
'''
    import uvicorn
    #Uvicorn:一个快速的Web服务器
    #专门用来运行FastAPI应用
    from agentserver.config import AGENT_SERVER_PORT
    #从配置文件导入端口号
    #这样可以在配置文件中修改端口,不用改代码
    uvicorn.run(app, host="0.0.0.0", port=AGENT_SERVER_PORT)
    #启动Web服务器
    #app:我们创建的FastAPI应用
    #host="0.0.0.0":监听所有网络接口(可以从任何IP访问)
    #port:使用配置文件中定义的端口
'''
host参数解释:
值                 |  含义        |   谁可以访问
"127.0.0.1"       |  只监听本机    |   只有本机能访问
"0.0.0.0"         |  监听所有IP    |  其他电脑也能访问
"192.168.1.100"   |  监听指定IP    |  只有这个IP能访问
文件操作API汇总:
端点           | 方法    | 用途	       |   参数
/file/edit    | POST   | 编辑文件       |  path, diff
/file/write   | POST   | 写入/创建文件   |  path, content
/file/read    | GET    | 读取文件       |  path(查询参数)
/file/list    | GET    | 列出目录文件    |  directory(查询参数)
与通用工具调用的区别:
方式一:使用通用工具调用
POST /tools/file_edit/read_file
{
  "path": "/home/user/test.txt"
}
方式二:使用专用文件API
GET /file/read?path=/home/user/test.txt
区别:
专用API:更方便,参数直接在URL或固定格式中
通用工具调用:更灵活,可以调用任何工具包的任何工具
'''

# =========完整的API总览==========
#现在我们已经分析完所有API,让我们总结一下:
''' 
1.系统状态类
/health                                健康检查
/computer_control/availability         检查电脑控制能力
2.任务执行类
/schedule                        统一任务调度(主要入口)
/analyze_and_execute             意图分析和执行(兼容旧版)
/computer_control/execute        直接执行电脑控制任务
3.记忆管理类
/tasks                             获取任务列表
/tasks/{task_id}                   获取任务状态
/tasks/{task_id}/memory            获取任务记忆
/tasks/{task_id}/steps             添加任务步骤
/memory/global                     获取全局记忆
/sessions                          获取所有会话
/sessions/{session_id}/memory      获取会话记忆
4. 工具管理类
/tools                      列出所有工具
/toolkits                   列出所有工具包
/toolkits/{toolkit_name}    获取工具包详情
/tools/{toolkit}/{tool}     调用工具
5. 文件操作类
/file/edit             编辑文件
/file/write            写入文件
/file/read             读取文件
/file/list             列出目录文件
'''
#一、文件架构
'''
naga_agent.py(主文件)
├── 导入部分(15个导入)
├── 配置部分(日志配置)
├── 核心类(Modules)
├── 生命周期管理器(lifespan)
├── FastAPI应用实例(app)
├── 7个核心功能函数
├── 25个API端点函数
└── 启动代码
'''
#二、核心类结构
#1. Modules类(全局工具箱)
'''
class Modules:
    analyzer = None          # 意图分析器
    computer_control = None  # 电脑控制器
    task_scheduler = None    # 任务调度器
'''
#三、核心函数(7个)
#1. 基础工具函数
#def _now_iso() -> str:
    # 获取当前时间(ISO格式)
#2. 任务执行函数(3个)
'''
async def _process_computer_control_task(...):
    # 执行单个电脑控制任务

async def _execute_agent_tasks_async(...):
    # 异步执行多个Agent任务
    
async def _send_callback_notification(...):
    # 发送回调通知
'''
#3. 生命周期函数(1个)
'''
@asynccontextmanager
async def lifespan(app: FastAPI):
    # 程序启动和关闭管理
'''
#四、API端点(25个,按功能分组)
#第1组:系统状态(2个)
'''
GET  /health                      # 健康检查
GET  /computer_control/availability  # 检查电脑控制能力
'''
#第2组:任务执行(3个)
'''
POST /schedule                    # 统一任务调度(主入口)
POST /analyze_and_execute         # 意图分析和执行(兼容旧版)
POST /computer_control/execute    # 直接执行电脑任务
'''
#第3组:任务记忆管理(6个)
'''
GET    /tasks                     # 获取所有任务
GET    /tasks/{task_id}           # 获取特定任务状态
GET    /tasks/{task_id}/memory    # 获取任务记忆
POST   /tasks/{task_id}/steps     # 添加任务步骤
DELETE /tasks/{task_id}/memory    # 清除任务记忆
DELETE /memory/global             # 清除全局记忆
'''
#第4组:会话记忆管理(8个)
'''
GET    /sessions                  # 获取所有会话
GET    /sessions/{session_id}/memory      # 获取会话记忆
GET    /sessions/{session_id}/compressed_memories  # 压缩记忆
GET    /sessions/{session_id}/key_facts   # 关键事实
GET    /sessions/{session_id}/failed_attempts  # 失败尝试
GET    /sessions/{session_id}/tasks       # 会话的所有任务
DELETE /sessions/{session_id}/memory      # 清除会话记忆
'''
#第5组:工具管理(4个)
'''
GET  /tools                      # 列出所有工具
GET  /toolkits                   # 列出所有工具包
GET  /toolkits/{toolkit_name}    # 获取工具包详情
POST /tools/{toolkit}/{tool}     # 调用工具
'''
#第6组:文件操作(4个)
'''
POST /file/edit                  # 编辑文件
POST /file/write                 # 写入文件
GET  /file/read                  # 读取文件
GET  /file/list                  # 列出目录
'''
#五、数据结构层级
#1. 最高层:程序
'''
NagaAgent服务
    ↓
会话(Session)
    ↓
任务(Task)
    ↓
步骤(Step)
'''
#2. 会话(Session)结构
'''
会话 = {
    "session_id": "用户123",           # 会话ID
    "tasks": [任务1, 任务2, ...],      # 包含的任务
    "memory": 记忆数据,               # 会话记忆
    "key_facts": 关键事实列表         # 重要信息
}
'''
#3. 任务(Task)结构
'''
任务 = {
    "task_id": "任务001",             # 任务ID
    "purpose": "打开记事本",           # 任务目的
    "status": "running",              # 任务状态
    "steps": [步骤1, 步骤2, ...],     # 任务步骤
    "memory": 任务记忆                 # 任务记忆
}
'''
#4. 步骤(Step)结构
'''
步骤 = {
    "step_id": "步骤001",             # 步骤ID
    "purpose": "打开记事本",           # 步骤目的
    "content": "调用记事本程序",       # 步骤内容
    "output": "记事本已打开",          # 步骤输出
    "success": True,                  # 是否成功
    "error": None                     # 错误信息
}
'''
#六、执行流程
#完整任务执行流程:
#用户请求 → /schedule端点 → 创建任务 → 异步执行 → 处理每个Agent任务 → 调用电脑控制器 → 记录步骤 → 发送回调 → 更新记忆
#1. 程序元信息
'''
程序元信息
├── 解释器声明:#!/usr/bin/env python3
└── 编码声明:# -*- coding: utf-8 -*-
'''
#2. 导入模块系统
'''
导入模块系统
├── Python标准库(5个)
│   ├── asyncio     # 异步处理
│   ├── uuid        # 生成唯一ID
│   ├── logging     # 日志记录
│   ├── typing      # 类型注解
│   └── datetime    # 时间处理
│
├── Web框架(2个)
│   ├── fastapi.FastAPI        # Web应用框架
│   └── fastapi.HTTPException  # HTTP异常
│
├── 辅助工具(2个)
│   ├── fastapi.responses.JSONResponse  # JSON响应
│   └── contextlib.asynccontextmanager  # 异步上下文管理器
│
└── 自定义模块(6个)
    ├── system.config                    # 配置文件
    ├── system.background_analyzer       # 意图分析器
    ├── agentserver.agent_computer_control  # 电脑控制智能体
    ├── agentserver.task_scheduler       # 任务调度器
    ├── agentserver.toolkit_manager      # 工具管理器
    └── agentserver.config               # 服务器配置
'''
#3. 日志配置系统
'''
日志配置系统
├── 创建日志器:logging.getLogger(__name__)
├── 设置日志级别:logger.setLevel(logging.INFO)
└── 配置处理器(如果没有的话)
    ├── 创建控制台处理器:logging.StreamHandler()
    ├── 设置日志格式:时间-名称-级别-消息
    └── 添加处理器:logger.addHandler(handler)
'''
#4. 生命周期管理器
'''
生命周期管理器(lifespan函数)
├── 启动阶段(startup)
│   ├── 初始化意图分析器:get_background_analyzer()
│   ├── 初始化电脑控制智能体:ComputerControlAgent()
│   ├── 初始化任务调度器:get_task_scheduler()
│   └── 设置LLM配置(如果存在)
│
├── 运行阶段(yield)
└── 关闭阶段(shutdown)
    └── 记录关闭日志
'''
#5. 全局模块管理器
'''
全局模块管理器(Modules类)
├── 意图分析器:analyzer = None
├── 电脑控制器:computer_control = None
└── 任务调度器:task_scheduler = None
'''
# 6.内部工具函数
'''
内部工具函数
└── 时间工具
    └── def _now_iso() -> str
        ├── 功能:获取当前ISO格式时间
        └── 返回:YYYY-MM-DDTHH:MM:SS格式字符串
'''
#7. 核心处理函数
'''
核心处理函数
├── 单个任务处理器
│   └── async def _process_computer_control_task(...)
│       ├── 功能:处理单个电脑控制任务
│       ├── 输入:指令和会话ID
│       ├── 流程:
│       │   ├── 记录开始日志
│       │   ├── 调用电脑控制器执行
│       │   ├── 记录完成日志
│       │   └── 返回结果
│       └── 输出:成功/失败结果
│
├── 批量任务执行器
│   └── async def _execute_agent_tasks_async(...)
│       ├── 功能:异步执行多个Agent任务
│       ├── 输入:任务列表、会话信息等
│       ├── 流程:
│       │   ├── 循环处理每个任务
│       │   ├── 为每个任务添加步骤记录
│       │   ├── 执行电脑控制任务
│       │   ├── 更新步骤结果
│       │   └── 发送回调通知
│       └── 输出:无(异步执行)
│
└── 回调通知器
    └── async def _send_callback_notification(...)
        ├── 功能:发送任务完成回调通知
        ├── 输入:回调URL、任务结果等
        ├── 流程:
        │   ├── 构建回调数据
        │   ├── 发送HTTP POST请求
        │   └── 处理响应
        └── 输出:无
'''
#8. API端点系统(25个端点)
#8.1 系统状态检查
'''
系统状态检查
├── GET /health
│   ├── 功能:健康检查
│   └── 返回:服务状态和各模块就绪情况
│
└── GET /computer_control/availability
    ├── 功能:获取电脑控制可用性
    └── 返回:电脑控制能力和状态
'''
#8.2 任务执行端点
'''
任务执行端点
├── POST /schedule
│   ├── 功能:统一任务调度(主入口)
│   ├── 输入:查询、任务列表、会话信息等
│   ├── 流程:
│   │   ├── 检查调度器就绪
│   │   ├── 创建任务记录
│   │   ├── 异步执行任务
│   │   └── 立即返回接受响应
│   └── 返回:任务ID和调度状态
│
├── POST /analyze_and_execute
│   ├── 功能:意图分析和执行(兼容旧版)
│   ├── 输入:消息列表和会话ID
│   ├── 流程:
│   │   ├── 从消息中提取任务指令
│   │   ├── 同步执行每个任务
│   │   └── 等待所有任务完成
│   └── 返回:执行结果列表
│
└── POST /computer_control/execute
    ├── 功能:直接执行电脑控制任务
    ├── 输入:指令字符串
    └── 返回:执行结果
'''
#8.3 任务记忆管理
'''
任务记忆管理
├── 任务级别(Task Level)
│   ├── GET /tasks
│   │   ├── 功能:获取所有任务列表
│   │   └── 返回:正在运行的任务列表
│   │
│   ├── GET /tasks/{task_id}
│   │   ├── 功能:获取指定任务状态
│   │   └── 返回:任务详细状态
│   │
│   ├── GET /tasks/{task_id}/memory
│   │   ├── 功能:获取任务记忆摘要
│   │   └── 返回:任务记忆信息
│   │
│   ├── POST /tasks/{task_id}/steps
│   │   ├── 功能:添加任务步骤
│   │   └── 返回:步骤添加结果
│   │
│   └── DELETE /tasks/{task_id}/memory
│       ├── 功能:清除任务记忆
│       └── 返回:清除结果
│
├── 会话级别(Session Level)
│   ├── GET /sessions
│   │   ├── 功能:获取所有会话摘要
│   │   └── 返回:会话列表和总数
│   │
│   ├── GET /sessions/{session_id}/memory
│   │   ├── 功能:获取会话记忆摘要
│   │   └── 返回:会话记忆信息
│   │
│   ├── GET /sessions/{session_id}/compressed_memories
│   │   ├── 功能:获取会话压缩记忆
│   │   └── 返回:压缩记忆列表
│   │
│   ├── GET /sessions/{session_id}/key_facts
│   │   ├── 功能:获取会话关键事实
│   │   └── 返回:关键事实列表
│   │
│   ├── GET /sessions/{session_id}/failed_attempts
│   │   ├── 功能:获取会话失败尝试
│   │   └── 返回:失败记录列表
│   │
│   ├── GET /sessions/{session_id}/tasks
│   │   ├── 功能:获取会话的所有任务
│   │   └── 返回:任务列表
│   │
│   └── DELETE /sessions/{session_id}/memory
│       ├── 功能:清除指定会话记忆
│       └── 返回:清除结果
│
└── 全局级别(Global Level)
    ├── GET /memory/global
    │   ├── 功能:获取全局记忆摘要
    │   └── 返回:全局记忆和失败尝试摘要
    │
    └── DELETE /memory/global
        ├── 功能:清除全局记忆
        └── 返回:清除结果
'''
#8.4 工具管理系统
'''
工具管理系统
├── GET /tools
│   ├── 功能:列出所有可用工具
│   └── 返回:工具列表和总数
│
├── GET /toolkits
│   ├── 功能:列出所有可用工具包
│   └── 返回:工具包列表和总数
│
├── GET /toolkits/{toolkit_name}
│   ├── 功能:获取工具包详细信息
│   └── 返回:工具包详情
│
└── POST /tools/{toolkit_name}/{tool_name}
    ├── 功能:调用指定工具
    └── 返回:工具执行结果
'''
#8.5 文件操作系统
'''
文件操作系统
├── POST /file/edit
│   ├── 功能:编辑文件(SEARCH/REPLACE格式)
│   └── 返回:编辑结果
│
├── POST /file/write
│   ├── 功能:写入文件
│   └── 返回:写入结果
│
├── GET /file/read
│   ├── 功能:读取文件
│   └── 返回:文件内容
│
└── GET /file/list
    ├── 功能:列出目录文件
    └── 返回:文件和目录列表
'''
#9. 程序启动入口
'''
程序启动入口
├── 条件判断:if __name__ == "__main__"
├── 导入uvicorn服务器
├── 导入端口配置
└── 启动Web服务器
    ├── 服务器:uvicorn
    ├── 应用:app
    ├── 主机:0.0.0.0(所有网络接口)
    └── 端口:从配置读取
'''
#三、数据流层级结构
#请求处理流程
'''
外部请求
  ↓
FastAPI路由
  ↓
API端点函数
  ├── 参数验证
  ├── 权限检查
  ├── 调用内部函数
  └── 返回响应
    ↓
内部处理函数
  ├── 调用全局模块
  ├── 执行具体操作
  └── 记录日志和记忆
    ↓
全局模块
  ├── 意图分析器(理解用户意图)
  ├── 电脑控制器(执行电脑操作)
  └── 任务调度器(管理任务流程)
    ↓
底层系统
  ├── 文件系统(读写文件)
  ├── 操作系统(控制电脑)
  └── 网络系统(发送回调)
'''
#记忆系统层级
'''
记忆系统层级
├── 步骤级别(Step Level)
│   ├── 单个操作记录
│   ├── 执行结果
│   └── 错误信息
│
├── 任务级别(Task Level)
│   ├── 任务目的
│   ├── 所有步骤记录
│   ├── 任务状态
│   └── 任务结果
│
├── 会话级别(Session Level)
│   ├── 用户会话上下文
│   ├── 所有任务记录
│   ├── 压缩记忆(精简版)
│   ├── 关键事实(重要信息)
│   └── 失败尝试记录
│
└── 全局级别(Global Level)
    ├── 所有会话摘要
    ├── 系统总体统计
    └── 全局失败记录
'''
#四、错误处理层级
'''
错误处理结构
├── HTTP异常(HTTPException)
│   ├── 400 Bad Request(客户端错误)
│   ├── 404 Not Found(资源不存在)
│   ├── 503 Service Unavailable(服务不可用)
│   └── 500 Internal Server Error(服务器内部错误)
│
├── 通用异常处理
│   ├── try-except块
│   ├── 记录错误日志
│   └── 返回统一错误格式
│
└── 特定异常处理
    ├── 已知错误:直接抛出HTTPException
    └── 未知错误:捕获并转换为500错误
'''
#五、配置层级结构
'''
配置系统
├── 日志配置
│   ├── 日志级别:INFO
│   ├── 输出格式
│   └── 输出目标
│
├── 服务器配置
│   └── 端口号:从agentserver.config导入
│
├── AI模型配置
│   ├── 模型名称
│   ├── API密钥
│   └── 基础URL
│
└── 模块配置
    ├── 意图分析器配置
    ├── 电脑控制器配置
    └── 任务调度器配置
'''

agent_manager.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Agent管理器 - 独立的Agent注册和调用系统
支持从配置文件动态加载Agent定义,提供统一的调用接口
第1行:告诉电脑"这个文件要用python3来运行"
第2行:告诉电脑"这个文件里的文字使用UTF-8编码"(这样才能正确显示中文)
"""

import os          # 用来操作文件和文件夹
import json        # 用来处理JSON格式的数据
import asyncio     # 用来处理"异步"操作(可以同时做多件事)
import logging     # 用来记录程序运行时的信息
import time        # 用来处理时间相关的操作
from pathlib import Path  # 更好的处理文件路径
from typing import Dict, Any, Optional, List  # 给代码添加类型提示,让代码更清晰
from dataclasses import dataclass, field  # 创建数据类,简化代码
from datetime import datetime, timedelta  # 处理日期和时间
import re  # 处理文本匹配(正则表达式)
#这些就像工具箱里的不同工具,每个工具都有专门的用途。

# 配置日志
logging.basicConfig(level=logging.INFO)    # 设置记录"INFO"级别及以上的信息
logger = logging.getLogger("AgentManager")   # 创建一个叫"AgentManager"的日志记录器

# 屏蔽HTTP库的DEBUG日志
logging.getLogger("httpcore.http11").setLevel(logging.WARNING)
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore.connection").setLevel(logging.WARNING)
'''
这是什么意思?
日志就像是程序的"日记本"
这里设置只记录重要程度为"INFO"及以上的信息
后面的三行是告诉程序:"不要记录HTTP网络请求的详细信息"
'''

'''
@dataclass是一个"装饰器",它告诉Python:"这个类是用来存储数据的"
加了@dataclass后,这个类会自动生成一些有用的方法
让你不需要写很多重复的代码

dataclass 自动生成的功能:
自动生成__init__()方法(初始化方法)
自动生成__repr__()方法(漂亮的打印格式)
自动生成比较方法(比如==比较)
'''
@dataclass
class AgentConfig:
    """Agent配置类"""
    id: str  # 模型ID
    name: str  # Agent名称(中文名)
    base_name: str  # 基础名称(英文)
    system_prompt: str  # 系统提示词
    max_output_tokens: int = 40000  # 最大输出token数
    temperature: float = 0.7  # 温度参数 
    description: str = ""  # 描述信息
    model_provider: str = "openai"  # 模型提供商
    api_base_url: str = ""  # API基础URL
    api_key: str = ""  # API密钥

@dataclass
class AgentSession:
    """Agent会话类"""
    timestamp: float = field(default_factory=time.time)
    #timestamp: float 表示这个字段是浮点数(带小数点的数字),float是Python中的浮点数类型
    '''默认值设置:
    = field(default_factory=time.time) 这是特殊的设置
    field()是dataclasses模块的函数
    default_factory=time.time意思是:"如果不指定值,就调用time.time()函数来获取当前时间"
    
    为什么用 default_factory?
    # 如果用普通的默认值:
    timestamp: float = time.time()  # 错误!这样会在导入时固定时间

     # 用 default_factory:
    timestamp: float = field(default_factory=time.time)  # 正确!每次创建对象时才调用
    '''
    history: List[Dict[str, str]] = field(default_factory=list)
    '''
    类型注解解释:
    List[Dict[str, str]]
    ├── List: 列表类型
    └── Dict[str, str]: 字典类型,键和值都是字符串
    这是什么意思?
      这是一个列表
      列表里的每个元素都是一个字典
      字典有两个字符串键值对
    '''
    session_id: str = "default_user_session"   #类型是str(字符串),默认值是"default_user_session"

class AgentManager:
    """Agent管理器 - 增强版,支持任务规划和多智能体协作"""
    
    def __init__(self, config_dir: str = None):
        """
        初始化Agent管理器
        
        Args:
            config_dir: Agent配置文件目录(可选,MCP架构下通常不需要)
        """
        self.config_dir = Path(config_dir) if config_dir else None
        self.agents: Dict[str, AgentConfig] = {}
        self.agent_sessions: Dict[str, Dict[str, AgentSession]] = {}
        '''
        初始化了什么?
            config_dir: 配置文件的位置(可能没有)
            agents: 空的字典,用来存放所有助手的配置信息
            agent_sessions: 空的字典,用来存放所有助手的对话记录
        
        重要:
          AgentSession在后面会定义,用来记录对话历史
          这是三层嵌套的字典:
             第一层:助手名字 → 第二层字典
             第二层:对话ID → 对话记录
             第三层:具体的对话记录
        '''
        
        # 任务规划相关 - 现在通过agentserver的task_scheduler处理
        # self.task_planner = None  # 删除重复功能
        # self.task_executor = None  # 删除重复功能
        
        # 从配置文件读取最大历史轮数
        try:
            from system.config import config, AI_NAME
            self.max_history_rounds = config.api.max_history_rounds
        except ImportError:
            self.max_history_rounds = 10  # 默认值
            logger.warning("无法导入配置,使用默认历史轮数设置")
            '''
            这是什么意思?
               尝试从另一个配置文件读取设置
               如果找不到那个文件,就用默认值(10轮对话历史)
               max_history_rounds:最多保留多少轮对话记录
            '''
        self.context_ttl_hours = 24  # 上下文TTL(小时)  #context_ttl_hours = 24:对话记录保存24小时
        self.debug_mode = True    # debug_mode = True:开启调试模式(会记录更多信息)
        
        # 条件加载配置
        # 只在指定了config_dir时才创建目录和加载配置
        if self.config_dir:
            # 确保配置目录存在
            self.config_dir.mkdir(exist_ok=True)
            # 加载Agent配置
            self._load_agent_configs()
        else:
            logger.info("AgentManager使用MCP架构,跳过外部配置文件加载")
            '''
            逻辑判断:
                如果提供了配置文件目录:创建目录并加载配置
                否则:跳过加载,使用MCP架构(另一种管理方式)
            '''
            
        
        # 启动定期清理任务(只在事件循环中启动)
        try:
            loop = asyncio.get_running_loop()
            asyncio.create_task(self._periodic_cleanup())
        except RuntimeError:
            # 没有运行的事件循环,跳过定期清理任务
            pass
        '''
        这是什么?
           启动一个"自动清理工"
           这个清理工会每小时检查一次,删除超过24小时的旧对话记录
           try...except:尝试启动,失败了也没关系
        '''
        
        # 任务规划器现在通过agentserver的task_scheduler处理
        # self._init_task_planner()  # 删除重复功能
        
        logger.info(f"AgentManager初始化完成,已加载 {len(self.agents)} 个Agent")
    
    def _load_agent_configs(self):  #从JSON配置文件中加载所有助手的设置
        """从配置文件加载Agent定义"""
        # 检查config_dir是否存在
        if not self.config_dir:    
            logger.info("未指定配置文件目录,跳过Agent配置加载")
            return
            
        # 扫描配置文件目录
        config_files = list(self.config_dir.glob("*.json")) #glob("*.json"):找出这个文件夹里所有.json文件
        
        for config_file in config_files:   #打开一个JSON文件
            try:
                with open(config_file, 'r', encoding='utf-8') as f:
                    config_data = json.load(f)  #打开一个JSON文件,config_data里就保存了文件的所有内容
                
                # 解析Agent配置
                for agent_key, agent_data in config_data.items():
                    if self._validate_agent_config(agent_data):
                        agent_config = AgentConfig(
                            id=agent_data.get('model_id', ''),
                            name=agent_data.get('name', agent_key),
                            base_name=agent_data.get('base_name', agent_key),
                            system_prompt=agent_data.get('system_prompt', f'You are a helpful AI assistant named {agent_data.get("name", agent_key)}.'),
                            max_output_tokens=agent_data.get('max_output_tokens', 40000),
                            temperature=agent_data.get('temperature', 0.7),
                            description=agent_data.get('description', f'Assistant {agent_data.get("name", agent_key)}.'),
                            model_provider=agent_data.get('model_provider', 'openai'),
                            api_base_url=agent_data.get('api_base_url', ''),
                            api_key=agent_data.get('api_key', '')
                        )
                        
                        self.agents[agent_key] = agent_config
                        '''
                        这个过程:
                            检查配置是否有效(_validate_agent_config)
                            用配置数据创建AgentConfig对象
                            把这个对象存到self.agents字典里
                       .get()方法的好处:
                            agent_data.get('name', agent_key)意思是:
                            如果配置里有name字段,就用它
                            如果没有,就用agent_key(键名)代替
                        '''
                        logger.info(f"已加载Agent: {agent_key} ({agent_config.name})")
                
            except Exception as e:
                logger.error(f"加载配置文件 {config_file} 失败: {e}")
    
    #验证配置的方法
    def _validate_agent_config(self, config: Dict[str, Any]) -> bool:
        """验证Agent配置"""
        required_fields = ['model_id', 'name']#必须要有model_id和name这两个字段,如果缺少任何一个,就返回False(不通过)
        for field in required_fields:
            if field not in config or not config[field]:
                logger.warning(f"Agent配置缺少必需字段: {field}")
                return False
        return True
    
    def get_agent_session_history(self, agent_name: str, session_id: str = 'default_user_session') -> List[Dict[str, str]]:
        """获取Agent会话历史"""
        if agent_name not in self.agent_sessions:
            self.agent_sessions[agent_name] = {}
        
        agent_sessions = self.agent_sessions[agent_name]
        if session_id not in agent_sessions or self._is_context_expired(agent_sessions[session_id].timestamp):
            agent_sessions[session_id] = AgentSession(session_id=session_id)
        
        return agent_sessions[session_id].history
    '''
    这个方法做什么?
         获取某个助手、某个对话的历史记录。
    逻辑流程:
        如果这个助手没有对话记录 → 创建一个空的记录字典
        如果对话记录不存在或已过期 → 创建新的对话记录
        返回对话历史
    '''
    
    def update_agent_session_history(self, agent_name: str, user_message: str, assistant_message: str, session_id: str = 'default_user_session'):
        """更新Agent会话历史,把新的一轮对话添加到历史记录中。"""
        if agent_name not in self.agent_sessions:
            self.agent_sessions[agent_name] = {}
        
        agent_sessions = self.agent_sessions[agent_name]
        if session_id not in agent_sessions or self._is_context_expired(agent_sessions[session_id].timestamp):
            agent_sessions[session_id] = AgentSession(session_id=session_id)
        
        session_data = agent_sessions[session_id]
        session_data.history.extend([     #每次对话包含两条记录:
            {"role": "user", "content": user_message},         #用户说的话
            {"role": "assistant", "content": assistant_message}   #助手说的话
        ])
        session_data.timestamp = time.time()
        
        # 限制历史消息数量
        max_messages = self.max_history_rounds * 2
        if len(session_data.history) > max_messages:
            session_data.history = session_data.history[-max_messages:]  #[-max_messages:]:取最后max_messages条记录
    
    def _is_context_expired(self, timestamp: float) -> bool:
        """检查上下文是否过期"""
        return (time.time() - timestamp) > (self.context_ttl_hours * 3600)
    '''
    计算逻辑:
       time.time():现在的时间(秒数)
       timestamp:上次对话的时间(秒数)
       两者相减得到过了多少秒
       如果超过24小时(24×3600秒)→ 返回True(已过期)
    '''
    
    async def _periodic_cleanup(self):
        """定期清理过期的会话上下文"""
        while True:
            try:
                await asyncio.sleep(3600)  # 每小时清理一次
                
                if self.debug_mode:
                    logger.debug("执行定期上下文清理...")
                    '''
                    这是一个无限循环:
                       等待1小时(3600秒)
                       执行清理
                       回到第1步
                    '''
                
                for agent_name, sessions in list(self.agent_sessions.items()):#list()的作用:创建列表副本,避免在循环中修改字典出错
                    for session_id, session_data in list(sessions.items()):
                        if self._is_context_expired(session_data.timestamp):
                            sessions.pop(session_id, None)
                            if self.debug_mode:
                                logger.debug(f"清理过期上下文: {agent_name}, session {session_id}")
                    
                    if not sessions:
                        self.agent_sessions.pop(agent_name, None)
                        
            except Exception as e:
                logger.error(f"定期清理任务出错: {e}")
    
    def _replace_placeholders(self, text: str, agent_config: AgentConfig) -> str:
        """替换提示词中的占位符,支持Agent配置和环境变量"""
        if not text:
            return ""
        
        processed_text = str(text)#把文本中的"模板变量"替换成真实的值,把{{AgentName}} 替换成 "小助手",把{{CurrentTime}} 替换成 "14:30:00"
        
        # Agent配置相关的占位符替换
        if agent_config:
            # 基础Agent信息
            processed_text = processed_text.replace("{{AgentName}}", agent_config.name)
            processed_text = processed_text.replace("{{MaidName}}", agent_config.name)
            processed_text = processed_text.replace("{{BaseName}}", agent_config.base_name)
            processed_text = processed_text.replace("{{Description}}", agent_config.description)
            processed_text = processed_text.replace("{{ModelId}}", agent_config.id)
            
            # 配置参数
            processed_text = processed_text.replace("{{Temperature}}", str(agent_config.temperature))
            processed_text = processed_text.replace("{{MaxTokens}}", str(agent_config.max_output_tokens))
            processed_text = processed_text.replace("{{ModelProvider}}", agent_config.model_provider)
            '''
            可以替换的占位符:
               {{AgentName}} / {{MaidName}} → 助手的中文名
               {{BaseName}} → 助手的英文名
               {{Description}} → 助手的描述
               {{ModelId}} → 模型ID
               {{Temperature}} → 温度参数
               {{MaxTokens}} → 最大输出字数
               {{ModelProvider}} → 模型提供商
            '''
        
        # 环境变量占位符替换
        import os
        import re
        
        # 匹配 {{ENV_VAR_NAME}} 格式的环境变量,环境变量是电脑系统里的设置,比如API_KEY="sk-abc123",{{API_KEY}}会被替换成"sk-abc123"
        env_pattern = r'\{\{([A-Z_][A-Z0-9_]*)\}\}'  #r'\{\{([A-Z_][A-Z0-9_]*)\}\}' 匹配 {{大写字母_数字}},比如:{{API_KEY}}、{{DATABASE_URL}}
        for match in re.finditer(env_pattern, processed_text):
            env_var_name = match.group(1)
            env_value = os.getenv(env_var_name, '')
            processed_text = processed_text.replace(f"{{{{{env_var_name}}}}}", env_value)
        
        # 时间相关占位符
        from datetime import datetime
        now = datetime.now()
        processed_text = processed_text.replace("{{CurrentTime}}", now.strftime("%H:%M:%S"))
        processed_text = processed_text.replace("{{CurrentDate}}", now.strftime("%Y-%m-%d"))
        processed_text = processed_text.replace("{{CurrentDateTime}}", now.strftime("%Y-%m-%d %H:%M:%S"))
        '''
        时间格式:
          {{CurrentTime}} → "14:30:00"(小时:分钟:秒)
          {{CurrentDate}} → "2024-01-01"(年-月-日)
          {{CurrentDateTime}} → "2024-01-01 14:30:00"(完整时间)
        '''
        
        return processed_text
    
    def _build_system_message(self, agent_config: AgentConfig) -> Dict[str, str]:
        """构建系统消息,包含Agent的身份、行为、风格等"""
        # 处理系统提示词中的占位符
        processed_system_prompt = self._replace_placeholders(agent_config.system_prompt, agent_config)
        
        return {
            "role": "system",
            "content": processed_system_prompt
        }
        '''
        这是什么?系统消息告诉AI助手"你是谁",比如:
        {
              "role": "system",
              "content": "你是一个帮助人的AI助手,名字叫小助手。"
        }
        '''
    
    def _build_user_message(self, prompt: str, agent_config: AgentConfig) -> Dict[str, str]:
        """构建用户消息,处理用户输入"""
        # 处理用户提示词中的占位符
        processed_prompt = self._replace_placeholders(prompt, agent_config)
        
        return {
            "role": "user",
            "content": processed_prompt
        }
        '''
        用户消息的例子:
        {
            "role": "user",
            "content": "你好,现在时间是{{CurrentTime}}"
        }
        '''
    
    def _build_assistant_message(self, content: str) -> Dict[str, str]:
        """构建助手消息"""
        return {
            "role": "assistant",
            "content": content
        }
       '''
       助手消息的例子:
       {
            "role": "assistant",
            "content": "你好!现在是14:30:00。"
       }
       '''
    
    def _validate_messages(self, messages: List[Dict[str, str]]) -> bool:
        """验证消息序列的有效性"""
        if not messages:  #检查消息是否为空
            return False
        
        # 检查消息格式
        for msg in messages:
            if not isinstance(msg, dict):
                return False
            if 'role' not in msg or 'content' not in msg:
                return False
            if msg['role'] not in ['system', 'user', 'assistant']:
                return False
            if not isinstance(msg['content'], str):
                return False
        
        # 检查系统消息是否在开头
        if messages[0]['role'] != 'system':
            return False
        
        return True
       '''
       为什么需要系统消息在第一?
          这是OpenAI API的要求
          系统消息最先告诉AI"你的角色是什么"
       '''
 
  #call_agent 方法,这是整个文件最重要的方法,用来调用AI助手。
    async def call_agent(self, agent_name: str, prompt: str, session_id: str = None) -> Dict[str, Any]:
        """
        调用指定的Agent
        
        Args:
            agent_name: Agent名称
            prompt: 用户提示词
            session_id: 会话ID
            
        Returns:
            Dict[str, Any]: 调用结果
        """
        # 检查Agent是否存在
        if agent_name not in self.agents:
            available_agents = list(self.agents.keys())
            error_msg = f"请求的Agent '{agent_name}' 未找到或未正确配置。"  #如果要找的助手不存在 → 返回错误信息
            if available_agents:  #错误信息里会告诉你哪些助手可用
                error_msg += f" 当前已加载的Agent有: {', '.join(available_agents)}。"
            else:
                error_msg += " 当前没有加载任何Agent。请检查配置文件。"
            error_msg += " 请确认您请求的Agent名称是否准确。"
            
            logger.error(f"Agent调用失败: {error_msg}")
            return {"status": "error", "error": error_msg}
        
        agent_config = self.agents[agent_name]
        
        # 生成会话ID
        if not session_id:
            session_id = f"agent_{agent_config.base_name}_default_user_session"
        
        try:  #构建完整的消息序列
            # 获取会话历史
            history = self.get_agent_session_history(agent_name, session_id)
            
            # 构建完整的消息序列
            messages = []
            
            # 1. 系统消息:设定Agent的身份、行为、风格等
            system_message = self._build_system_message(agent_config)
            messages.append(system_message)
            
            # 2. 历史消息:保留多轮对话的上下文
            messages.extend(history)
            
            # 3. 当前用户输入:本次要处理的任务内容
            user_message = self._build_user_message(prompt, agent_config)
            messages.append(user_message)
            '''
            消息序列的组成:
            [
                {系统消息: "你是助手A"},      # 第一条必须是系统消息
                {用户: "你好"},             # 历史对话的第一轮
                {助手: "你好!"},           # 历史对话的第一轮
                {用户: "今天天气怎么样"},    # 历史对话的第二轮
                {助手: "今天晴天"},         # 历史对话的第二轮
                {用户: "现在是什么时间"}     # 当前的新问题
            ]
            '''
            
            # 验证消息序列
            if not self._validate_messages(messages):
                return {"status": "error", "error": "消息序列格式无效"}
            
            # 记录调试信息
            if self.debug_mode:
                logger.debug(f"Agent调用消息序列:")
                for i, msg in enumerate(messages):
                    logger.debug(f"  [{i}] {msg['role']}: {msg['content'][:100]}...")
            
            # 调用LLM API
            response = await self._call_llm_api(agent_config, messages)
            
            if response.get("status") == "success":
                assistant_response = response.get("result", "")
                
                # 更新会话历史
                self.update_agent_session_history(
                    agent_name, user_message['content'], assistant_response, session_id
                )
                
                return {"status": "success", "result": assistant_response}
            else:
                return response
            '''
            处理流程:
               调用AI模型API
               如果成功 → 保存对话历史 → 返回结果
               如果失败 → 直接返回错误
            '''
                
        except Exception as e:
            error_msg = f"调用Agent '{agent_name}' 时发生错误: {str(e)}"
            logger.error(f"Agent调用异常: {error_msg}")
            return {"status": "error", "error": error_msg}
    
    # 调用LLM API的核心方法
    async def _call_llm_api(self, agent_config: AgentConfig, messages: List[Dict[str, str]]) -> Dict[str, Any]:
        """调用LLM API,使用Agent配置中的参数"""
        try:
            # 使用新版本的OpenAI API
            from openai import AsyncOpenAI
            
            # 记录调试信息
            if self.debug_mode:
                logger.debug(f"调用LLM API - Agent: {agent_config.name}")
                logger.debug(f"  模型: {agent_config.id}")
                logger.debug(f"  提供商: {agent_config.model_provider}")
                logger.debug(f"  API URL: {agent_config.api_base_url}")
                logger.debug(f"  温度: {agent_config.temperature}")
                logger.debug(f"  最大Token: {agent_config.max_output_tokens}")
                logger.debug(f"  消息数量: {len(messages)}")
            
            # 验证必要的配置参数
            if not agent_config.id:
                return {"status": "error", "error": "Agent配置缺少模型ID"}
            
            if not agent_config.api_key:
                return {"status": "error", "error": "Agent配置缺少API密钥"}
            
            # 创建客户端,使用Agent配置中的参数
            client = AsyncOpenAI(
                api_key=agent_config.api_key,   #api_key: 访问API的密码
                #base_url: API的网址,如果没配置,默认用DeepSeek的API
                base_url=agent_config.api_base_url or "https://api.deepseek.com/v1"
            )
            
            # 准备API调用参数
            api_params = {
                "model": agent_config.id,  #model: 使用哪个AI模型
                "messages": messages,     #messages: 对话消息列表
                "max_tokens": agent_config.max_output_tokens,  #max_tokens: 最多生成多少字
                "temperature": agent_config.temperature,   #temperature: 创造力参数(0.7是中等创造力)
                "stream": False   #stream: False: 不要流式传输(一次性返回结果)
            }
            
            # 记录API调用参数(调试模式)
            if self.debug_mode:
                logger.debug(f"API调用参数: {api_params}")
            
            # 调用API
            response = await client.chat.completions.create(**api_params) 
            
            # 提取响应内容
            assistant_content = response.choices[0].message.content
            #response.choices[0].message.content 就是AI的回答
            
            # 记录响应信息(调试模式)
            if self.debug_mode:
                usage = response.usage
                logger.debug(f"API响应成功:")
                logger.debug(f"  使用Token: {usage.prompt_tokens} (输入) + {usage.completion_tokens} (输出) = {usage.total_tokens} (总计)")
                logger.debug(f"  响应长度: {len(assistant_content)} 字符")
            
            return {"status": "success", "result": assistant_content}
            
        except Exception as e:
            error_msg = f"LLM API调用失败: {str(e)}"
            logger.error(f"Agent '{agent_config.name}' API调用失败: {error_msg}")
            
            # 记录详细的错误信息(调试模式)
            if self.debug_mode:
                import traceback
                logger.debug(f"详细错误信息:")
                logger.debug(traceback.format_exc())
            
            return {"status": "error", "error": error_msg}
 
#其他功能方法   
    def get_available_agents(self) -> List[Dict[str, Any]]:
        """获取所有可用的Agent列表"""
        return [
            {
                "name": agent_config.name,
                "base_name": agent_config.base_name,
                "description": agent_config.description,
                "model_id": agent_config.id,
                "temperature": agent_config.temperature,
                "max_output_tokens": agent_config.max_output_tokens
            }
            for agent_config in self.agents.values()
        ]
        '''
        返回格式:
        [
            {
                "name": "助手A",
                "base_name": "assistant_a",
                "description": "这是一个助手",
                "model_id": "gpt-4",
                "temperature": 0.7,
                "max_output_tokens": 40000
            }
        ]
        '''
    
    def get_agent_info(self, agent_name: str) -> Optional[Dict[str, Any]]:
        """获取指定Agent的详细信息"""
        if agent_name not in self.agents:
            return None
        
        agent_config = self.agents[agent_name]
        return {
            "name": agent_config.name,
            "base_name": agent_config.base_name,
            "description": agent_config.description,
            "model_id": agent_config.id,
            "temperature": agent_config.temperature,
            "max_output_tokens": agent_config.max_output_tokens,
            "system_prompt": agent_config.system_prompt,
            "model_provider": agent_config.model_provider
        }
    
    def reload_configs(self):
        """重新加载Agent配置,修改配置文件后,不用重启程序,调用这个方法就能重新加载。"""
        self.agents.clear()
        self._load_agent_configs()
        logger.info("Agent配置已重新加载")
    
    def _register_agent_from_manifest(self, agent_name: str, agent_config: Dict[str, Any]):
        """从manifest注册Agent
        
        Args:
            agent_name: Agent名称
            agent_config: Agent配置字典
        """
        try:
            # 验证配置
            if not self._validate_agent_config(agent_config):
                logger.warning(f"Agent配置验证失败: {agent_name}")
                return False
            
            # 创建AgentConfig对象
            agent_config_obj = AgentConfig(
                id=agent_config.get('model_id', ''),
                name=agent_config.get('name', agent_name),
                base_name=agent_config.get('base_name', agent_name),
                system_prompt=agent_config.get('system_prompt', f'You are a helpful AI assistant named {agent_config.get("name", agent_name)}.'),
                max_output_tokens=agent_config.get('max_output_tokens', 8192),
                temperature=agent_config.get('temperature', 0.7),
                description=agent_config.get('description', f'Assistant {agent_config.get("name", agent_name)}.'),
                model_provider=agent_config.get('model_provider', 'openai'),
                api_base_url=agent_config.get('api_base_url', ''),
                api_key=agent_config.get('api_key', '')
            )
            
            # 注册到agents字典
            self.agents[agent_name] = agent_config_obj
            logger.info(f"已从manifest注册Agent: {agent_name} ({agent_config_obj.name})")
            return True
            
        except Exception as e:
            logger.error(f"从manifest注册Agent失败 {agent_name}: {e}")
            return False
    
    async def call_agent_by_action(self, agent_name: str, action_args: Dict[str, Any]) -> str:
        """根据动作调用Agent,不直接给提示词,而是给"动作",比如:
        
        Args:
            agent_name: Agent名称
            action_args: 动作参数,包含action和具体参数
            
        Returns:
            str: Agent执行结果
        """
        try:
            # 检查Agent是否存在
            if agent_name not in self.agents:
                return f"Agent '{agent_name}' 未找到或未正确配置"
            
            agent_config = self.agents[agent_name]
            action = action_args.get('action', '')
            
            # 构建用户提示词
            user_prompt = self._build_action_prompt(action, action_args)
            
            # 调用Agent
            result = await self.call_agent(agent_name, user_prompt)
            
            if result.get("status") == "success":
                return result.get("result", "")
            else:
                return f"Agent调用失败: {result.get('error', '未知错误')}"
                
        except Exception as e:
            logger.error(f"Agent动作调用失败 {agent_name}: {e}")
            return f"Agent动作调用异常: {str(e)}"
    
    def _build_action_prompt(self, action: str, action_args: Dict[str, Any]) -> str:
        """构建动作提示词
        
        Args:
            action: 动作名称
            action_args: 动作参数
            
        Returns:
            str: 构建的提示词
        """
        # 移除不需要的参数
        clean_args = {k: v for k, v in action_args.items() 
                     if k not in ['service_name', 'action']}
        
        # 构建提示词
        if clean_args:
            args_str = ", ".join([f"{k}: {v}" for k, v in clean_args.items()])
            return f"请执行动作 '{action}',参数: {args_str}"
        else:
            return f"请执行动作 '{action}'"
    
    # 删除重复的任务规划器初始化方法
    # 现在通过agentserver的task_scheduler处理任务规划
    
    async def process_intelligent_task(self, query: str, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """
        智能任务处理 - 核心方法,通过agentserver的task_scheduler处理
        处理更复杂的任务,可以分解成多个步骤。
        Args:
            query: 用户查询
            context: 上下文信息
            
        Returns:
            Dict[str, Any]: 处理结果
        """
        try:
            # 通过agentserver的task_scheduler处理智能任务
            from agentserver.task_scheduler import get_task_scheduler
            task_scheduler = get_task_scheduler()
            
            # 创建任务并调度
            import uuid  #用来标识这个任务
            task_id = str(uuid.uuid4())
            
            # 注册任务到调度器
            task_scheduler.task_registry[task_id] = {
                "id": task_id,
                "type": "processor",
                "status": "queued",
                "params": {"query": query},
                "context": context
            }
            
            # 调度并行执行,把任务交给"任务调度器"去执行,可以并行处理多个任务。
            tasks = [{
                "type": "processor",
                "params": {"query": query},
                "session_id": context.get("session_id") if context else None
            }]
            
            results = await task_scheduler.schedule_parallel_execution(tasks)
            
            if results and len(results) > 0:
                result = results[0]
                return {
                    "status": "success" if result.get("success") else "error",
                    "result": result.get("result"),
                    "error": result.get("error"),
                    "task_id": task_id
                }
            else:
                return {
                    "status": "error",
                    "error": "任务调度失败"
                }
            
        except Exception as e:
            logger.error(f"智能任务处理失败: {e}")
            # 降级到传统Agent调用,如果复杂任务处理失败,就改用简单的调用方式
            return await self.call_agent("default", query)

# 全局Agent管理器实例
_AGENT_MANAGER = None
'''
这是什么?
创建一个全局变量_AGENT_MANAGER
初始值是None(空)
变量名前面的下划线_是约定,表示"这是内部变量"
'''

def get_agent_manager() -> AgentManager:
    """获取全局Agent管理器实例"""
    global _AGENT_MANAGER
    if _AGENT_MANAGER is None:
        _AGENT_MANAGER = AgentManager()
    return _AGENT_MANAGER
'''
这是一个"单例模式"的设计:
逻辑流程:
第一次调用 get_agent_manager():
    _AGENT_MANAGER 是 None → 创建新的 AgentManager → 返回它

第二次调用 get_agent_manager():
    _AGENT_MANAGER 已经有值了 → 直接返回这个值

为什么这样设计?
整个程序只需要一个AgentManager实例
避免了重复创建,节省内存
所有地方都使用同一个实例
'''

# 便捷函数
async def call_agent(agent_name: str, prompt: str, session_id: str = None) -> Dict[str, Any]:
    """便捷的Agent调用函数,这是一个"包装函数":
    它让调用AI助手变得非常简单:"""
    manager = get_agent_manager()
    return await manager.call_agent(agent_name, prompt, session_id)

def list_agents() -> List[Dict[str, Any]]:
    """便捷的Agent列表获取函数"""
    manager = get_agent_manager()  #返回所有可用助手的列表,比如:
    return manager.get_available_agents()
'''
agents = list_agents()
# agents 会是:
# [
#     {"name": "助手A", "description": "...", ...},
#     {"name": "助手B", "description": "...", ...}
# ]
'''

def get_agent_info(agent_name: str) -> Optional[Dict[str, Any]]:
    """便捷的Agent信息获取函数, 获取助手详细信息的函数"""
    manager = get_agent_manager()
    return manager.get_agent_info(agent_name)
'''
获取某个助手的详细信息,比如:
info = get_agent_info("助手A")
# info 会是:
# {
#     "name": "助手A",
#     "base_name": "assistant_a",
#     "description": "这是一个助手",
#     "system_prompt": "你是一个助手",
#     ...
# }
注意:如果助手不存在,返回None,Optional[Dict[str, Any]]表示:要么返回字典,要么返回None
'''

# 智能任务处理函数 - 通过agentserver处理
async def process_intelligent_task(query: str, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """便捷的智能任务处理函数 - 通过agentserver的task_scheduler处理,让复杂的任务处理变得简单。"""
    manager = get_agent_manager()
    return await manager.process_intelligent_task(query, context)

# 任务管理函数 - 通过agentserver的task_scheduler处理
async def get_task_status(task_id: str) -> Optional[Dict[str, Any]]:
    """便捷的任务状态获取函数 - 通过agentserver处理"""
    try:
        from agentserver.task_scheduler import get_task_scheduler
        task_scheduler = get_task_scheduler()
        return await task_scheduler.get_task_status(task_id)
    except Exception as e:
        logger.error(f"获取任务状态失败: {e}")
        return None
'''
使用示例:
status = await get_task_status("123e4567-e89b-12d3-a456-426614174000")
# status 可能是:
# {
#     "id": "123e4567...",
#     "status": "running",  # 运行中
#     "progress": 50,       # 进度50%
#     "result": None
# }
'''

#这个函数忽略了status_filter参数,总是返回所有运行中的任务。
async def get_task_list(status_filter: Optional[str] = None) -> List[Dict[str, Any]]:
    """便捷的任务列表获取函数 - 通过agentserver处理"""
    try:
        from agentserver.task_scheduler import get_task_scheduler
        task_scheduler = get_task_scheduler()
        return await task_scheduler.get_running_tasks()
    except Exception as e:
        logger.error(f"获取任务列表失败: {e}")
        return []

async def get_execution_stats() -> Dict[str, Any]:
    """便捷的执行统计获取函数 - 通过agentserver处理"""
    try:
        from agentserver.task_scheduler import get_task_scheduler
        task_scheduler = get_task_scheduler()
        return {
            "total_tasks": len(task_scheduler.task_registry),
            "running_tasks": len([t for t in task_scheduler.task_registry.values() if t.get("status") == "running"]),
            "queued_tasks": len([t for t in task_scheduler.task_registry.values() if t.get("status") == "queued"])
        }
    except Exception as e:
        logger.error(f"获取执行统计失败: {e}")
        return {}
'''
这个函数返回:
{
    "total_tasks": 10,      // 总任务数
    "running_tasks": 3,     // 运行中的任务数
    "queued_tasks": 7       // 排队中的任务数
}

统计逻辑:
  total_tasks: 任务注册表中所有任务的数量
  running_tasks: 状态是"running"的任务数量
  queued_tasks: 状态是"queued"的任务数量
'''

async def cancel_task(task_id: str) -> bool:
    """便捷的任务取消函数 - 通过agentserver处理"""
    try:
        from agentserver.task_scheduler import get_task_scheduler
        task_scheduler = get_task_scheduler()
        if task_id in task_scheduler.task_registry:
            task_scheduler.task_registry[task_id]["status"] = "cancelled"
            return True
        return False
    except Exception as e:
        logger.error(f"取消任务失败: {e}")
        return False
'''
1. 检查任务是否存在
2. 如果存在 → 把状态改成"cancelled" → 返回True
3. 如果不存在 → 返回False'''


# ==========为什么没有主函数入口===========
#1. 这是模块(Module),不是脚本(Script)
#Python文件的两种类型:
'''
类型  | 用途             |  是否有if __name__ == "__main__" | 例子
模块  | 被其他文件导入使用  |  通常没有                        | math.py, random.py, agent_manager.py
脚本  | 直接运行执行任务   |  必须有                          | main.py, run_server.py
这个agent_manager.py是模块:它提供功能,但不直接运行,其他文件导入它,使用它的功能
'''
#AgentManager 模块完整层级结构
'''
agent_manager.py
├── 模块头部(文件配置)
│   ├── #!/usr/bin/env python3                     # 执行环境声明
│   ├── # -*- coding: utf-8 -*-                    # 编码声明
│   └── """Agent管理器 - 独立的Agent注册和调用系统"""   # 模块文档
│
├── 导入模块
│   ├── 系统模块
│   │   ├── os                                     # 操作系统交互
│   │   ├── json                                   # JSON数据处理
│   │   ├── asyncio                                # 异步编程支持
│   │   ├── logging                                # 日志记录
│   │   └── time                                   # 时间处理
│   │
│   ├── 路径处理
│   │   └── from pathlib import Path               # 现代化路径操作
│   │
│   ├── 类型提示
│   │   └── from typing import Dict, Any, Optional, List  # 类型注解
│   │
│   ├── 数据类
│   │   └── from dataclasses import dataclass, field     # 简化类定义
│   │
│   ├── 日期时间
│   │   └── from datetime import datetime, timedelta     # 时间日期操作
│   │
│   └── 正则表达式
│       └── import re                               # 字符串模式匹配
│
├── 日志配置
│   ├── logging.basicConfig(level=logging.INFO)     # 基础日志配置
│   ├── logger = logging.getLogger("AgentManager")  # 获取日志记录器
│   └── 屏蔽HTTP库DEBUG日志(3行)                   # 减少日志噪音
│
├── 数据类定义(Data Classes)
│   ├── AgentConfig(Agent配置模板)
│   │   ├── id: str                                 # 模型ID(必需)
│   │   ├── name: str                               # Agent名称-中文(必需)
│   │   ├── base_name: str                          # 基础名称-英文(必需)
│   │   ├── system_prompt: str                      # 系统提示词(必需)
│   │   ├── max_output_tokens: int = 40000          # 最大输出token数
│   │   ├── temperature: float = 0.7                # 温度参数
│   │   ├── description: str = ""                   # 描述信息
│   │   ├── model_provider: str = "openai"          # 模型提供商
│   │   ├── api_base_url: str = ""                  # API基础URL
│   │   └── api_key: str = ""                       # API密钥
│   │
│   └── AgentSession(Agent会话记录)
│       ├── timestamp: float = field(default_factory=time.time)    # 时间戳
│       ├── history: List[Dict[str, str]] = field(default_factory=list)  # 历史记录
│       └── session_id: str = "default_user_session"              # 会话ID
│
├── 核心管理器类(AgentManager)
│   ├── 初始化方法 __init__
│   │   ├── 参数: config_dir: str = None            # 配置文件目录(可选)
│   │   ├── 属性初始化
│   │   │   ├── self.config_dir                     # 配置目录路径
│   │   │   ├── self.agents                         # Agent配置字典
│   │   │   └── self.agent_sessions                 # Agent会话字典(三层嵌套)
│   │   │
│   │   ├── 配置读取
│   │   │   ├── self.max_history_rounds             # 最大历史轮数
│   │   │   ├── self.context_ttl_hours = 24         # 上下文有效期(小时)
│   │   │   └── self.debug_mode = True              # 调试模式开关
│   │   │
│   │   ├── 条件加载配置
│   │   │   ├── if self.config_dir                  # 有配置目录时
│   │   │   │   ├── self.config_dir.mkdir()         # 创建目录
│   │   │   │   └── self._load_agent_configs()      # 加载配置
│   │   │   └── else                                # 无配置目录时
│   │   │       └── logger.info()                   # 记录MCP架构提示
│   │   │
│   │   ├── 定期清理任务
│   │   │   ├── try: asyncio.get_running_loop()     # 获取事件循环
│   │   │   ├── asyncio.create_task()               # 创建清理任务
│   │   │   └── except RuntimeError: pass           # 无事件循环时跳过
│   │   │
│   │   └── 初始化完成日志
│   │
│   ├── 配置管理方法
│   │   ├── _load_agent_configs()                   # 加载Agent配置
│   │   │   ├── 检查目录存在性
│   │   │   ├── 扫描JSON配置文件
│   │   │   ├── 读取并解析JSON
│   │   │   ├── 验证配置有效性
│   │   │   ├── 创建AgentConfig对象
│   │   │   └── 注册到agents字典
│   │   │
│   │   ├── _validate_agent_config()                # 验证Agent配置
│   │   │   ├── 检查必需字段: ['model_id', 'name']
│   │   │   └── 返回验证结果: bool
│   │   │
│   │   ├── _register_agent_from_manifest()         # 从manifest注册Agent
│   │   │   ├── 验证配置
│   │   │   ├── 创建AgentConfig对象
│   │   │   ├── 注册到agents字典
│   │   │   └── 返回注册结果: bool
│   │   │
│   │   └── reload_configs()                        # 重新加载配置
│   │       ├── 清空现有agents
│   │       └── 重新加载配置
│   │
│   ├── 会话管理方法
│   │   ├── get_agent_session_history()             # 获取Agent会话历史
│   │   │   ├── 检查Agent是否存在
│   │   │   ├── 检查会话是否存在
│   │   │   ├── 检查会话是否过期
│   │   │   └── 返回历史记录列表
│   │   │
│   │   ├── update_agent_session_history()          # 更新Agent会话历史
│   │   │   ├── 添加新对话记录
│   │   │   ├── 更新时间戳
│   │   │   └── 限制历史消息数量
│   │   │
│   │   ├── _is_context_expired()                   # 检查上下文是否过期
│   │   │   └── 判断时间差是否超过TTL
│   │   │
│   │   └── _periodic_cleanup()                     # 定期清理过期的会话
│   │       ├── 每小时执行一次
│   │       ├── 遍历所有Agent会话
│   │       ├── 删除过期会话
│   │       └── 清理空会话字典
│   │
│   ├── 消息构建方法
│   │   ├── _replace_placeholders()                 # 替换提示词占位符
│   │   │   ├── Agent配置替换(9种占位符)
│   │   │   ├── 环境变量替换(正则匹配)
│   │   │   └── 时间相关替换(3种格式)
│   │   │
│   │   ├── _build_system_message()                 # 构建系统消息
│   │   │   ├── 处理系统提示词占位符
│   │   │   └── 返回{"role": "system", "content": ...}
│   │   │
│   │   ├── _build_user_message()                   # 构建用户消息
│   │   │   ├── 处理用户提示词占位符
│   │   │   └── 返回{"role": "user", "content": ...}
│   │   │
│   │   ├── _build_assistant_message()              # 构建助手消息
│   │   │   └── 返回{"role": "assistant", "content": ...}
│   │   │
│   │   └── _validate_messages()                    # 验证消息序列
│   │       ├── 检查消息非空
│   │       ├── 检查消息格式正确性
│   │       ├── 检查消息角色合法性
│   │       └── 检查系统消息在首位
│   │
│   ├── Agent调用方法
│   │   ├── call_agent()                           # 调用指定Agent
│   │   │   ├── 检查Agent是否存在
│   │   │   ├── 生成会话ID(如果未提供)
│   │   │   ├── 构建完整消息序列(系统+历史+当前)
│   │   │   ├── 验证消息序列
│   │   │   ├── 记录调试信息(如果debug模式)
│   │   │   ├── 调用LLM API
│   │   │   ├── 更新会话历史(如果成功)
│   │   │   └── 返回调用结果
│   │   │
│   │   ├── _call_llm_api()                        # 调用LLM API
│   │   │   ├── 导入AsyncOpenAI
│   │   │   ├── 记录调试信息
│   │   │   ├── 验证必需参数(model_id, api_key)
│   │   │   ├── 创建API客户端
│   │   │   ├── 准备API参数
│   │   │   ├── 调用chat.completions.create()
│   │   │   ├── 提取响应内容
│   │   │   └── 返回成功结果或错误
│   │   │
│   │   ├── call_agent_by_action()                 # 根据动作调用Agent
│   │   │   ├── 检查Agent是否存在
│   │   │   ├── 提取动作参数
│   │   │   ├── 构建动作提示词
│   │   │   ├── 调用Agent
│   │   │   └── 返回执行结果
│   │   │
│   │   └── _build_action_prompt()                 # 构建动作提示词
│   │       ├── 清理不需要的参数
│   │       ├── 格式化参数字符串
│   │       └── 返回格式化提示词
│   │
│   ├── 信息查询方法
│   │   ├── get_available_agents()                 # 获取所有可用Agent列表
│   │   │   └── 返回包含基本信息的字典列表
│   │   │
│   │   └── get_agent_info()                       # 获取指定Agent详细信息
│   │       ├── 检查Agent是否存在
│   │       └── 返回包含完整信息的字典
│   │
│   └── 智能任务处理方法
│       └── process_intelligent_task()             # 智能任务处理
│           ├── 通过agentserver.task_scheduler处理
│           ├── 创建唯一任务ID(UUID)
│           ├── 注册任务到调度器
│           ├── 调度并行执行
│           ├── 处理执行结果
│           └── 降级处理(失败时调用默认Agent)
│
├── 全局实例管理
│   ├── 全局变量: _AGENT_MANAGER = None            # 单例实例
│   │
│   └── 获取实例函数: get_agent_manager()
│       ├── 声明使用全局变量
│       ├── 检查实例是否存在
│       ├── 不存在时创建新实例
│       └── 返回实例
│
└── 便捷函数(Convenience Functions)
    ├── 调用相关
    │   ├── call_agent()                           # 便捷调用函数
    │   ├── list_agents()                          # 便捷列表获取函数
    │   ├── get_agent_info()                       # 便捷信息获取函数
    │   └── process_intelligent_task()             # 便捷智能任务处理函数
    │
    └── 任务管理相关(通过agentserver.task_scheduler)
        ├── get_task_status()                      # 获取任务状态
        ├── get_task_list()                        # 获取任务列表
        ├── get_execution_stats()                  # 获取执行统计
        └── cancel_task()                          # 取消任务
'''
#模块使用流程总结
'''
使用流程:
1. 导入模块函数
   → from agent_manager import call_agent, list_agents
   
2. 获取Agent列表
   → agents = list_agents()
   
3. 选择并调用Agent
   → result = await call_agent("助手名称", "用户消息", "session_id")
   
4. 处理结果
   → if result["status"] == "success": 使用 result["result"]
   → else: 处理错误 result["error"]'''

task_scheduler.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""通用任务调度器 - 融合智能记忆管理的任务调度系统"""

import asyncio  # 异步支持 #
import uuid  # 任务ID #
import json  # JSON处理 #
import logging  # 日志 #
import time  # 时间处理 #
from typing import Any, Dict, List, Optional, Union  # 类型标注 #
from dataclasses import dataclass, field  # 数据类 #
from datetime import datetime, timedelta  # 时间处理 #

# 配置日志
logger = logging.getLogger(__name__)
#创建了一个“日志记录器”,名字是当前模块的名字,想象成一个专门的记事本,用来记录程序运行情况

@dataclass
class TaskStep:
    """任务步骤数据类"""
    step_id: str  #step_id - 步骤ID(唯一编号)
    task_id: str  #task_id - 属于哪个任务
    purpose: str  # 步骤目的,这步要做什么
    content: str  # 步骤内容
    output: str = ""  # 步骤输出,输出结果(默认是空字符串)
    analysis: Optional[Dict[str, Any]] = None  # 分析结果
    timestamp: float = field(default_factory=time.time) #时间戳(自动记录当前时间)
    success: bool = True  # 是否成功
    error: Optional[str] = None  # 错误信息

@dataclass  #这个类用来存储压缩后的记忆,就像把很多经验总结成几条要点。
class CompressedMemory:
    """压缩记忆数据类"""
    memory_id: str  #记忆ID
    key_findings: List[str]  # 关键发现(列表)
    failed_attempts: List[str]  # 失败尝试(列表)
    current_status: str  # 当前状态
    next_steps: List[str]  # 建议步骤
    source_steps: int  # 来源步骤数
    timestamp: float = field(default_factory=time.time)

#注意:类名以 _ 开头,这是一个约定,表示"内部类"或"私有类"
class _TaskScheduler: #负责管理和协调所有任务
    """通用任务调度器 - 融合智能记忆管理"""

    def __init__(self, config: Optional[Any] = None) -> None:  #这是类的"构造函数",创建对象时会自动调用:
        # 导入配置
        if config is None:  #如果没传入配置,就从 agentserver.config 获取,像设定游戏的初始参数一样
            from agentserver.config import get_task_scheduler_config
            config = get_task_scheduler_config()
        
        self.config = config
        
        # 基础任务管理
        #task_registry:一个字典,记录所有任务(像花名册)
        self.task_registry: Dict[str, Dict[str, Any]] = {}  # 任务注册表
        #_lock:一把"锁",防止多个任务同时修改数据(像厕所的门锁)
        self._lock = asyncio.Lock()  # 并发锁
        
        # 智能记忆管理
        self.max_steps = config.max_steps  # max_steps:最多保存多少步骤
        self.compression_threshold = config.compression_threshold  # 压缩阈值,compression_threshold:达到多少步骤时压缩
        self.keep_last_steps = config.keep_last_steps  # 压缩后保留步骤数
        #数据存储容器
        self.task_steps: Dict[str, List[TaskStep]] = {}  # 任务步骤历史,task_steps:每个任务的所有步骤
        self.compressed_memories: List[CompressedMemory] = []  # 压缩记忆,compressed_memories:压缩后的记忆
        self.key_facts: Dict[str, str] = {}  # 关键事实存储,key_facts:重要的发现(像书签)
        self.failed_attempts: Dict[str, int] = {}  # 失败尝试计数,failed_attempts:失败的次数统计
        self.llm_config: Optional[Dict[str, Any]] = None  # LLM配置,llm_config:AI模型配置(还没设置)
        
        # 会话级别的记忆管理 - 新增功能,这是扩展功能,支持"会话"概念:
        self.session_memories: Dict[str, Dict[str, Any]] = {}  # 会话记忆:session_id -> {tasks, compressed_memories, key_facts}
        #session_memories:按会话组织记忆(像文件夹)
        self.session_task_mapping: Dict[str, str] = {}  # 任务ID到会话ID的映射
        #session_task_mapping:任务属于哪个会话
        self.analysis_session_mapping: Dict[str, str] = {}  # 分析会话ID到原始会话ID的映射
        #analysis_session_mapping:分析会话和原会话的关系

    #设置LLM配置的方法,很简单,就是设置AI模型配置
    def set_llm_config(self, config: Dict[str, Any]) -> None:
        """设置LLM配置用于智能压缩"""
        self.llm_config = config
        
   #创建任务的方法 create_task,这是非常重要的方法,创建一个新任务:
    async def create_task(self, task_id: str, purpose: str, session_id: Optional[str] = None, 
                         analysis_session_id: Optional[str] = None) -> str:
        """创建新任务 - 应用与MCP服务器相同的会话管理逻辑,并关联会话记忆"""
        #上锁和注册
        async with self._lock:   #async with self._lock::先锁上,保证安全
            self.task_registry[task_id] = {    #在 task_registry 中注册新任务,包含基本信息
                "id": task_id,
                "purpose": purpose,
                "session_id": session_id,
                "analysis_session_id": analysis_session_id,
                "status": "created",
                "created_at": time.time(),
                "steps_count": 0
            }
            
            # 初始化任务步骤列表
            if task_id not in self.task_steps:
                self.task_steps[task_id] = []   #为这个任务创建一个空步骤列表
            
            # 会话级别的记忆管理,如果有会话ID,建立各种映射关系
            if session_id: 
                # 建立任务到会话的映射
                self.session_task_mapping[task_id] = session_id
                
                # 建立分析会话到原始会话的映射
                if analysis_session_id:
                    self.analysis_session_mapping[analysis_session_id] = session_id
            
                
                # 初始化会话记忆(如果不存在),如果这个会话是新的,就初始化它的记忆仓库
                if session_id not in self.session_memories:
                    self.session_memories[session_id] = {
                        "tasks": [],
                        "compressed_memories": [],
                        "key_facts": {},
                        "failed_attempts": {},
                        "created_at": time.time(),
                        "last_activity": time.time()
                    }
                
                # 将会话记忆与任务关联
                self.session_memories[session_id]["tasks"].append(task_id)  #把任务ID加入会话的任务列表
                self.session_memories[session_id]["last_activity"] = time.time()   #更新会话最后活动时间
            
            logger.info(f"[任务创建] 创建任务: {task_id}, 目的: {purpose}, 会话: {session_id}, 分析会话: {analysis_session_id}")
            return task_id

    async def add_task_step(self, task_id: str, step: TaskStep) -> None:
        """添加任务步骤到历史记录,并更新会话级别的记忆管理"""
        async with self._lock:
            if task_id not in self.task_steps: #安全检查
                self.task_steps[task_id] = []   #添加步骤到对应任务的步骤列表
            
            self.task_steps[task_id].append(step)
            
            # 提取关键事实,调用下面的 _extract_key_facts 方法
            self._extract_key_facts(step)
            
            # 记录失败尝试
            if not step.success:  #如果步骤失败,记录这个失败内容
                self.failed_attempts[step.content] = self.failed_attempts.get(step.content, 0) + 1  #使用 .get() 避免键不存在
            
            # 会话级别的记忆管理
            session_id = self.session_task_mapping.get(task_id)
            if session_id and session_id in self.session_memories: #获取任务所属会话,如果有会话,更新该会话的记忆
                # 更新会话记忆中的关键事实
                fact_key = f"task:{task_id}:step:{step.step_id}"
                if fact_key in self.key_facts:
                    self.session_memories[session_id]["key_facts"][fact_key] = self.key_facts[fact_key]
                
                # 更新会话记忆中的失败尝试
                if not step.success:
                    session_failed = self.session_memories[session_id]["failed_attempts"]
                    session_failed[step.content] = session_failed.get(step.content, 0) + 1
                
                # 更新会话活动时间
                self.session_memories[session_id]["last_activity"] = time.time()
            
            # 检查是否需要压缩记忆
            if len(self.task_steps[task_id]) >= self.compression_threshold: #如果步骤数达到阈值,就压缩记忆
                await self._compress_memory(task_id)

    def _extract_key_facts(self, step: TaskStep) -> None:
        """从步骤中提取关键事实"""
        # 提取关键命令和结果
        if step.content and step.output:
            #如果有内容和输出,就提取,output_summary:截断输出,只保留一部分,生成格式化的关键事实
            output_summary = step.output[:self.config.output_summary_length] + ("..." if len(step.output) > self.config.output_summary_length else "")
            fact_key = f"task:{step.task_id}:step:{step.step_id}"
            self.key_facts[fact_key] = f"命令:{step.content}, 结果: {output_summary}"
        
        # 提取分析结论,如果有分析结果且包含重要词,就保存
        if step.analysis and "analysis" in step.analysis:
            analysis = step.analysis["analysis"]
            if "关键发现" in analysis or "重要" in analysis:
                fact_key = f"analysis:{hash(analysis)}"
                self.key_facts[fact_key] = analysis

    async def _compress_memory(self, task_id: str) -> None:
        """压缩任务记忆,这是最复杂的方法,使用AI压缩记忆"""
        if not self.llm_config or task_id not in self.task_steps: #必须有LLM配置和任务步骤
            return
        
        logger.info(f"开始压缩任务 {task_id} 的记忆...")
        
        # 构建压缩提示,调用另一个方法构建提示
        prompt = self._build_compression_prompt(task_id)
        
        try:
            # 调用LLM进行压缩,调用AI模型压缩记忆
            compressed_data = await self._call_llm_compression(prompt)
            
            # 创建压缩记忆对象,用AI返回的数据创建 CompressedMemory 对象,uuid.uuid4() 生成唯一ID
            memory = CompressedMemory(
                memory_id=str(uuid.uuid4()),
                key_findings=compressed_data.get("key_findings", []),
                failed_attempts=compressed_data.get("failed_attempts", []),
                current_status=compressed_data.get("current_status", "未知状态"),
                next_steps=compressed_data.get("next_steps", []),
                source_steps=len(self.task_steps[task_id])
            )
            
            # 更新失败尝试记录,把压缩后的失败记录加入全局失败记录
            for attempt in memory.failed_attempts:
                self.failed_attempts[attempt] = self.failed_attempts.get(attempt, 0) + 1
            
            #加入压缩记忆列表
            self.compressed_memories.append(memory)
            
            # 会话级别的压缩记忆管理,如果有会话,也加入会话的记忆
            session_id = self.session_task_mapping.get(task_id)
            if session_id and session_id in self.session_memories:
                self.session_memories[session_id]["compressed_memories"].append(memory)
                logger.info(f"[会话记忆] 会话 {session_id} 添加压缩记忆: {len(memory.key_findings)}个关键发现")
            
            logger.info(f"记忆压缩成功: 添加了{len(memory.key_findings)}个关键发现")
            
        except Exception as e:
            logger.error(f"记忆压缩失败: {e}")
            # 创建错误记忆
            error_memory = CompressedMemory(
                memory_id=str(uuid.uuid4()),
                key_findings=[f"压缩失败: {str(e)}"],
                failed_attempts=[],
                current_status="压缩失败",
                next_steps=["重新尝试压缩"],
                source_steps=len(self.task_steps[task_id])
            )
            self.compressed_memories.append(error_memory)
        
        # 清空历史记录,保留最后几步
        keep_last = min(self.keep_last_steps, len(self.task_steps[task_id]))
        self.task_steps[task_id] = self.task_steps[task_id][-keep_last:]

    def _build_compression_prompt(self, task_id: str) -> str:
        """构建压缩提示"""
        prompt = """
你是一个专业的任务执行助手,需要压缩任务执行历史记录。请执行以下任务:
1. 识别并提取关键的技术细节和发现
2. 标记已尝试但失败的解决方案
3. 总结当前任务状态和下一步建议
4. 以JSON格式返回以下结构的数据:
{
  "key_findings": ["发现1", "发现2"],
  "failed_attempts": ["命令1", "命令2"],
  "current_status": "当前状态描述",
  "next_steps": ["建议1", "建议2"]
}

任务ID: {task_id}
历史记录:
""".format(task_id=task_id)
        
        # 添加关键事实
        prompt += "关键事实摘要:\n"
        #list(self.key_facts.items()):把字典转换成(键,值)的列表
        #[-self.config.key_facts_compression_limit:]:取最近N个(配置决定)
        #循环添加每个关键事实到提示中
        recent_facts = list(self.key_facts.items())[-self.config.key_facts_compression_limit:]  # 最近关键事实
        for _, value in recent_facts:
            prompt += f"- {value}\n"
        
        # 添加历史步骤
        steps = self.task_steps[task_id][-self.compression_threshold:]
        for i, step in enumerate(steps): #获取最近N个步骤(压缩阈值决定数量),enumerate(steps):同时获取索引和步骤对象,添加步骤的基本信息
            prompt += f"\n步骤 {i+1}:\n"
            prompt += f"- 目的: {step.purpose}\n"
            prompt += f"- 命令: {step.content}\n"
            if step.output:  #处理输出和结果,如果有输出,显示部分输出(截断到配置的长度),三元表达式:如果输出太长就加...,否则不加
                prompt += f"- 输出: {step.output[:self.config.step_output_display_length]}{'...' if len(step.output) > self.config.step_output_display_length else ''}\n"
            if step.analysis: #处理分析和失败,.get("analysis", "无分析"):安全获取,如果不存在返回"无分析",如果步骤失败,显示错误信息
                analysis = step.analysis.get("analysis", "无分析")
                prompt += f"- 分析: {analysis}\n"
            if not step.success:
                prompt += f"- 状态: 失败 - {step.error}\n"
        
        return prompt

    #调用LLM进行压缩的方法,这个方法实际调用AI模型来压缩记忆:
    async def _call_llm_compression(self, prompt: str) -> Dict[str, Any]:
        """调用LLM进行记忆压缩"""
        try:  #导入litellm库
            import litellm    #litellm:一个统一的AI模型调用库
            litellm.enable_json_schema_validation = True   #enable_json_schema_validation:启用JSON格式验证
            
            response = litellm.completion(   #使用 litellm.completion 调用AI,参数包括:模型名、API密钥、API地址、消息、最大token数
                model=self.llm_config["model"],
                api_key=self.llm_config["api_key"],
                api_base=self.llm_config["api_base"],
                messages=[{"role": "user", "content": prompt}],
                max_tokens=1024,
            )
            
            #response.choices[0].message.content:获取AI回复内容
            json_str = response.choices[0].message.content.strip()  #.strip():去掉首尾空格
            return json.loads(json_str)   #json.loads():把JSON字符串转换成Python字典
            
        except Exception as e:   #错误处理
            logger.error(f"LLM压缩调用失败: {e}")
            return {    #如果出错,返回一个默认的错误结构,保证方法总能返回有效的字典
                "key_findings": [f"压缩失败: {str(e)}"],
                "failed_attempts": [],
                "current_status": "压缩失败",
                "next_steps": ["检查LLM配置"]
            }
    
    #并行执行调度方法 
    async def schedule_parallel_execution(self, tasks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """并行调度执行给定任务列表,返回结果列表"""
        if not tasks:    #空检查
            return []

        # 定义内部任务运行函数
        async def _run_task(task: Dict[str, Any]) -> Dict[str, Any]:    #定义异步函数来处理单个任务
            task_id = task.get("id") or str(uuid.uuid4())               #如果没有提供任务ID,就生成一个
            async with self._lock:                            # 注册任务
                self.task_registry[task_id] = {
                    "id": task_id,
                    "type": task.get("type") or "processor",
                    "status": "running",
                    "params": task.get("params") or {},
                    "context": task.get("context"),
                    "created_at": time.time()
                }

            try:
                # 创建任务步骤记录
                step = TaskStep(
                    step_id=str(uuid.uuid4()),
                    task_id=task_id,
                    purpose=task.get("purpose", "执行任务"),
                    content=str(task.get("params", {})),
                    success=True
                )
                
                # 占位执行:此处可接入真实执行器
                await asyncio.sleep(0) #await asyncio.sleep(0):让出控制权,不实际等待,这是一个占位,实际应该调用真正的任务执行器
                result = {
                    "success": True,
                    "result": None,
                    "task_type": self.task_registry[task_id]["type"],
                }
                
                step.output = str(result)
                # 记录步骤和结果
                await self.add_task_step(task_id, step) 
                
                return result
            #错误处理
            except Exception as e:
                # 记录失败的步骤
                step = TaskStep(
                    step_id=str(uuid.uuid4()),
                    task_id=task_id,
                    purpose=task.get("purpose", "执行任务"),
                    content=str(task.get("params", {})),
                    success=False,
                    error=str(e)
                )
                await self.add_task_step(task_id, step)
                
                return {"success": False, "error": str(e)}
            # 最终状态更新
            finally:   #finally:无论成功失败都会执行,更新任务状态为完成
                async with self._lock:
                    entry = self.task_registry.get(task_id)
                    if entry is not None:
                        entry["status"] = "completed"
                        entry["completed_at"] = time.time()
        
        #并行执行所有任务,coros:协程对象列表(每个任务一个)
        coros = [_run_task(t) for t in tasks]
        #asyncio.gather:同时运行所有协程
        return await asyncio.gather(*coros, return_exceptions=False) #return_exceptions=False:如果有异常就抛出,不返回异常

    async def get_task_status(self, task_id: str) -> Optional[Dict[str, Any]]:
        """查询指定任务状态"""
        async with self._lock:
            return self.task_registry.get(task_id)
    
    # 获取任务状态的方法
    async def get_running_tasks(self) -> List[Dict[str, Any]]:  #简单查询任务注册表,使用 .get() 方法安全获取
        """获取运行中任务列表"""
        async with self._lock:   #列表推导式:筛选出状态为"running"的任务,.values():获取所有任务字典
            return [t for t in self.task_registry.values() if t.get("status") == "running"]

    #获取任务记忆摘要的方法
    #这个方法生成任务的详细报告:
    async def get_task_memory_summary(self, task_id: str, include_key_facts: bool = True) -> str:
        """获取任务记忆摘要"""
        async with self._lock:  
            summary = ""
            
            # 1. 关键事实摘要
            if include_key_facts and self.key_facts:
                summary += "关键事实:\n"
                task_facts = [v for k, v in self.key_facts.items() if f"task:{task_id}" in k]
                for fact in task_facts[-self.config.key_facts_summary_limit:]:  # 显示最近关键事实
                    summary += f"- {fact}\n"
                summary += "\n"
                #筛选出这个任务的关键事实
                #if f"task:{task_id}" in k:检查键是否包含任务ID
            
            # 2. 压缩记忆摘要
            if self.compressed_memories:
                summary += "压缩记忆块:\n"
                for i, mem in enumerate(self.compressed_memories[-self.config.compressed_memory_summary_limit:]):  # 显示最近压缩块
                    summary += f"记忆块 #{len(self.compressed_memories)-i}:\n"   #len(self.compressed_memories)-i:计算块编号(从后往前)
                    summary += f"- 状态: {mem.current_status}\n"
                    #显示关键发现
                    summary += f"- 关键发现: {', '.join(mem.key_findings[:self.config.key_findings_display_limit])}"  #', '.join():用逗号和空格连接列表元素
                    if len(mem.key_findings) > self.config.key_findings_display_limit:  # 如果发现太多,显示"等X项"
                        summary += f" 等{len(mem.key_findings)}项"
                    summary += "\n"
                    
                    if mem.failed_attempts:
                        summary += f"- 失败尝试: {', '.join(mem.failed_attempts[:self.config.failed_attempts_display_limit])}" 
                        if len(mem.failed_attempts) > self.config.failed_attempts_display_limit:
                            summary += f" 等{len(mem.failed_attempts)}项"
                        summary += "\n"
                    
                    if mem.next_steps:
                        summary += f"- 建议步骤: {mem.next_steps[0]}\n"
                    
                    summary += f"- 来源: 基于{mem.source_steps}个历史步骤\n\n"
            
            # 3. 最近详细步骤
            if task_id in self.task_steps and self.task_steps[task_id]:  #检查任务是否有步骤,逆序显示步骤(最新的在最上面)
                summary += "最近详细步骤:\n"
                for i, step in enumerate(self.task_steps[task_id]):
                    step_num = len(self.task_steps[task_id]) - i
                    summary += f"步骤 {step_num}:\n"
                    summary += f"- 目的: {step.purpose}\n"
                    summary += f"- 命令: {step.content}\n"
                    
                    if step.output:  
                        summary += f"- 输出: {step.output[:self.config.step_output_display_length]}{'...' if len(step.output) > self.config.step_output_display_length else ''}\n"
                    
                    if step.analysis:
                        analysis = step.analysis.get("analysis", "无分析")
                        summary += f"- 分析: {analysis}\n"
                    
                    if not step.success:
                        summary += f"- 状态: 失败 - {step.error}\n"
                    
                    # 显示失败次数
                    if step.content in self.failed_attempts:   #检查这个命令是否失败过,显示总共失败了多少次
                        summary += f"- 历史失败次数: {self.failed_attempts[step.content]}\n"
                    
                    summary += "\n"
            
            return summary if summary else "无历史记录"

    async def get_global_memory_summary(self) -> str:
        """获取全局记忆摘要"""
        async with self._lock:
            summary = f"全局任务记忆摘要\n"
            summary += f"=" * 50 + "\n"
            
            # 任务统计
            total_tasks = len(self.task_registry)
            running_tasks = len([t for t in self.task_registry.values() if t.get("status") == "running"])
            completed_tasks = len([t for t in self.task_registry.values() if t.get("status") == "completed"])
            
            summary += f"任务统计:\n"
            summary += f"- 总任务数: {total_tasks}\n"
            summary += f"- 运行中: {running_tasks}\n"
            summary += f"- 已完成: {completed_tasks}\n\n"
            
            # 记忆统计
            total_steps = sum(len(steps) for steps in self.task_steps.values())  #sum():计算所有任务的步骤总数
            total_compressed = len(self.compressed_memories)
            total_facts = len(self.key_facts)
            total_failures = len(self.failed_attempts)
            
            summary += f"记忆统计:\n"
            summary += f"- 总步骤数: {total_steps}\n"
            summary += f"- 压缩记忆块: {total_compressed}\n"
            summary += f"- 关键事实: {total_facts}\n"
            summary += f"- 失败尝试: {total_failures}\n\n"
            
            # 最近活动
            if self.compressed_memories:
                summary += "最近压缩记忆:\n"
                for mem in self.compressed_memories[-self.config.compressed_memory_global_limit:]:  # 最近压缩记忆
                    summary += f"- {mem.current_status} (基于{mem.source_steps}步骤)\n"
                    if mem.key_findings:
                        summary += f"  关键发现: {mem.key_findings[0]}\n"
                summary += "\n"
            
            return summary

    async def get_failed_attempts_summary(self) -> Dict[str, int]:   #返回失败尝试的副本(字典)
        """获取失败尝试摘要"""
        async with self._lock:
            return self.failed_attempts.copy()   #.copy():创建副本,避免外部修改原始数据
            #返回格式:{"命令1": 失败次数, "命令2": 失败次数}

    async def clear_task_memory(self, task_id: str) -> bool:
        """清除指定任务的记忆"""
        async with self._lock:
            if task_id in self.task_steps:  #检查任务是否存在
                del self.task_steps[task_id]   #del self.task_steps[task_id]:删除该任务的步骤列表
                logger.info(f"已清除任务 {task_id} 的记忆")
                return True
            return False   #返回 True 表示成功,False 表示任务不存在
           #只清除步骤,不清除其他记忆

    async def clear_all_memory(self) -> None:
        """清除所有记忆"""
        async with self._lock:
            self.task_steps.clear()  #.clear():清空整个字典/列表,清除所有7个数据存储容器
            self.compressed_memories.clear()
            self.key_facts.clear()
            self.failed_attempts.clear()
            # 清除会话级别的记忆
            self.session_memories.clear()
            self.session_task_mapping.clear()
            self.analysis_session_mapping.clear()
            logger.info("已清除所有记忆")
            #相当于重置整个系统

    # ============ 会话级别的记忆管理方法 ============
    
    async def get_session_memory_summary(self, session_id: str) -> Dict[str, Any]:
        """获取会话记忆摘要"""
        async with self._lock:   #检查会话是否存在
            if session_id not in self.session_memories:
                return {"error": f"会话 {session_id} 不存在"}
            
            session_memory = self.session_memories[session_id]  #获取会话记忆
            
            # 构建会话摘要
            summary = {    #包含会话的基本信息和统计
                "session_id": session_id,
                "created_at": session_memory["created_at"],
                "last_activity": session_memory["last_activity"],
                "tasks_count": len(session_memory["tasks"]),
                "compressed_memories_count": len(session_memory["compressed_memories"]),
                "key_facts_count": len(session_memory["key_facts"]),
                "failed_attempts_count": len(session_memory["failed_attempts"]),
                "tasks": session_memory["tasks"],
                "recent_key_facts": list(session_memory["key_facts"].values())[-5:],  # list(...)[-5:]:取最后5个值,最近5个关键事实
                "recent_compressed_memories": [   # 最近压缩记忆
                    {
                        "memory_id": mem.memory_id,
                        "key_findings": mem.key_findings[:3],  # 前3个关键发现,列表推导式生成最近3个压缩记忆的简化信息
                        "current_status": mem.current_status,
                        "source_steps": mem.source_steps
                    }
                    for mem in session_memory["compressed_memories"][-3:]  # 最近3个压缩记忆
                ],
                "failed_attempts": dict(list(session_memory["failed_attempts"].items())[-5:])  # dict(list(...)[-5:]):取最后5个键值对转回字典,最近5个失败尝试
            }
            
            return summary
    
    async def get_session_compressed_memories(self, session_id: str) -> List[Dict[str, Any]]:
        """获取会话的压缩记忆"""
        async with self._lock:  #安全检查
            if session_id not in self.session_memories:
                return []
            
            session_memory = self.session_memories[session_id]  #返回所有压缩记忆的详细信息
            return [ 
                {
                    "memory_id": mem.memory_id,
                    "key_findings": mem.key_findings,
                    "failed_attempts": mem.failed_attempts,
                    "current_status": mem.current_status,
                    "next_steps": mem.next_steps,
                    "source_steps": mem.source_steps
                }
                for mem in session_memory["compressed_memories"]
            ]
        #返回完整的压缩记忆信息
        #注意:这里没有限制数量,返回所有
    
    async def get_session_key_facts(self, session_id: str) -> Dict[str, str]:
        """获取会话的关键事实"""
        async with self._lock:
            if session_id not in self.session_memories:
                return {}
            
            return self.session_memories[session_id]["key_facts"].copy()\
        #简单返回会话关键事实的副本,格式:{"事实键": "事实内容"}
    
    async def get_session_failed_attempts(self, session_id: str) -> Dict[str, int]:
        """获取会话的失败尝试"""
        async with self._lock:
            if session_id not in self.session_memories:
                return {}
            
            return self.session_memories[session_id]["failed_attempts"].copy()
        #返回格式:{"命令": 失败次数}
    
    async def get_session_tasks(self, session_id: str) -> List[Dict[str, Any]]:
        """获取会话的所有任务"""
        async with self._lock:  # 安全检查
            if session_id not in self.session_memories:
                return []
            
            tasks = []  
            for task_id in self.session_memories[session_id]["tasks"]:  #遍历会话的所有任务
                if task_id in self.task_registry:  #检查每个任务是否还在注册表中
                    task_info = self.task_registry[task_id].copy()  #添加步骤计数:len(self.task_steps.get(task_id, []))
                    task_info["steps_count"] = len(self.task_steps.get(task_id, []))  #.get(task_id, []):安全获取,如果不存在返回空列表
                    tasks.append(task_info)
            
            return tasks
    
    async def clear_session_memory(self, session_id: str) -> bool:
        """清除指定会话的记忆"""
        async with self._lock:
            if session_id not in self.session_memories:
                return False
            
            # 清除会话相关的任务
            for task_id in self.session_memories[session_id]["tasks"]:
                if task_id in self.task_steps:
                    del self.task_steps[task_id]
                if task_id in self.task_registry:
                    del self.task_registry[task_id]
                if task_id in self.session_task_mapping:
                    del self.session_task_mapping[task_id]
            
            # 清除会话记忆
            del self.session_memories[session_id]
            
            # 清除分析会话映射
            analysis_sessions_to_remove = [
                analysis_id for analysis_id, orig_session_id in self.analysis_session_mapping.items()
                if orig_session_id == session_id
            ]
            for analysis_id in analysis_sessions_to_remove:
                del self.analysis_session_mapping[analysis_id]
            
            logger.info(f"已清除会话 {session_id} 的所有记忆")
            return True
    
    async def get_all_sessions(self) -> List[Dict[str, Any]]:
        """获取所有会话的摘要信息"""
        async with self._lock:
            sessions = []
            for session_id, session_memory in self.session_memories.items():
                sessions.append({
                    "session_id": session_id,
                    "created_at": session_memory["created_at"],
                    "last_activity": session_memory["last_activity"],
                    "tasks_count": len(session_memory["tasks"]),
                    "compressed_memories_count": len(session_memory["compressed_memories"]),
                    "key_facts_count": len(session_memory["key_facts"]),
                    "failed_attempts_count": len(session_memory["failed_attempts"])
                })
            
            # 按最后活动时间排序
            sessions.sort(key=lambda x: x["last_activity"], reverse=True) #sessions.sort():原地排序
            #key=lambda x: x["last_activity"]:按最后活动时间排序,reverse=True:降序(最新的在前),lambda:匿名函数,简单函数的一种写法
            return sessions

#全局变量 _SCHEDULER
_SCHEDULER: Optional[_TaskScheduler] = None
'''
创建一个全局变量,类型是Optional[_TaskScheduler]
Optional 表示可以是 _TaskScheduler 或 None
初始化为 None(还没创建实例)
这是单例模式的设计:整个程序只有一个调度器实例
'''



def get_task_scheduler(config: Optional[Any] = None) -> _TaskScheduler:
    """获取全局任务调度器单例"""
    global _SCHEDULER
    if _SCHEDULER is None:
        _SCHEDULER = _TaskScheduler(config)
    return _SCHEDULER


# ========锁(Lock)的详细解释=========
'''
1.什么是并发问题?
想象一下这个场景:
你有两个朋友同时在一个Excel表格上填写数据
朋友A在B1单元格写了"100"
朋友B也在B1单元格写了"200"
最后B1单元格里的值是什么?可能是100,也可能是200,甚至可能是乱码
这就是并发冲突:多个操作同时修改同一份数据,结果不可预测。

2. 锁的类比
锁就像厕所的门锁:
一个人进去后,把门锁上
其他人想用厕所,只能在门外等
里面的人用完了,开门出来
下一个人才能进去
在代码中,锁保护共享资源(比如字典、列表):
self._lock = asyncio.Lock()  # 创建一个锁,就像装了一把门锁

async with self._lock:  # 1. 检查锁是否开着 2. 如果开着,进去并锁上门
    # 在这里安全地修改共享数据
    self.task_registry[task_id] = {...}  # 只有我能改这个字典
# 3. 自动开门(释放锁)

3. 如果没有锁会发生什么?
看这个例子:
# 两个协程同时执行这个函数
async def add_task(self, task_id):
    # 没有锁保护
    if task_id not in self.task_registry:
        # 假设执行到这里时,另一个协程也执行到了这里
        # 两个协程都认为task_id不存在
        self.task_registry[task_id] = {"status": "created"}
        # 结果:同一个task_id被创建了两次!数据混乱!
        
4. 锁的实际工作流程
async def create_task(self, task_id, purpose):
    async with self._lock:  # ① 获取锁(如果锁被占用就等待)
        # ② 进入临界区(只有持有锁的协程能执行这里)
        self.task_registry[task_id] = {
            "id": task_id,
            "purpose": purpose,
            "status": "created"
        }
        # ③ 操作完成
    # ④ 自动释放锁,其他等待的协程可以继续  
时间线示例: 
时间点1: 协程A获得锁,开始修改数据
时间点2: 协程B尝试获取锁,发现锁被占用,进入等待状态
时间点3: 协程A完成操作,释放锁
时间点4: 协程B获得锁,开始执行   

5. 为什么用 async with?
async with 是异步上下文管理器
它确保:
进入时自动获取锁
退出时自动释放锁
即使出现异常也会释放锁
相当于:
# async with self._lock: 等价于:
await self._lock.acquire()  # 获取锁
try:
    # 执行代码
finally:
    self._lock.release()    # 无论如何都释放锁
'''
# ========安全检查的详细解释==========
'''
1. 什么是安全检查?
安全检查就是在执行操作前先检查条件是否满足,防止出错。

2. 常见的需要安全检查的情况
情况1:访问不存在的键
# 错误的写法:
value = self.task_registry[task_id]  # 如果task_id不存在,会抛出KeyError

# 正确的写法(安全检查):
if task_id in self.task_registry:  # 先检查
    value = self.task_registry[task_id]  # 再访问
else:
    value = None  # 或者返回错误信息
情况2:除零错误
# 错误的写法:
result = a / b  # 如果b=0,会抛出ZeroDivisionError

# 正确的写法(安全检查):
if b != 0:  # 先检查
    result = a / b  # 再计算
else:
    result = None

3. 代码中的具体例子
clear_task_memory 方法:
async def clear_task_memory(self, task_id: str) -> bool:
    async with self._lock:  # ① 先获取锁(保证安全访问)
        if task_id in self.task_steps:  # ② 安全检查:任务是否存在?
            del self.task_steps[task_id]  # ③ 安全操作
            return True
        return False  # ④ 如果不存在,返回False
为什么需要安全检查?
如果有人试图删除一个不存在的任务
没有检查:del self.task_steps[task_id] 会抛出 KeyError
有检查:优雅地返回 False,告诉调用者"删除失败,因为任务不存在"
get_session_memory_summary 方法:
async def get_session_memory_summary(self, session_id: str) -> Dict[str, Any]:
    async with self._lock:
        if session_id not in self.session_memories:  # 安全检查
            return {"error": f"会话 {session_id} 不存在"}  # 友好错误信息
        
        # 安全检查通过,继续执行
        session_memory = self.session_memories[session_id]
        # ... 构建摘要
   
4. 安全检查的层次
第一层:存在性检查: if key in dictionary:  # 键是否存在?
第二层:类型检查:  if isinstance(value, dict):  # 值是不是字典类型?
第三层:值域检查   if 0 <= index < len(list):  # 索引是否在有效范围内?  
第四层:业务逻辑检查:   if user.has_permission("delete"):  # 用户是否有删除权限?

5. 为什么要在锁内做安全检查?
这是一个非常重要的点!看这个例子:
# ❌ 错误的写法(有竞争条件):
async def wrong_method(self, task_id):
    if task_id in self.task_steps:  # ① 检查(没有锁保护)
        # ② 这里可能有其他协程删除了task_id!
        await asyncio.sleep(0.1)  # 模拟耗时操作
        del self.task_steps[task_id]  # ③ 可能抛出KeyError!

# ✅ 正确的写法:
async def correct_method(self, task_id):
    async with self._lock:  # 获取锁
        if task_id in self.task_steps:  # 在锁的保护下检查
            # 其他协程不能修改task_steps,所以这里安全
            del self.task_steps[task_id]
关键点:从"检查"到"操作"这段时间,数据不能被别人修改。锁保证了这段时间的独占访问。
'''
# ========为什么我们会感觉安全检查与异常处理很像==========
#安全检查和异常处理确实有相似之处,但它们本质上是不同的编程思想。
'''
安全检查(主动防御)
# 主动:在错误发生前阻止它
if task_id in self.task_steps:  # 主动检查
    del self.task_steps[task_id]  # 安全执行
else:
    print("任务不存在")  # 优雅处理
    
异常处理(被动响应)
# 被动:等错误发生后再处理
try:
    del self.task_steps[task_id]  # 可能出错
except KeyError:  # 错误发生了
    print("任务不存在")  # 处理错误
'''
#例1:访问字典
'''
安全检查方式:
# 方式1:先检查再访问
if key in my_dict:
    value = my_dict[key]
else:
    value = None

# 方式2:使用get方法(也是一种安全检查)
value = my_dict.get(key, None)  # 如果key不存在,返回None

异常处理方式:
try:
    value = my_dict[key]
except KeyError:
    value = None
'''
#例2:除零操作
'''
安全检查:
if denominator != 0:  # 主动检查
    result = numerator / denominator
else:
    result = None
    
异常处理:
try:
    result = numerator / denominator
except ZeroDivisionError:
    result = None
'''
#为什么在并发代码中多用安全检查?
#1. 性能考虑
'''
异常处理有额外开销:
# Python中异常处理的代价
import time

def test_safety_check():
    start = time.time()
    for i in range(1000000):
        if i < 1000000:  # 总是为真
            pass
    return time.time() - start

def test_exception():
    start = time.time()
    for i in range(1000000):
        try:
            if i >= 1000000:  # 永远不会为真
                raise IndexError
        except IndexError:
            pass
    return time.time() - start

print(f"安全检查: {test_safety_check():.4f}秒")
print(f"异常处理: {test_exception():.4f}秒")
# 异常处理通常慢2-10倍
'''
#2. 代码清晰性
'''
# 并发代码中,安全检查更清晰
async def process(self, task_id):
    async with self._lock:
        # 一系列安全检查
        if not self._validate_task(task_id):
            return {"error": "任务无效"}
        if not self._has_permission():
            return {"error": "无权限"}
        if not self._check_resources():
            return {"error": "资源不足"}
        
        # 执行操作
        return await self._execute(task_id)
        
#如果用异常处理:
async def process(self, task_id):
    async with self._lock:
        try:
            self._validate_task(task_id)  # 可能抛异常
            self._check_permission()      # 可能抛异常
            self._check_resources()       # 可能抛异常
            return await self._execute(task_id)
        except ValidationError:
            return {"error": "任务无效"}
        except PermissionError:
            return {"error": "无权限"}
        except ResourceError:
            return {"error": "资源不足"}
'''
#3. 控制流更简单
'''
在并发代码中,异常可能导致锁的释放问题:
async def risky_method(self):
    await self._lock.acquire()  # 手动获取锁
    try:
        # 如果这里抛异常,可能无法正确释放锁
        result = await self._do_something()
        return result
    except Exception as e:
        # 处理异常
        raise
    finally:
        self._lock.release()  # 但finally块可能会执行

# 使用async with更安全,但它内部也是异常处理
async def safe_method(self):
    async with self._lock:  # 自动管理锁
        return await self._do_something()  # 任何异常都会正确释放锁
'''
#想象你在开车:
'''
安全检查就像:
出发前检查油量、轮胎、刹车 
看到红灯就停下 
限速牌前减速 
你在主动避免事故

异常处理就像:
突然爆胎了 → 控制方向盘 → 慢慢靠边停车 
刹车失灵了 → 换低速挡 → 拉手刹 
发动机故障灯亮了 → 找最近的维修厂 
你在被动应对事故

两者的关系:
好的司机会同时使用两种策略
出发前安全检查(预防)
但也要知道如何应对突发情况(处理)
'''
#并发编程中多用安全检查

#=====通用任务调度器 - 完整层级结构========
'''
通用任务调度器系统(内部视角)
├── 第1层:核心数据模型(Data Models)
│   ├── TaskStep(任务步骤)
│   │   ├── step_id: str           # 步骤唯一标识
│   │   ├── task_id: str           # 所属任务ID
│   │   ├── purpose: str           # 步骤目的
│   │   ├── content: str           # 步骤内容
│   │   ├── output: str            # 步骤输出
│   │   ├── analysis: Dict         # 分析结果
│   │   ├── timestamp: float       # 时间戳
│   │   ├── success: bool          # 是否成功
│   │   └── error: Optional[str]   # 错误信息
│   │
│   └── CompressedMemory(压缩记忆)
│       ├── memory_id: str         # 记忆唯一标识
│       ├── key_findings: List[str] # 关键发现
│       ├── failed_attempts: List[str] # 失败尝试
│       ├── current_status: str    # 当前状态
│       ├── next_steps: List[str]  # 建议步骤
│       ├── source_steps: int      # 来源步骤数
│       └── timestamp: float       # 时间戳
│
├── 第2层:核心调度引擎(_TaskScheduler)
│   ├── 初始化模块
│   │   ├── __init__(config)       # 初始化配置和数据存储
│   │   └── set_llm_config(config) # 设置AI模型配置
│   │
│   ├── 任务生命周期管理
│   │   ├── create_task()          # 创建新任务(入口点)
│   │   ├── add_task_step()        # 添加任务步骤(核心记录)
│   │   ├── get_task_status()      # 查询任务状态
│   │   ├── get_running_tasks()    # 获取运行中任务
│   │   ├── schedule_parallel_execution() # 并行执行调度
│   │   ├── clear_task_memory()    # 清除单个任务记忆
│   │   └── clear_all_memory()     # 清除所有记忆
│   │
│   ├── 智能记忆压缩模块
│   │   ├── _extract_key_facts()   # 提取关键事实(预处理)
│   │   ├── _compress_memory()     # 压缩记忆(主流程)
│   │   ├── _build_compression_prompt() # 构建AI提示
│   │   └── _call_llm_compression() # 调用AI模型
│   │
│   ├── 查询与摘要模块
│   │   ├── get_task_memory_summary()  # 获取任务记忆摘要
│   │   ├── get_global_memory_summary() # 获取全局记忆摘要
│   │   └── get_failed_attempts_summary() # 获取失败尝试摘要
│   │
│   └── 会话级别记忆管理(扩展层)
│       ├── get_session_memory_summary()    # 获取会话记忆摘要
│       ├── get_session_compressed_memories() # 获取会话压缩记忆
│       ├── get_session_key_facts()         # 获取会话关键事实
│       ├── get_session_failed_attempts()   # 获取会话失败尝试
│       ├── get_session_tasks()             # 获取会话所有任务
│       ├── clear_session_memory()          # 清除会话记忆
│       └── get_all_sessions()              # 获取所有会话
│
├── 第3层:数据存储结构
│   ├── 任务注册表(task_registry)
│   │   ├── 结构:Dict[str, Dict]
│   │   ├── 键:task_id
│   │   └── 值:任务元数据(id, purpose, status等)
│   │
│   ├── 任务步骤历史(task_steps)
│   │   ├── 结构:Dict[str, List[TaskStep]]
│   │   ├── 键:task_id
│   │   └── 值:该任务的所有步骤列表
│   │
│   ├── 压缩记忆库(compressed_memories)
│   │   ├── 结构:List[CompressedMemory]
│   │   └── 内容:所有压缩后的记忆块
│   │
│   ├── 关键事实存储(key_facts)
│   │   ├── 结构:Dict[str, str]
│   │   ├── 键:格式化的事实键
│   │   └── 值:事实内容
│   │
│   ├── 失败尝试计数(failed_attempts)
│   │   ├── 结构:Dict[str, int]
│   │   ├── 键:失败的操作内容
│   │   └── 值:失败次数
│   │
│   └── 会话级别存储(扩展)
│       ├── session_memories       # 会话记忆:session_id -> 记忆数据
│       ├── session_task_mapping   # 任务到会话的映射
│       └── analysis_session_mapping # 分析会话到原始会话的映射
│
├── 第4层:配置系统
│   ├── 来源:外部配置模块
│   ├── 配置参数:
│   │   ├── max_steps: int          # 最大保存步骤数
│   │   ├── compression_threshold: int # 压缩触发阈值
│   │   ├── keep_last_steps: int    # 压缩后保留步骤数
│   │   ├── output_summary_length: int # 输出摘要长度
│   │   ├── step_output_display_length: int # 步骤输出显示长度
│   │   ├── key_facts_compression_limit: int # 关键事实压缩限制
│   │   ├── key_facts_summary_limit: int # 关键事实摘要限制
│   │   ├── compressed_memory_summary_limit: int # 压缩记忆摘要限制
│   │   ├── key_findings_display_limit: int # 关键发现显示限制
│   │   ├── failed_attempts_display_limit: int # 失败尝试显示限制
│   │   └── compressed_memory_global_limit: int # 全局压缩记忆限制
│   │
│   └── LLM配置(可选):
│       ├── model: str              # AI模型名称
│       ├── api_key: str           # API密钥
│       └── api_base: str          # API基础地址
│
├── 第5层:并发安全机制
│   ├── 锁机制(_lock: asyncio.Lock)
│   │   ├── 作用范围:所有修改共享数据的方法
│   │   ├── 使用方式:async with self._lock:
│   │   └── 保护的数据:
│   │       ├── task_registry
│   │       ├── task_steps
│   │       ├── compressed_memories
│   │       ├── key_facts
│   │       ├── failed_attempts
│   │       └── 所有会话相关数据
│   │
│   └── 安全检查策略
│       ├── 存在性检查:if key in dictionary
│       ├── 类型检查:isinstance()
│       ├── 边界检查:len()和索引范围
│       └── 业务逻辑检查:自定义验证
│
├── 第6层:日志与监控
│   ├── 日志记录器(logger)
│   ├── 关键日志点:
│   │   ├── 任务创建/完成
│   │   ├── 记忆压缩开始/结束
│   │   ├── AI调用成功/失败
│   │   ├── 清理操作
│   │   └── 错误和异常
│   │
│   └── 监控指标:
│       ├── 任务数量统计
│       ├── 步骤总数
│       ├── 压缩记忆块数
│       ├── 关键事实数
│       └── 失败尝试数
│
└── 第7层:外部依赖与集成
    ├── 异步运行时:asyncio
    ├── AI模型调用:litellm(或类似库)
    ├── 唯一标识生成:uuid
    ├── 时间处理:time, datetime
    ├── 数据序列化:json
    └── 类型注解支持:typing
'''

# ========API接口映射(外部视角)=========
'''
API层(假设有Web封装)
├── 任务级别接口
│   ├── POST /tasks                      → create_task()
│   ├── GET /tasks                       → 获取所有任务(需要额外实现)
│   ├── GET /tasks/{task_id}             → get_task_status()
│   ├── GET /tasks/{task_id}/memory      → get_task_memory_summary()
│   ├── POST /tasks/{task_id}/steps      → add_task_step()
│   └── DELETE /tasks/{task_id}/memory   → clear_task_memory()
│
├── 并行执行接口
│   └── POST /tasks/parallel             → schedule_parallel_execution()
│
├── 会话级别接口
│   ├── GET /sessions                    → get_all_sessions()
│   ├── GET /sessions/{session_id}       → get_session_memory_summary()
│   ├── GET /sessions/{session_id}/compressed_memories → get_session_compressed_memories()
│   ├── GET /sessions/{session_id}/key_facts → get_session_key_facts()
│   ├── GET /sessions/{session_id}/failed_attempts → get_session_failed_attempts()
│   ├── GET /sessions/{session_id}/tasks → get_session_tasks()
│   └── DELETE /sessions/{session_id}    → clear_session_memory()
│
└── 全局级别接口
    ├── GET /memory/global               → get_global_memory_summary()
    ├── GET /memory/failed-attempts      → get_failed_attempts_summary()
    ├── GET /memory/running-tasks        → get_running_tasks()
    └── DELETE /memory/global            → clear_all_memory()
'''

# ========映射关系:从代码到API==========
# 让我展示它们之间的映射关系:
'''
代码方法                                 → RESTful接口
──────────────────
create_task(task_id, purpose)            → POST /tasks
                                           Body: {task_id, purpose}
                                           
get_task_status(task_id)                 → GET /tasks/{task_id}
                                           
add_task_step(task_id, step)             → POST /tasks/{task_id}/steps
                                           Body: {step数据}
                                           
get_task_memory_summary(task_id)         → GET /tasks/{task_id}/memory
                                           
schedule_parallel_execution(tasks)       → POST /tasks/parallel
                                           Body: {任务列表}
                                           
get_all_sessions()                       → GET /sessions
                                           
get_session_memory_summary(session_id)   → GET /sessions/{session_id}
                                           
clear_all_memory()                       → DELETE /memory/global
'''

toolkit_manager.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
#!/usr/bin/env python3 - 告诉电脑用python3来运行这个文件
# -*- coding: utf-8 -*- - 告诉电脑这个文件使用UTF-8编码(可以支持中文)
工具包管理器 - 管理agentserver的工具包
"""

import yaml
import logging
from pathlib import Path
from typing import Dict, Any, List, Optional

from .tools import FileEditToolkit, AsyncBaseToolkit, ToolkitConfig
'''
这些导入都是什么:
  yaml - 用来读取YAML格式的配置文件(YAML是一种写配置文件的格式)
  logging - 用来记录程序运行时的信息(比如错误、警告)
  Path - 用来处理文件路径(比如找到某个文件夹)
  typing 里的东西 - 只是用来告诉电脑数据类型,不会真的做什么
  从当前文件夹的tools文件里导入三个东西:
     FileEditToolkit - 文件编辑工具包
     AsyncBaseToolkit - 所有工具包的基础类
     ToolkitConfig - 工具包的配置
'''

#日志记录器
logger = logging.getLogger(__name__)
#这是什么:创建一个logger(记录器),用来写日志,__name__ 会自动变成这个文件的名字

class ToolkitManager:
    """工具包管理器"""
    
    #1. 初始化方法 __init__
    def __init__(self, config_dir: str = "agentserver/configs"):
        self.config_dir = Path(config_dir)
        self.toolkits: Dict[str, AsyncBaseToolkit] = {}
        self.toolkit_configs: Dict[str, Dict[str, Any]] = {}
        
        # 注册可用的工具包类型
        self.toolkit_types = {
            "file_edit": FileEditToolkit,
        }
        
        # 加载配置
        self._load_configs()
        '''
        这个方法是做什么的:
             当创建一个ToolkitManager对象时,这个方法会自动运行
            它做了4件事:
               1.设置配置文件夹路径
               2.创建两个空字典(就像两个空盒子):
                    toolkits:放已经创建好的工具包
                    toolkit_configs:放从配置文件读取的配置信息
               3.注册可用的工具包类型(现在只有一种:file_edit)
               4.调用_load_configs()方法加载配置
        '''
    #加载配置方法
    def _load_configs(self):
        """加载工具包配置"""
        #1.检查文件夹是否存在
        if not self.config_dir.exists():  #先检查配置文件夹是否存在
            logger.warning(f"配置目录不存在: {self.config_dir}")  #如果不存在,记录警告信息,然后直接返回(不继续执行)
            return
         
         #2.遍历所有YAML文件   
        for config_file in self.config_dir.glob("*.yaml"):  #找到文件夹里所有以.yaml结尾的文件,对每个文件都做下面的操作
            try:
                #3.读取文件内容
                with open(config_file, 'r', encoding='utf-8') as f: #打开文件('r'表示只读)
                    config = yaml.safe_load(f)                      #用yaml库读取内容,转成Python能理解的数据
                #保存配置信息
                toolkit_name = config.get('name', config_file.stem)  #获取配置里的名字,如果没有就用文件名(去掉.yaml的部分)
                self.toolkit_configs[toolkit_name] = config          #把配置保存到toolkit_configs字典里
                logger.info(f"加载工具包配置: {toolkit_name}")
            #5.错误处理
            except Exception as e:
                logger.error(f"加载配置文件失败 {config_file}: {e}")   #如果上面任何一步出错了,就记录错误信息,但程序不会崩溃
'''
这个代码文件创建了一个工具包管理器,它:从指定文件夹读取配置文件(YAML格式),管理不同类型的工具包,目前只支持一种工具包:文件编辑工具包
它里面有哪些函数:__init__ - 初始化函数(自动运行),_load_configs - 私有方法(内部使用),加载配置
用了哪些外部函数:Path.exists() - 检查路径是否存在,Path.glob() - 查找匹配的文件,open() - 打开文件,yaml.safe_load() - 安全地读取YAML文件,各种日志函数:warning(), info(), error()
'''
    
    def get_toolkit(self, name: str) -> Optional[AsyncBaseToolkit]:
        #这个方法是做什么的:根据名字获取一个工具包,如果工具包已经存在,直接返回,如果不存在,根据配置创建一个新的
        """获取工具包实例"""
        #1.检查是否已经创建过
        if name in self.toolkits:         #先检查self.toolkits字典里有没有这个工具包
            return self.toolkits[name]    #如果有,直接返回(不再创建新的)
        
        #检查配置是否存在    
        if name not in self.toolkit_configs:   #如果配置里也没有这个工具包,记录错误并返回None
            logger.error(f"工具包配置不存在: {name}")  
            return None   
            
        #3.获取配置信息
        config_data = self.toolkit_configs[name]             #从配置字典里取出这个工具包的配置
        toolkit_type = config_data.get('mode', 'builtin')    #获取工具包的模式(默认是'builtin',也就是内置的)
        
        #4.创建内置工具包
        if toolkit_type == 'builtin':                       #如果是内置模式,从toolkit_types字典里找到对应的类
            toolkit_class = self.toolkit_types.get(name)
            if not toolkit_class:                           #如果找不到,记录错误并返回
                logger.error(f"未找到工具包类型: {name}")
                return None
             
            #5.创建配置对象   
            toolkit_config = ToolkitConfig(                #创建一个ToolkitConfig配置对象
                config=config_data.get('config', {}),      #创建一个ToolkitConfig配置对象
                name=name
            )
            #设置激活的工具列表
            toolkit_config.activated_tools = config_data.get('activated_tools')
            
            #6.创建工具包并保存
            toolkit = toolkit_class(toolkit_config)       #用配置对象创建工具包实例
            self.toolkits[name] = toolkit                 #保存到self.toolkits字典里(下次就不用再创建了)
            logger.info(f"创建工具包实例: {name}")           #记录日志并返回
            return toolkit
        #处理其他模式
        else:
            logger.error(f"不支持的工具包模式: {toolkit_type}")   #如果不是内置模式,记录错误并返回
            return None
    
    def get_all_tools(self) -> List[Dict[str, Any]]:     #获取所有工具包中的所有工具
        """获取所有工具包的工具列表"""
        all_tools = []             #创建一个空列表
        for toolkit_name in self.toolkit_configs.keys():        #遍历所有工具包,遍历配置字典中的所有工具包名字
            toolkit = self.get_toolkit(toolkit_name)     #获取工具包实例,用刚才的get_toolkit方法获取工具包,如果获取成功,继续执行
            if toolkit:
                tools = toolkit.get_tools_list()      #调用工具包的get_tools_list()方法获取工具列表
                for tool in tools:
                    tool['toolkit'] = toolkit_name    #给每个工具添加一个toolkit字段,记录它是哪个工具包的
                all_tools.extend(tools)               #把这些工具添加到总列表中
        return all_tools              #返回一个包含所有工具的列表
    
    async def call_tool(self, toolkit_name: str, tool_name: str, arguments: Dict[str, Any]) -> str:
        #这个方法做什么:调用一个具体的工具,这是一个异步方法(注意async和await)
        """调用工具(异步方法)"""
        #1.获取工具包
        toolkit = self.get_toolkit(toolkit_name)   #先获取工具包实例
        if not toolkit:
            return f"工具包不存在: {toolkit_name}"   #如果没有,返回错误信息
         
        #尝试调用工具   
        try:
            return await toolkit.call_tool(tool_name, arguments)   #尝试异步调用工具包的call_tool方法,传入工具名字和参数
        #3.错误处理
        except Exception as e:   #如果调用失败,记录错误日志
            logger.error(f"调用工具失败 {toolkit_name}.{tool_name}: {e}")  
            return f"调用工具失败: {str(e)}"   #返回错误信息
    
    def list_toolkits(self) -> List[str]:
        #这个方法做什么:列出所有可用的工具包名字,非常简单,就是把配置字典的所有键(key)转成列表
        """列出所有可用的工具包"""
        return list(self.toolkit_configs.keys())
    
    def get_toolkit_info(self, name: str) -> Optional[Dict[str, Any]]:
        """获取工具包详细信息"""
        #1.检查配置是否存在
        if name not in self.toolkit_configs:
            return None    #如果配置里没有这个工具包,返回None
            
        #2.获取配置和工具包
        config = self.toolkit_configs[name]
        toolkit = self.get_toolkit(name)
        
        #3.创建基本信息字典
        info = {
            "name": name,
            "mode": config.get('mode', 'builtin'),
            "config": config.get('config', {}),
            "tools": []
        }
        
        #4.添加工具列表(如果有)
        if toolkit:
            info["tools"] = toolkit.get_tools_list()
        
        #5.返回信息    
        return info


# 全局工具包管理器实例
toolkit_manager = ToolkitManager()

# =====完整的层级结构图=====
'''
工具包管理器 (ToolkitManager)
├── 初始化配置
│   ├── 设置配置目录路径
│   ├── 初始化存储容器
│   │   ├── 工具包实例字典 (toolkits)
│   │   └── 工具包配置字典 (toolkit_configs)
│   ├── 注册可用工具包类型
│   │   └── file_edit → FileEditToolkit
│   └── 自动加载配置 (_load_configs)
│       ├── 检查配置目录是否存在
│       ├── 遍历所有YAML配置文件
│       ├── 读取并解析YAML文件
│       └── 存储配置到字典
│
├── 工具包实例管理
│   ├── 获取工具包 (get_toolkit)
│   │   ├── 检查是否已实例化 (缓存检查)
│   │   ├── 检查配置是否存在
│   │   ├── 创建工具包流程
│   │   │   ├── 获取配置数据
│   │   │   ├── 确定工具包类型
│   │   │   ├── 创建配置对象 (ToolkitConfig)
│   │   │   │   ├── 设置基础配置
│   │   │   │   └── 设置激活工具列表
│   │   │   ├── 实例化工具包类
│   │   │   └── 缓存实例供下次使用
│   │   └── 错误处理
│   │       ├── 配置不存在错误
│   │       ├── 类型未找到错误
│   │       └── 模式不支持错误
│   │
│   ├── 列出所有工具包 (list_toolkits)
│   │   └── 返回配置字典的所有键
│   │
│   └── 获取工具包信息 (get_toolkit_info)
│       ├── 检查配置存在性
│       ├── 构建信息字典
│       │   ├── 基本信息
│       │   │   ├── 名称 (name)
│       │   │   ├── 模式 (mode)
│       │   │   └── 配置 (config)
│       │   └── 工具列表
│       │       └── 从工具包实例获取工具列表
│       └── 返回完整信息
│
├── 工具管理
│   ├── 获取所有工具 (get_all_tools)
│   │   ├── 遍历所有工具包配置
│   │   ├── 获取每个工具包的工具列表
│   │   ├── 标记工具所属工具包
│   │   │   └── 添加 toolkit 字段
│   │   └── 合并所有工具到列表
│   │
│   └── 调用工具 (call_tool) [异步]
│       ├── 获取工具包实例
│       ├── 调用具体工具
│       │   ├── 异步调用工具包方法
│       │   │   └── await toolkit.call_tool()
│       │   └── 传入参数
│       │       ├── 工具名称 (tool_name)
│       │       └── 参数字典 (arguments)
│       └── 错误处理
│           ├── 工具包不存在错误
│           └── 调用失败错误
│
├── 外部依赖
│   ├── 文件操作
│   │   ├── Path (路径处理)
│   │   │   ├── exists() - 检查存在
│   │   │   └── glob() - 模式匹配文件
│   │   └── open() - 文件读取
│   │
│   ├── 配置解析
│   │   └── yaml.safe_load() - YAML解析
│   │
│   ├── 日志系统
│   │   ├── logger.info() - 信息记录
│   │   ├── logger.warning() - 警告记录
│   │   └── logger.error() - 错误记录
│   │
│   └── 类型系统
│       ├── Dict - 字典类型
│       ├── List - 列表类型
│       ├── Any - 任意类型
│       └── Optional - 可选类型
│
└── 全局访问
    └── 全局管理器实例 (toolkit_manager)
        └── 单例模式,供其他模块导入使用
'''

config.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Agent Server配置文件 - 重构版
提供客观、实用的配置管理
"""

from dataclasses import dataclass   #dataclass:一个方便的工具,可以快速创建数据类
from typing import Optional         #typing.Optional:一个类型提示工具(这里实际没用到)

# ============ 服务器配置 ============

# 从主配置读取端口
try:       #try-except:逻辑判断,意思是“试试看,不行就用备选方案”
    from system.config import get_server_port             #先尝试从其他地方获取端口号
    AGENT_SERVER_PORT = get_server_port("agent_server")
except ImportError:
    AGENT_SERVER_PORT = 8001  # 回退默认值                 #如果找不到(ImportError),就用默认的 8001

# ============ 任务调度器配置 ============

@dataclass
class TaskSchedulerConfig:          #创建了一个叫“任务调度器配置”的设置类
    """任务调度器配置"""
    # 记忆管理阈值
    max_steps: int = 15                    # 最大保存步骤数
    compression_threshold: int = 7          # 压缩触发阈值
    keep_last_steps: int = 4               # 压缩后保留的详细步骤数
    
    # 显示相关阈值
    key_facts_compression_limit: int = 5    # 压缩提示中的关键事实数量
    key_facts_summary_limit: int = 10       # 摘要中的关键事实数量
    compressed_memory_summary_limit: int = 3 # 任务摘要中的压缩记忆数量
    compressed_memory_global_limit: int = 2  # 全局摘要中的压缩记忆数量
    key_findings_display_limit: int = 3     # 关键发现显示数量
    failed_attempts_display_limit: int = 3  # 失败尝试显示数量
    
    # 输出长度限制
    output_summary_length: int = 256        # 关键事实中的输出摘要长度
    step_output_display_length: int = 512   # 步骤显示中的输出长度
    
    # 性能配置
    enable_auto_compression: bool = True    # 是否启用自动压缩
    compression_timeout: int = 30           # 压缩超时时间(秒)
    max_compression_retries: int = 3        # 最大压缩重试次数

# 默认任务调度器配置实例
DEFAULT_TASK_SCHEDULER_CONFIG = TaskSchedulerConfig()
#用上面的配置类创建了一个实际可用的配置对象,就像用模具做了一个具体的蛋糕

# ============ Agent管理器配置 ============

@dataclass
class AgentManagerConfig:                   #AgentManagerConfig:管理代理(agent)的设置
    """Agent管理器配置"""
    # 会话管理
    default_session_timeout: int = 3600     # 默认会话超时时间(秒)
    max_session_history: int = 100          # 最大会话历史记录数
    
    # 任务执行
    max_concurrent_agents: int = 5          # 最大并发Agent数
    agent_timeout: int = 300                # Agent执行超时时间(秒)
    
    # 缓存配置
    enable_agent_cache: bool = True         # 是否启用Agent缓存
    cache_ttl: int = 1800                   # 缓存生存时间(秒)

# 默认Agent管理器配置实例
DEFAULT_AGENT_MANAGER_CONFIG = AgentManagerConfig()

# ============ 电脑控制Agent配置 ============

@dataclass  
class ComputerControlConfig:                #ComputerControlConfig:控制电脑的设置
    """电脑控制Agent配置"""
    # 执行配置
    max_execution_time: int = 60            # 最大执行时间(秒)
    enable_screenshot: bool = True          # 是否启用截图功能
    screenshot_quality: int = 80            # 截图质量(1-100)
    
    # 安全配置
    enable_safety_checks: bool = True       # 是否启用安全检查
    blocked_commands: list = None           # 被阻止的命令列表
    
    def __post_init__(self):
        if self.blocked_commands is None:
            self.blocked_commands = [
                "rm -rf /", "format c:", "del /f /s /q c:\\",
                "shutdown", "reboot", "halt"
            ]

# 默认电脑控制配置实例
DEFAULT_COMPUTER_CONTROL_CONFIG = ComputerControlConfig()

# ============ 全局配置管理 ============

@dataclass
class AgentServerConfig:        #这是总的配置,包含所有子配置
    """Agent服务器全局配置"""
    # 服务器配置
    host: str = "0.0.0.0"
    port: int = None
    
    # 子模块配置
    task_scheduler: TaskSchedulerConfig = None
    agent_manager: AgentManagerConfig = None
    computer_control: ComputerControlConfig = None
    
    # 日志配置
    log_level: str = "INFO"
    enable_debug_logs: bool = False
    
    def __post_init__(self):            #这是一个特殊函数,会在类创建后自动运行
        # 设置默认端口
        if self.port is None:
            self.port = AGENT_SERVER_PORT
        
        # 设置默认子配置
        if self.task_scheduler is None:
            self.task_scheduler = DEFAULT_TASK_SCHEDULER_CONFIG
        if self.agent_manager is None:
            self.agent_manager = DEFAULT_AGENT_MANAGER_CONFIG
        if self.computer_control is None:
            self.computer_control = DEFAULT_COMPUTER_CONTROL_CONFIG

# 全局配置实例
config = AgentServerConfig()

# ============ 配置访问函数 ============

def get_task_scheduler_config() -> TaskSchedulerConfig:
    """获取任务调度器配置"""
    return config.task_scheduler

def get_agent_manager_config() -> AgentManagerConfig:
    """获取Agent管理器配置"""
    return config.agent_manager

def get_computer_control_config() -> ComputerControlConfig:
    """获取电脑控制配置"""
    return config.computer_control

def update_config(**kwargs):
    """更新配置"""
    for key, value in kwargs.items():
        if hasattr(config, key):
            setattr(config, key, value)
        else:
            raise ValueError(f"未知配置项: {key}")

# ============ 向后兼容 ============

# 保持向后兼容的配置常量
AGENT_SERVER_HOST = config.host
AGENT_SERVER_PORT = config.port
'''
代码最后创建了几个函数:
  get_task_scheduler_config():获取任务调度器的配置
  get_agent_manager_config():获取代理管理器的配置
  get_computer_control_config():获取电脑控制的配置
  update_config(**kwargs):更新配置的函数,可以修改设置
'''

README.md

一、 二次开发核心入口

1. 添加新的意图处理逻辑

  • 修改文件task_planner.py
  • 关键函数evaluate_and_plan()
  • 添加新任务类型
# 在task_planner.py中添加新任务类型判断
if "你的新意图关键词" in user_query:
    return {
        "type": "your_new_processor",
        "params": {...},
        "executor": "agent"  # 或 "mcp"
    }

2. 扩展任务执行器

  • 修改文件task_scheduler.py
  • 添加新的processor
async def _execute_task(self, task):
    if task["type"] == "your_new_processor":
        return await self._execute_your_processor(task["params"])
    
async def _execute_your_processor(self, params):
    # 实现你的业务逻辑
    pass

3. 自定义API接口

  • 修改文件agent_server.py
  • 添加新端点
@app.post("/your_custom_endpoint")
async def custom_handler(request_data: dict):
    # 直接调用任务规划器
    plan = await task_planner.evaluate_and_plan(request_data["query"])
    # 调度执行
    result = await task_scheduler.schedule_execution(plan)
    return result

二、 单独调试步骤

1. 启动独立测试服务

# 进入项目目录
cd agentserver

# 方式1:直接运行(查看详细日志)
python -m uvicorn agent_server:app --reload --port 8001 --log-level debug

# 方式2:使用提供的脚本(如果有)
python dev_server.py

2. 测试API接口

# 测试意图分析(使用curl)
curl -X POST http://localhost:8001/analyze_and_execute \
  -H "Content-Type: application/json" \
  -d '{
    "messages": [{"role": "user", "content": "帮我打开计算器"}],
    "session_id": "test_session"
  }'

# 查看任务状态
curl http://localhost:8001/tasks?session_id=test_session

3. 单元调试模块

# 创建测试脚本 test_debug.py
import asyncio
from task_planner import TaskPlanner
from task_scheduler import TaskScheduler

async def test_planner():
    planner = TaskPlanner()
    # 直接测试规划器
    plan = await planner.evaluate_and_plan("打开记事本和浏览器")
    print("生成的计划:", plan)
    
async def test_scheduler():
    scheduler = TaskScheduler()
    # 测试直接调度
    task = {"type": "processor", "params": {"query": "测试"}}
    result = await scheduler.schedule_execution(task)
    print("执行结果:", result)

if __name__ == "__main__":
    asyncio.run(test_planner())

4. 修改配置快速生效

  • 配置文件config.py
  • 常用修改项
# 关闭某些功能进行调试
INTENT_ANALYSIS_ENABLED = False  # 跳过意图分析直接测试
TASK_TIMEOUT = 30  # 调小超时时间快速失败
MAX_CONCURRENT_TASKS = 1  # 限制并发数便于调试

三、 核心开发场景

场景1:添加新的电脑控制能力

  1. 在 task_scheduler.py 中添加执行函数
  2. 在 task_planner.py 中注册意图映射
  3. 测试:直接调用新函数验证功能

场景2:修改任务调度策略

# 修改调度逻辑(task_scheduler.py)
class CustomTaskScheduler(TaskScheduler):
    async def schedule_parallel_execution(self, tasks):
        # 重写并发策略
        # 例如:按优先级排序、依赖关系处理
        pass

场景3:集成新的AI模型

# 替换或扩展意图分析模型
# 在task_planner.py中修改
async def analyze_intent(self, query):
    # 使用新的LLM API
    response = await call_your_llm_api(query)
    return self._parse_to_task(response)

四、 调试技巧

1. 日志追踪

# 在代码中添加调试日志
import logging
logger = logging.getLogger(__name__)

logger.debug(f"任务参数: {params}")  # 开发时用debug级别
logger.info(f"开始执行任务: {task_id}")

2. 断点调试

# 使用debugpy进行远程调试
python -m debugpy --listen 0.0.0.0:5678 --wait-for-client agent_server.py

3. 测试数据模拟

# 创建模拟请求进行测试
test_data = {
    "messages": [{"role": "user", "content": "测试指令"}],
    "session_id": "debug_" + str(uuid.uuid4())
}
# 直接调用内部函数,绕过HTTP层
result = await app.state.planner.evaluate_and_plan(test_data["messages"][0]["content"])

五、 快速验证清单

  1. 修改生效了吗?
    • 重启服务:Ctrl+C 然后重新启动
    • 检查日志:确认没有导入错误
  2. 接口工作正常吗?
    • # 健康检查:curl http://localhost:8001/health
  3. 任务执行成功吗?
    • 查看任务状态接口
    • 检查任务注册表:task_scheduler.task_registry
  4. 需要回滚吗?
    • 备份原文件:cp agent_server.py agent_server.py.bak
    • 使用Git临时提交:git stash

六、 重要提醒

  • 热重载:启动时加 --reload 参数,修改代码自动重启
  • 端口冲突:确保8001端口未被占用,或修改 config.py 中的端口
  • 依赖安装:如果添加新依赖,更新 requirements.txt
  • 并发安全:修改调度器时注意线程/协程安全问题

最简开发循环

  1. 修改 task_planner.py 或 task_scheduler.py
  2. 重启服务:python -m uvicorn agent_server:app --reload --port 8001
  3. 发送测试请求:curl -X POST ...
  4. 查看日志确认结果

直接改这几个核心文件,就能实现大部分二次开发需求。

文末附加内容
暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇