本文最后更新于63 天前,其中的信息可能已经过时,如有错误请发送邮件到184874483@qq.com
1
agent_server.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
第一行:告诉电脑“请用python3来运行这个文件”
第二行:告诉电脑“这个文件用的是UTF-8编码”(这样就能显示中文了)
NagaAgent独立服务 - 基于博弈论的电脑控制智能体
提供意图识别和电脑控制任务执行功能
"""
import asyncio # 处理“异步”任务(可以同时做多件事)
import uuid # 生成唯一的ID号码(像身份证号一样)
import logging # 记录程序运行日志(就像写日记)
from typing import Dict, Any, Optional, List # 定义数据类型
from datetime import datetime
from fastapi import FastAPI, HTTPException # 创建Web服务的工具
from fastapi.responses import JSONResponse # 返回JSON格式数据
from contextlib import asynccontextmanager # 管理程序启动和关闭
from system.config import config # 配置文件
from system.background_analyzer import get_background_analyzer # 意图分析器
from agentserver.agent_computer_control import ComputerControlAgent # 电脑控制核心
from agentserver.task_scheduler import get_task_scheduler, TaskStep # 任务安排器
from agentserver.toolkit_manager import toolkit_manager # 工具管理器
# 配置日志
#格式解释:时间 - 名字 - 等级 - 消息,2024-01-01 10:00:00 - __main__ - INFO - 服务启动成功
logger = logging.getLogger(__name__) # 创建一个“日记本”
logger.setLevel(logging.INFO) # 设置记录级别(INFO级)
if not logger.handlers: # 如果还没有写日记的工具,就创建一个
handler = logging.StreamHandler() # 创建一个往控制台输出的工具
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') # 日记格式
handler.setFormatter(formatter) # 应用格式
logger.addHandler(handler) # 把工具给日记本
@asynccontextmanager
async def lifespan(app: FastAPI):
"""FastAPI应用生命周期"""
# startup
try:
# 初始化意图分析器(理解你想做什么)
Modules.analyzer = get_background_analyzer()
# 初始化电脑控制智能体 (实际控制电脑)
Modules.computer_control = ComputerControlAgent()
# 初始化任务调度器 (安排任务顺序)
Modules.task_scheduler = get_task_scheduler()
# 设置LLM配置用于智能压缩,设置AI大脑配置
if hasattr(config, 'api') and config.api:
llm_config = {
"model": config.api.model, # AI模型名字
"api_key": config.api.api_key, # 钥匙
"api_base": config.api.base_url # 服务器地址
}
Modules.task_scheduler.set_llm_config(llm_config)
logger.info("NagaAgent电脑控制服务初始化完成")
except Exception as e:
logger.error(f"服务初始化失败: {e}")
raise
# 运行期
yield # 程序正常运行中
# shutdown
try:
logger.info("NagaAgent电脑控制服务已关闭")
except Exception as e:
logger.error(f"服务关闭失败: {e}")
#创建Web服务,创建一个叫app的Web服务器,名字:NagaAgent Computer Control Server,版本:1.0.0,使用刚才定义的lifespan管理开关机
app = FastAPI(title="NagaAgent Computer Control Server", version="1.0.0", lifespan=lifespan)
class Modules:
"""全局模块管理器"""
analyzer = None # 意图分析器(理解你要什么)
computer_control = None # 电脑控制器(实际干活)
task_scheduler = None # 任务安排器(安排顺序)
#作用:获取现在的时间,格式化成标准字符串,例子:"2024-01-01T10:00:00.123456"
def _now_iso() -> str:
"""获取当前时间ISO格式"""
return datetime.now().isoformat()
#处理电脑控制任务(核心功能),这个函数是核心,它:
#1.接收一个指令(比如“打开记事本”)2.调用电脑控制智能体执行3.返回执行结果
async def _process_computer_control_task(instruction: str, session_id: Optional[str] = None) -> Dict[str, Any]:
"""处理电脑控制任务"""
try: # 1. 记录开始执行
logger.info(f"开始处理电脑控制任务: {instruction}")
# 2. 直接调用电脑控制智能体
result = await Modules.computer_control.handle_handoff({
"action": "automate_task", # 动作:自动执行任务
"target": instruction, # 目标:用户给的指令
"parameters": {} # 额外参数(这里为空)
})
# 3. 记录完成,返回成功结果
logger.info(f"电脑控制任务完成: {instruction}")
return {
"success": True, # 成功了吗?True=成功
"result": result, # 具体结果
"task_type": "computer_control", # 任务类型
"instruction": instruction # 原指令
}
except Exception as e: # 如果出错了
# 4. 记录错误,返回失败结果
logger.error(f"电脑控制任务失败: {e}")
return {
"success": False, # 成功了吗?False=失败
"error": str(e), # 错误信息
"task_type": "computer_control",
"instruction": instruction
}
# 异步执行多个任务,这个函数处理多个任务:
#agent_calls: 任务列表(多个要执行的任务),session_id: 会话ID(区分不同用户)
#analysis_session_id: 分析会话ID,request_id: 请求ID(每个请求唯一编号),callback_url: 回调地址(完成后通知谁)
async def _execute_agent_tasks_async(agent_calls: List[Dict[str, Any]], session_id: str,
analysis_session_id: str, request_id: str, callback_url: Optional[str] = None):
"""异步执行Agent任务 - 应用与MCP服务器相同的会话管理逻辑"""
try:
logger.info(f"[异步执行] 开始执行 {len(agent_calls)} 个Agent任务")
# 处理每个Agent任务
results = []
# 1. 循环处理每个任务
for i, agent_call in enumerate(agent_calls): # 取出任务信息
try:
instruction = agent_call.get("instruction", "") # 指令
tool_name = agent_call.get("tool_name", "未知工具") # 工具名
service_name = agent_call.get("service_name", "未知服务") # 服务名
logger.info(f"[异步执行] 执行任务 {i+1}/{len(agent_calls)}: {tool_name} - {instruction}")
# 添加任务步骤到调度器
# 2. 记录步骤到任务调度器(就像写待办事项)
await Modules.task_scheduler.add_task_step(request_id, TaskStep(
step_id=f"step_{i+1}",
task_id=request_id,
purpose=f"执行Agent任务: {tool_name}",
content=instruction,
output="",
analysis=None,
success=True
))
#3.执行电脑控制任务
result = await _process_computer_control_task(instruction, session_id)
results.append({
"agent_call": agent_call,
"result": result,
"step_index": i
})
# 4.更新任务步骤结果
await Modules.task_scheduler.add_task_step(request_id, TaskStep(
step_id=f"step_{i+1}_result",
task_id=request_id,
purpose=f"任务结果: {tool_name}",
content=f"执行结果: {result.get('success', False)}",
output=str(result.get('result', '')),
analysis={"analysis": f"任务类型: {result.get('task_type', 'unknown')}, 工具: {tool_name}, 服务: {service_name}"},
success=result.get('success', False),
error=result.get('error')
))
logger.info(f"[异步执行] 任务 {i+1} 完成: {result.get('success', False)}")
except Exception as e:
logger.error(f"[异步执行] 任务 {i+1} 执行失败: {e}")
results.append({
"agent_call": agent_call,
"result": {"success": False, "error": str(e)},
"step_index": i
})
'''循环过程:
任务1: "打开浏览器" → 执行 → 记录结果
任务2: "搜索Python教程" → 执行 → 记录结果
任务3: "保存网页" → 执行 → 记录结果
'''
# 发送回调通知(如果提供了回调URL)
if callback_url:
await _send_callback_notification(callback_url, request_id, session_id, analysis_session_id, results)
logger.info(f"[异步执行] 所有Agent任务执行完成: {len(results)} 个任务")
except Exception as e:
logger.error(f"[异步执行] Agent任务执行失败: {e}")
# 发送错误回调
if callback_url:
await _send_callback_notification(callback_url, request_id, session_id, analysis_session_id, [], str(e))
# 发送回调通知,这个函数在任务完成后通知别人:
'''
发送的内容:
{
"request_id": "123", // 请求ID
"session_id": "abc", // 会话ID
"analysis_session_id": "xyz", // 分析会话ID
"success": true, // 是否成功
"error": null, // 错误信息(如果没有就是null)
"results": [...], // 所有任务结果
"completed_at": "2024-01-01T10:00:00" // 完成时间
}
'''
async def _send_callback_notification(callback_url: str, request_id: str, session_id: str,
analysis_session_id: str, results: List[Dict[str, Any]], error: Optional[str] = None):
"""发送回调通知 - 应用与MCP服务器相同的回调机制"""
try:
import httpx
callback_payload = {
"request_id": request_id,
"session_id": session_id,
"analysis_session_id": analysis_session_id,
"success": error is None,
"error": error,
"results": results,
"completed_at": _now_iso()
}
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.post(callback_url, json=callback_payload)
if response.status_code == 200:
logger.info(f"[回调通知] Agent任务结果回调成功: {request_id}")
else:
logger.error(f"[回调通知] Agent任务结果回调失败: {response.status_code}")
except Exception as e:
logger.error(f"[回调通知] 发送Agent任务回调失败: {e}")
# ============ API端点 ============
@app.get("/health")
async def health_check():
"""健康检查"""
return {
"status": "healthy", # 状态:健康
"timestamp": _now_iso(), # 当前时间
"modules": { # 各个工具是否准备好
"analyzer": Modules.analyzer is not None, # 分析器准备好了吗?
"computer_control": Modules.computer_control is not None # 电脑控制器准备好了吗?
}
}
'''
访问方式:
网址:http://服务器地址/health
方法:GET(查看)
返回示例:
{
"status": "healthy",
"timestamp": "2024-01-01T10:00:00",
"modules": {
"analyzer": true, // true表示准备好了
"computer_control": true
}
}
'''
@app.post("/schedule")
async def schedule_agent_tasks(payload: Dict[str, Any]):
"""统一的任务调度端点(主要入口) - 应用与MCP服务器相同的会话管理逻辑"""
# 1. 先检查工具是否准备好
if not Modules.computer_control or not Modules.task_scheduler:
raise HTTPException(503, "电脑控制智能体或任务调度器未就绪")
# 提取新的请求格式参数
query = payload.get("query", "") # 查询内容
agent_calls = payload.get("agent_calls", []) # 要执行的任务列表
session_id = payload.get("session_id") # 会话ID(哪个顾客)
analysis_session_id = payload.get("analysis_session_id") # 分析会话ID
request_id = payload.get("request_id", str(uuid.uuid4())) # 请求ID,如果没有就生成一个
callback_url = payload.get("callback_url") # 完成后通知的地址
try:
logger.info(f"[统一调度] 接收Agent任务调度请求: {query}")
logger.info(f"[统一调度] 会话ID: {session_id}, 分析会话ID: {analysis_session_id}, 请求ID: {request_id}")
if not agent_calls: # 如果任务列表是空的
return {
"success": True,
"status": "no_tasks", # 状态:没有任务
"message": "未发现可执行的Agent任务",
"task_id": request_id,
"accepted_at": _now_iso(),
"session_id": session_id,
"analysis_session_id": analysis_session_id
}
logger.info(f"[统一调度] 会话 {session_id} 发现 {len(agent_calls)} 个Agent任务")
# 1.有任务,创建任务调度器任务
task_id = await Modules.task_scheduler.create_task(
task_id=request_id, # 任务ID
purpose=f"执行Agent任务: {query}", # 任务目的
session_id=session_id, # 会话ID
analysis_session_id=analysis_session_id # 分析会话ID
)
# 2.异步执行任务(不阻塞响应)
asyncio.create_task(_execute_agent_tasks_async( #asyncio.create_task():创建后台任务,不阻塞当前请求
agent_calls, session_id, analysis_session_id, request_id, callback_url
))
#3.立即返回结果
return {
"success": True,
"status": "scheduled", # 状态:已安排
"task_id": request_id,
"message": f"已调度 {len(agent_calls)} 个Agent任务",
"accepted_at": _now_iso(),
"session_id": session_id,
"analysis_session_id": analysis_session_id
}
except Exception as e:
logger.error(f"[统一调度] Agent任务调度失败: {e}")
raise HTTPException(500, f"调度失败: {e}")
@app.post("/analyze_and_execute")
async def analyze_and_execute(payload: Dict[str, Any]):
"""意图分析和电脑控制任务执行 - 保持向后兼容
为什么需要这个?
为了兼容旧版本的程序,新版本用/schedule,旧版本还用这个
"""
if not Modules.analyzer or not Modules.computer_control:
raise HTTPException(503, "分析器或电脑控制智能体未就绪")
messages = (payload or {}).get("messages", [])
if not isinstance(messages, list):
raise HTTPException(400, "messages必须是{role, content}格式的列表")
session_id = (payload or {}).get("session_id")
try:
# 直接执行电脑控制任务,不进行意图分析
# 意图分析已在API服务器中完成,这里只负责执行具体的Agent任务
# 从消息中提取任务指令
tasks = []
for msg in messages:
if msg.get("role") == "user": # 如果是用户消息
content = msg.get("content", "")
if "执行Agent任务:" in content: # 如果包含特定关键词
# 提取任务指令
instruction = content.replace("执行Agent任务:", "").strip() # 提取指令
tasks.append({
"instruction": instruction
})
'''
示例消息格式:
{
"messages": [
{"role": "user", "content": "执行Agent任务: 打开记事本"},
{"role": "user", "content": "执行Agent任务: 输入Hello World"}
]
}
'''
'''与/schedule的区别:
/schedule:异步执行,立即返回
/analyze_and_execute:同步执行,等所有任务完成才返回
就像快餐和堂食的区别'''
if not tasks:
return {
"success": True,
"status": "no_tasks",
"message": "未发现可执行的电脑控制任务",
"accepted_at": _now_iso(),
"session_id": session_id
}
logger.info(f"会话 {session_id} 发现 {len(tasks)} 个电脑控制任务")
# 处理每个任务
results = []
for task_instruction in tasks:
result = await _process_computer_control_task(task_instruction, session_id)
results.append(result)
return {
"success": True,
"status": "completed",
"tasks_processed": len(tasks),
"results": results,
"accepted_at": _now_iso(),
"session_id": session_id
}
except Exception as e:
logger.error(f"意图分析和任务执行失败: {e}")
raise HTTPException(500, f"处理失败: {e}")
@app.get("/computer_control/availability")
async def get_computer_control_availability():
"""获取电脑控制可用性(检查工具)"""
try:
if not Modules.computer_control: # 如果电脑控制器不存在
return {"ready": False, "reasons": ["电脑控制智能体未初始化"]}
# 检查电脑控制能力
capabilities = Modules.computer_control.get_capabilities()
return {
"ready": capabilities.get("enabled", False), # 是否可用
"capabilities": capabilities, # 具体能力
"timestamp": _now_iso()
}
'''
{
"ready": true,
"capabilities": {
"enabled": true,
"tools": ["鼠标控制", "键盘输入", "截图"],
"platform": "Windows"
},
"timestamp": "2024-01-01T10:00:00"
}
'''
except Exception as e:
logger.error(f"检查电脑控制可用性失败: {e}")
return {"ready": False, "reasons": [f"检查失败: {e}"]}
@app.post("/computer_control/execute")
async def execute_computer_control_task(payload: Dict[str, Any]):
"""直接执行电脑控制任务(单点执行)"""
if not Modules.computer_control:
raise HTTPException(503, "电脑控制智能体未就绪")
instruction = payload.get("instruction", "")
if not instruction: # 如果指令为空
raise HTTPException(400, "instruction不能为空")
try:
result = await _process_computer_control_task(instruction)
return {
"success": result.get("success", False),
"result": result.get("result"),
"error": result.get("error"),
"instruction": instruction
}
except Exception as e:
logger.error(f"执行电脑控制任务失败: {e}")
raise HTTPException(500, f"执行失败: {e}")
'''
使用场景:
只执行一个简单任务
需要立即知道结果
不需要复杂调度
示例请求:
{
"instruction": "打开浏览器"
}
示例响应:
{
"success": true,
"result": "已打开Chrome浏览器",
"error": null,
"instruction": "打开浏览器"
}
总结所有API端点
端点 | 方法 | 用途 | 特点
/health | GET | 健康检查 | 快速检查服务状态
/schedule | POST | 统一任务调度 | 主要入口,异步执行,支持多个任务
/analyze_and_execute | POST | 意图分析和执行 | 兼容旧版本,同步执行
/computer_control/availability | GET | 获取可用性 | 检查电脑控制能力
/computer_control/execute | POST | 直接执行任务 | 单任务,同步执行
'''
# ============ 任务记忆管理API ============
@app.get("/tasks")
async def get_tasks(session_id: Optional[str] = None):
"""获取任务列表(看看现在在忙什么)"""
# 1. 检查工具是否准备好
if not Modules.task_scheduler:
raise HTTPException(503, "任务调度器未就绪")
try:
# 2. 从调度器获取正在运行的任务
running_tasks = await Modules.task_scheduler.get_running_tasks()
return {
"success": True,
"running_tasks": running_tasks, # 正在运行的任务
"session_id": session_id
}
except Exception as e:
logger.error(f"获取任务列表失败: {e}")
raise HTTPException(500, f"获取失败: {e}")
'''使用示例:
GET http://localhost:8000/tasks
可能返回:
{
"success": true,
"running_tasks": [
{
"task_id": "task001",
"purpose": "打开记事本并记录",
"status": "running",
"started_at": "2024-01-01T10:00:00"
},
{
"task_id": "task002",
"purpose": "保存文件",
"status": "completed",
"started_at": "2024-01-01T09:30:00"
}
]
}
'''
@app.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
"""获取指定任务状态(查看具体任务)"""
if not Modules.task_scheduler:
raise HTTPException(503, "任务调度器未就绪")
try: # 获取任务状态
task_status = await Modules.task_scheduler.get_task_status(task_id)
# 如果没找到,返回404错误
if not task_status:
raise HTTPException(404, f"任务 {task_id} 不存在")
return {
"success": True,
"task": task_status
}
except HTTPException: # 如果是我们主动抛出的错误,直接传递
raise
except Exception as e: # 其他未知错误
logger.error(f"获取任务状态失败: {e}")
raise HTTPException(500, f"获取失败: {e}")
'''
路径参数:{task_id}
从URL中提取任务ID
例子:/tasks/abc123 → task_id = "abc123"
错误处理两种方式:
1.HTTPException:主动抛出的已知错误(如404)
2.Exception:未知的意外错误
'''
@app.get("/tasks/{task_id}/memory")
async def get_task_memory(task_id: str, include_key_facts: bool = True):
"""获取任务记忆摘要(看看任务经历了什么)"""
# 查询参数:include_key_facts,默认为true
# 例子:/tasks/abc123/memory?include_key_facts=false
'''
查询参数 vs 路径参数:
# 路径参数(必须的)
@app.get("/tasks/{task_id}/memory")
# 查询参数(可选的)
async def get_task_memory(task_id: str, include_key_facts: bool = True):
使用示例:GET http://localhost:8000/tasks/abc123/memory?include_key_facts=true
'''
if not Modules.task_scheduler:
raise HTTPException(503, "任务调度器未就绪")
try:
memory_summary = await Modules.task_scheduler.get_task_memory_summary(task_id, include_key_facts)
return {
"success": True,
"task_id": task_id,
"memory_summary": memory_summary
}
except Exception as e:
logger.error(f"获取任务记忆失败: {e}")
raise HTTPException(500, f"获取失败: {e}")
@app.get("/memory/global")
async def get_global_memory():
"""获取全局记忆摘要(看看整个系统的记忆)"""
if not Modules.task_scheduler:
raise HTTPException(503, "任务调度器未就绪")
try:
# 获取全局摘要
global_summary = await Modules.task_scheduler.get_global_memory_summary()
# 获取失败尝试的摘要
failed_attempts = await Modules.task_scheduler.get_failed_attempts_summary()
'''为什么需要全局记忆?
看看整个系统执行了多少任务
哪些任务经常失败
总结经验,改进未来执行
'''
return {
"success": True,
"global_summary": global_summary,
"failed_attempts": failed_attempts
}
except Exception as e:
logger.error(f"获取全局记忆失败: {e}")
raise HTTPException(500, f"获取失败: {e}")
@app.post("/tasks/{task_id}/steps")
async def add_task_step(task_id: str, payload: Dict[str, Any]):
"""添加任务步骤(手动添加步骤记录)"""
if not Modules.task_scheduler:
raise HTTPException(503, "任务调度器未就绪")
try: # 创建一个TaskStep对象
step = TaskStep(
step_id=payload.get("step_id", str(uuid.uuid4())), # 步骤ID,没有就生成
task_id=task_id, # 属于哪个任务
purpose=payload.get("purpose", "执行步骤"), # 步骤目的
content=payload.get("content", ""), # 步骤内容
output=payload.get("output", ""), # 步骤输出
analysis=payload.get("analysis"), # 分析结果
success=payload.get("success", True), # 是否成功
error=payload.get("error") # 错误信息
)
# 添加到调度器
await Modules.task_scheduler.add_task_step(task_id, step)
return {
"success": True,
"message": "步骤添加成功",
"step_id": step.step_id
}
except Exception as e:
logger.error(f"添加任务步骤失败: {e}")
raise HTTPException(500, f"添加失败: {e}")
#DELETE方法:用于删除资源,这里是删除任务的记忆
@app.delete("/tasks/{task_id}/memory")
async def clear_task_memory(task_id: str):
"""清除任务记忆(删除单个任务的记忆)"""
if not Modules.task_scheduler:
raise HTTPException(503, "任务调度器未就绪")
try:# 调用调度器清除记忆
success = await Modules.task_scheduler.clear_task_memory(task_id)
if not success: # 调用调度器清除记忆
raise HTTPException(404, f"任务 {task_id} 不存在")
return {
"success": True,
"message": f"任务 {task_id} 的记忆已清除"
}
except HTTPException:
raise
except Exception as e:
logger.error(f"清除任务记忆失败: {e}")
raise HTTPException(500, f"清除失败: {e}")
@app.delete("/memory/global")
async def clear_global_memory():
"""清除全局记忆(清空所有记忆)"""
if not Modules.task_scheduler:
raise HTTPException(503, "任务调度器未就绪")
try:
await Modules.task_scheduler.clear_all_memory()
return {
"success": True,
"message": "全局记忆已清除"
}
except Exception as e:
logger.error(f"清除全局记忆失败: {e}")
raise HTTPException(500, f"清除失败: {e}")
#危险操作:会删除所有任务的记忆,就像格式化硬盘一样。
# ============ 会话级别的记忆管理API ============
@app.get("/sessions")
async def get_all_sessions():
"""获取所有会话的摘要信息(看看有哪些对话)"""
if not Modules.task_scheduler:
raise HTTPException(503, "任务调度器未就绪")
try: # 获取所有会话
sessions = await Modules.task_scheduler.get_all_sessions()
return {
"success": True,
"sessions": sessions, # 会话列表
"total_sessions": len(sessions) # 会话总数
}
except Exception as e:
logger.error(f"获取会话列表失败: {e}")
raise HTTPException(500, f"获取失败: {e}")
@app.get("/sessions/{session_id}/memory")
async def get_session_memory_summary(session_id: str):
"""获取会话记忆摘要(查看一次对话的整体情况)"""
if not Modules.task_scheduler:
raise HTTPException(503, "任务调度器未就绪")
try: #(查看一次对话的整体情况)
summary = await Modules.task_scheduler.get_session_memory_summary(session_id)
# 如果摘要中包含error,说明会话不存在
if "error" in summary:
raise HTTPException(404, summary["error"])
return {
"success": True,
"session_id": session_id,
"memory_summary": summary
}
except HTTPException:
raise
except Exception as e:
logger.error(f"获取会话记忆摘要失败: {e}")
raise HTTPException(500, f"获取失败: {e}")
@app.get("/sessions/{session_id}/compressed_memories")
async def get_session_compressed_memories(session_id: str):
"""获取会话的压缩记忆(精简版记忆)"""
if not Modules.task_scheduler:
raise HTTPException(503, "任务调度器未就绪")
try: # 获取压缩后的记忆
memories = await Modules.task_scheduler.get_session_compressed_memories(session_id)
return {
"success": True,
"session_id": session_id,
"compressed_memories": memories, # 压缩记忆列表
"count": len(memories) # 有多少条压缩记忆
}
except Exception as e:
logger.error(f"获取会话压缩记忆失败: {e}")
raise HTTPException(500, f"获取失败: {e}")
'''什么是压缩记忆?
把很多细节压缩成关键信息,例子:把"打开记事本→输入文字→保存文件"压缩成"创建了一个文本文件"
'''
@app.get("/sessions/{session_id}/key_facts")
async def get_session_key_facts(session_id: str):
"""获取会话的关键事实(最重要的信息)"""
if not Modules.task_scheduler:
raise HTTPException(503, "任务调度器未就绪")
try: # 获取关键事实
key_facts = await Modules.task_scheduler.get_session_key_facts(session_id)
return {
"success": True,
"session_id": session_id,
"key_facts": key_facts, # 关键事实列表
"count": len(key_facts) # 有多少个关键事实
}
except Exception as e:
logger.error(f"获取会话关键事实失败: {e}")
raise HTTPException(500, f"获取失败: {e}")
'''
关键事实示例:
{
"key_facts": [
"用户需要创建一个会议记录文档",
"用户偏好使用记事本而不是Word",
"用户经常在下午3点开会"
]
}
'''
@app.get("/sessions/{session_id}/failed_attempts")
async def get_session_failed_attempts(session_id: str):
"""获取会话的失败尝试(看看哪里出错了)"""
if not Modules.task_scheduler:
raise HTTPException(503, "任务调度器未就绪")
try: # 获取失败尝试记录
failed_attempts = await Modules.task_scheduler.get_session_failed_attempts(session_id)
return {
"success": True,
"session_id": session_id,
"failed_attempts": failed_attempts, # 失败记录
"count": len(failed_attempts) # 失败次数
}
except Exception as e:
logger.error(f"获取会话失败尝试失败: {e}")
raise HTTPException(500, f"获取失败: {e}")
#学习价值:从失败中学习,避免重复错误
@app.get("/sessions/{session_id}/tasks")
async def get_session_tasks(session_id: str):
"""获取会话的所有任务(看看这次对话做了哪些事)"""
if not Modules.task_scheduler:
raise HTTPException(503, "任务调度器未就绪")
try: # 获取会话的所有任务
tasks = await Modules.task_scheduler.get_session_tasks(session_id)
return {
"success": True,
"session_id": session_id,
"tasks": tasks, # 任务列表
"count": len(tasks) # 任务数量
}
except Exception as e:
logger.error(f"获取会话任务失败: {e}")
raise HTTPException(500, f"获取失败: {e}")
@app.delete("/sessions/{session_id}/memory")
async def clear_session_memory(session_id: str):
"""清除指定会话的记忆(删除一次对话的所有记忆)"""
if not Modules.task_scheduler:
raise HTTPException(503, "任务调度器未就绪")
try: # 清除会话记忆
success = await Modules.task_scheduler.clear_session_memory(session_id)
if not success: # 如果返回false,说明会话不存在
raise HTTPException(404, f"会话 {session_id} 不存在")
return {
"success": True,
"message": f"会话 {session_id} 的记忆已清除"
}
except HTTPException:
raise
except Exception as e:
logger.error(f"清除会话记忆失败: {e}")
raise HTTPException(500, f"清除失败: {e}")
# ============ 文件编辑工具包API ============
@app.get("/tools")
async def list_tools():
"""列出所有可用的工具 (看看有什么工具)"""
try: # 从工具管理器中获取所有工具
tools = toolkit_manager.get_all_tools()
return {
"success": True,
"tools": tools, # 工具列表
"count": len(tools) # 工具总数
}
except Exception as e:
logger.error(f"获取工具列表失败: {e}")
raise HTTPException(500, f"获取失败: {e}")
'''
什么是工具?
工具 = 一个具体的功能,比如:"打开浏览器"、"复制文件"、"发送邮件"
示例返回:
{
"success": true,
"tools": [
{"name": "open_browser", "description": "打开浏览器"},
{"name": "copy_file", "description": "复制文件"},
{"name": "send_email", "description": "发送邮件"}
],
"count": 3
}
'''
@app.get("/toolkits")
async def list_toolkits():
"""列出所有可用的工具包(看看有哪些工具箱)"""
try: # 获取所有工具包
toolkits = toolkit_manager.list_toolkits()
return {
"success": True,
"toolkits": toolkits,
"count": len(toolkits)
}
except Exception as e:
logger.error(f"获取工具包列表失败: {e}")
raise HTTPException(500, f"获取失败: {e}")
'''
工具包 vs 工具:
工具包:一组相关工具的集合(像工具箱),工具:单个具体的功能(像扳手、锤子)
示例:
工具包:file_edit(文件编辑工具包)
├── 工具1:read_file(读文件)
├── 工具2:write_file(写文件)
└── 工具3:edit_file(编辑文件)
'''
@app.get("/toolkits/{toolkit_name}")
async def get_toolkit_info(toolkit_name: str):
"""获取工具包详细信息(打开工具箱看看里面有什么)"""
try: # 获取指定工具包的信息
info = toolkit_manager.get_toolkit_info(toolkit_name)
if not info: # 如果没找到,返回404
raise HTTPException(404, f"工具包不存在: {toolkit_name}")
return {
"success": True,
"toolkit": info
}
except HTTPException: # 已知错误(如404)
raise
except Exception as e: # 未知错误
logger.error(f"获取工具包信息失败: {e}")
raise HTTPException(500, f"获取失败: {e}")
'''
路径参数:{toolkit_name}
从URL中提取工具包名称,例子:/toolkits/file_edit → toolkit_name = "file_edit"
'''
@app.post("/tools/{toolkit_name}/{tool_name}")
async def call_tool(toolkit_name: str, tool_name: str, arguments: Dict[str, Any]):
"""调用工具(使用具体工具干活)"""
try: # 调用工具:指定工具包、工具名和参数
result = await toolkit_manager.call_tool(toolkit_name, tool_name, arguments)
return {
"success": True,
"toolkit": toolkit_name,
"tool": tool_name,
"result": result
}
except Exception as e:
logger.error(f"调用工具失败 {toolkit_name}.{tool_name}: {e}")
raise HTTPException(500, f"调用失败: {e}")
'''
URL结构分析:
POST /tools/{工具箱}/{工具名}
例子:POST /tools/file_edit/read_file
参数传递:
{
"path": "/home/user/test.txt"
}
完整调用示例:
POST http://localhost:8000/tools/file_edit/read_file
Content-Type: application/json
{
"path": "/home/user/test.txt"
}
'''
# ============ 文件编辑专用API ============
@app.post("/file/edit")
async def edit_file(request: Dict[str, str]):
"""编辑文件 - 使用SEARCH/REPLACE格式"""
try:
path = request.get("path") # 文件路径
diff = request.get("diff") # 差异内容(要修改的部分)
# 检查必要参数
if not path or not diff:
raise HTTPException(400, "缺少必要参数: path 和 diff")
# 调用文件编辑工具
result = await toolkit_manager.call_tool("file_edit", "edit_file", {
"path": path,
"diff": diff
})
return {
"success": True,
"result": result
}
except HTTPException:
raise
except Exception as e:
logger.error(f"编辑文件失败: {e}")
raise HTTPException(500, f"编辑失败: {e}")
'''
什么是SEARCH/REPLACE格式?
这是一种表示文件修改的方式:
{
"path": "test.py",
"diff": "<<<<<<< SEARCH\nprint('Hello')\n=======\nprint('Hello World')\n>>>>>>> REPLACE"
}
解释:
SEARCH部分:找到文件中匹配的内容
REPLACE部分:替换成新内容
就像在Word里用查找替换功能
'''
@app.post("/file/write")
async def write_file(request: Dict[str, str]):
"""写入文件(创建或覆盖文件)"""
try:
path = request.get("path") # 文件路径
content = request.get("content") # 文件内容
# 检查参数,注意content可以是空字符串,所以用content is None判断
if not path or content is None:
raise HTTPException(400, "缺少必要参数: path 和 content")
# 调用写文件工具
result = await toolkit_manager.call_tool("file_edit", "write_file", {
"path": path,
"file_text": content # 注意参数名是file_text
})
return {
"success": True,
"result": result
}
except HTTPException:
raise
except Exception as e:
logger.error(f"写入文件失败: {e}")
raise HTTPException(500, f"写入失败: {e}")
'''
注意细节:
if not path or content is None:
# 这里用content is None而不是not content
# 因为content可以是空字符串"",这也是有效的
使用场景:1.创建新文件
2.覆盖已有文件
3.清空文件内容(传入空字符串)
'''
@app.get("/file/read")
async def read_file(path: str):
"""读取文件"""
try: # 调用读文件工具
result = await toolkit_manager.call_tool("file_edit", "read_file", {
"path": path
})
return {
"success": True,
"content": result # 文件内容
}
except Exception as e:
logger.error(f"读取文件失败: {e}")
raise HTTPException(500, f"读取失败: {e}")
'''
查询参数:
path:通过URL参数传递
例子:/file/read?path=/home/user/test.txt
返回示例:
{
"success": true,
"content": "Hello World!\nThis is a test file."
}
'''
@app.get("/file/list")
async def list_files(directory: str = "."):
"""列出目录文件(查看文件夹内容)"""
try:
result = await toolkit_manager.call_tool("file_edit", "list_files", {
"directory": directory # 目录路径,默认为当前目录"."
})
'''
默认值:
directory: str = "."
# 如果不传directory参数,默认是当前目录"."
使用示例:
GET /file/list?directory=/home/user
返回示例:
{
"success": true,
"result": {
"files": ["test.txt", "photo.jpg"],
"directories": ["Documents", "Downloads"],
"total": 4
}
}
'''
return {
"success": True,
"result": result
}
except Exception as e:
logger.error(f"列出文件失败: {e}")
raise HTTPException(500, f"列出失败: {e}")
#程序启动,启动FastAPI应用
if __name__ == "__main__":
'''
if __name__ == "__main__":
这是一个魔法开关
只有当直接运行这个文件时,下面的代码才会执行
如果被别人导入,就不会执行
'''
import uvicorn
#Uvicorn:一个快速的Web服务器
#专门用来运行FastAPI应用
from agentserver.config import AGENT_SERVER_PORT
#从配置文件导入端口号
#这样可以在配置文件中修改端口,不用改代码
uvicorn.run(app, host="0.0.0.0", port=AGENT_SERVER_PORT)
#启动Web服务器
#app:我们创建的FastAPI应用
#host="0.0.0.0":监听所有网络接口(可以从任何IP访问)
#port:使用配置文件中定义的端口
'''
host参数解释:
值 | 含义 | 谁可以访问
"127.0.0.1" | 只监听本机 | 只有本机能访问
"0.0.0.0" | 监听所有IP | 其他电脑也能访问
"192.168.1.100" | 监听指定IP | 只有这个IP能访问
文件操作API汇总:
端点 | 方法 | 用途 | 参数
/file/edit | POST | 编辑文件 | path, diff
/file/write | POST | 写入/创建文件 | path, content
/file/read | GET | 读取文件 | path(查询参数)
/file/list | GET | 列出目录文件 | directory(查询参数)
与通用工具调用的区别:
方式一:使用通用工具调用
POST /tools/file_edit/read_file
{
"path": "/home/user/test.txt"
}
方式二:使用专用文件API
GET /file/read?path=/home/user/test.txt
区别:
专用API:更方便,参数直接在URL或固定格式中
通用工具调用:更灵活,可以调用任何工具包的任何工具
'''
# =========完整的API总览==========
#现在我们已经分析完所有API,让我们总结一下:
'''
1.系统状态类
/health 健康检查
/computer_control/availability 检查电脑控制能力
2.任务执行类
/schedule 统一任务调度(主要入口)
/analyze_and_execute 意图分析和执行(兼容旧版)
/computer_control/execute 直接执行电脑控制任务
3.记忆管理类
/tasks 获取任务列表
/tasks/{task_id} 获取任务状态
/tasks/{task_id}/memory 获取任务记忆
/tasks/{task_id}/steps 添加任务步骤
/memory/global 获取全局记忆
/sessions 获取所有会话
/sessions/{session_id}/memory 获取会话记忆
4. 工具管理类
/tools 列出所有工具
/toolkits 列出所有工具包
/toolkits/{toolkit_name} 获取工具包详情
/tools/{toolkit}/{tool} 调用工具
5. 文件操作类
/file/edit 编辑文件
/file/write 写入文件
/file/read 读取文件
/file/list 列出目录文件
'''
#一、文件架构
'''
naga_agent.py(主文件)
├── 导入部分(15个导入)
├── 配置部分(日志配置)
├── 核心类(Modules)
├── 生命周期管理器(lifespan)
├── FastAPI应用实例(app)
├── 7个核心功能函数
├── 25个API端点函数
└── 启动代码
'''
#二、核心类结构
#1. Modules类(全局工具箱)
'''
class Modules:
analyzer = None # 意图分析器
computer_control = None # 电脑控制器
task_scheduler = None # 任务调度器
'''
#三、核心函数(7个)
#1. 基础工具函数
#def _now_iso() -> str:
# 获取当前时间(ISO格式)
#2. 任务执行函数(3个)
'''
async def _process_computer_control_task(...):
# 执行单个电脑控制任务
async def _execute_agent_tasks_async(...):
# 异步执行多个Agent任务
async def _send_callback_notification(...):
# 发送回调通知
'''
#3. 生命周期函数(1个)
'''
@asynccontextmanager
async def lifespan(app: FastAPI):
# 程序启动和关闭管理
'''
#四、API端点(25个,按功能分组)
#第1组:系统状态(2个)
'''
GET /health # 健康检查
GET /computer_control/availability # 检查电脑控制能力
'''
#第2组:任务执行(3个)
'''
POST /schedule # 统一任务调度(主入口)
POST /analyze_and_execute # 意图分析和执行(兼容旧版)
POST /computer_control/execute # 直接执行电脑任务
'''
#第3组:任务记忆管理(6个)
'''
GET /tasks # 获取所有任务
GET /tasks/{task_id} # 获取特定任务状态
GET /tasks/{task_id}/memory # 获取任务记忆
POST /tasks/{task_id}/steps # 添加任务步骤
DELETE /tasks/{task_id}/memory # 清除任务记忆
DELETE /memory/global # 清除全局记忆
'''
#第4组:会话记忆管理(8个)
'''
GET /sessions # 获取所有会话
GET /sessions/{session_id}/memory # 获取会话记忆
GET /sessions/{session_id}/compressed_memories # 压缩记忆
GET /sessions/{session_id}/key_facts # 关键事实
GET /sessions/{session_id}/failed_attempts # 失败尝试
GET /sessions/{session_id}/tasks # 会话的所有任务
DELETE /sessions/{session_id}/memory # 清除会话记忆
'''
#第5组:工具管理(4个)
'''
GET /tools # 列出所有工具
GET /toolkits # 列出所有工具包
GET /toolkits/{toolkit_name} # 获取工具包详情
POST /tools/{toolkit}/{tool} # 调用工具
'''
#第6组:文件操作(4个)
'''
POST /file/edit # 编辑文件
POST /file/write # 写入文件
GET /file/read # 读取文件
GET /file/list # 列出目录
'''
#五、数据结构层级
#1. 最高层:程序
'''
NagaAgent服务
↓
会话(Session)
↓
任务(Task)
↓
步骤(Step)
'''
#2. 会话(Session)结构
'''
会话 = {
"session_id": "用户123", # 会话ID
"tasks": [任务1, 任务2, ...], # 包含的任务
"memory": 记忆数据, # 会话记忆
"key_facts": 关键事实列表 # 重要信息
}
'''
#3. 任务(Task)结构
'''
任务 = {
"task_id": "任务001", # 任务ID
"purpose": "打开记事本", # 任务目的
"status": "running", # 任务状态
"steps": [步骤1, 步骤2, ...], # 任务步骤
"memory": 任务记忆 # 任务记忆
}
'''
#4. 步骤(Step)结构
'''
步骤 = {
"step_id": "步骤001", # 步骤ID
"purpose": "打开记事本", # 步骤目的
"content": "调用记事本程序", # 步骤内容
"output": "记事本已打开", # 步骤输出
"success": True, # 是否成功
"error": None # 错误信息
}
'''
#六、执行流程
#完整任务执行流程:
#用户请求 → /schedule端点 → 创建任务 → 异步执行 → 处理每个Agent任务 → 调用电脑控制器 → 记录步骤 → 发送回调 → 更新记忆
#1. 程序元信息
'''
程序元信息
├── 解释器声明:#!/usr/bin/env python3
└── 编码声明:# -*- coding: utf-8 -*-
'''
#2. 导入模块系统
'''
导入模块系统
├── Python标准库(5个)
│ ├── asyncio # 异步处理
│ ├── uuid # 生成唯一ID
│ ├── logging # 日志记录
│ ├── typing # 类型注解
│ └── datetime # 时间处理
│
├── Web框架(2个)
│ ├── fastapi.FastAPI # Web应用框架
│ └── fastapi.HTTPException # HTTP异常
│
├── 辅助工具(2个)
│ ├── fastapi.responses.JSONResponse # JSON响应
│ └── contextlib.asynccontextmanager # 异步上下文管理器
│
└── 自定义模块(6个)
├── system.config # 配置文件
├── system.background_analyzer # 意图分析器
├── agentserver.agent_computer_control # 电脑控制智能体
├── agentserver.task_scheduler # 任务调度器
├── agentserver.toolkit_manager # 工具管理器
└── agentserver.config # 服务器配置
'''
#3. 日志配置系统
'''
日志配置系统
├── 创建日志器:logging.getLogger(__name__)
├── 设置日志级别:logger.setLevel(logging.INFO)
└── 配置处理器(如果没有的话)
├── 创建控制台处理器:logging.StreamHandler()
├── 设置日志格式:时间-名称-级别-消息
└── 添加处理器:logger.addHandler(handler)
'''
#4. 生命周期管理器
'''
生命周期管理器(lifespan函数)
├── 启动阶段(startup)
│ ├── 初始化意图分析器:get_background_analyzer()
│ ├── 初始化电脑控制智能体:ComputerControlAgent()
│ ├── 初始化任务调度器:get_task_scheduler()
│ └── 设置LLM配置(如果存在)
│
├── 运行阶段(yield)
└── 关闭阶段(shutdown)
└── 记录关闭日志
'''
#5. 全局模块管理器
'''
全局模块管理器(Modules类)
├── 意图分析器:analyzer = None
├── 电脑控制器:computer_control = None
└── 任务调度器:task_scheduler = None
'''
# 6.内部工具函数
'''
内部工具函数
└── 时间工具
└── def _now_iso() -> str
├── 功能:获取当前ISO格式时间
└── 返回:YYYY-MM-DDTHH:MM:SS格式字符串
'''
#7. 核心处理函数
'''
核心处理函数
├── 单个任务处理器
│ └── async def _process_computer_control_task(...)
│ ├── 功能:处理单个电脑控制任务
│ ├── 输入:指令和会话ID
│ ├── 流程:
│ │ ├── 记录开始日志
│ │ ├── 调用电脑控制器执行
│ │ ├── 记录完成日志
│ │ └── 返回结果
│ └── 输出:成功/失败结果
│
├── 批量任务执行器
│ └── async def _execute_agent_tasks_async(...)
│ ├── 功能:异步执行多个Agent任务
│ ├── 输入:任务列表、会话信息等
│ ├── 流程:
│ │ ├── 循环处理每个任务
│ │ ├── 为每个任务添加步骤记录
│ │ ├── 执行电脑控制任务
│ │ ├── 更新步骤结果
│ │ └── 发送回调通知
│ └── 输出:无(异步执行)
│
└── 回调通知器
└── async def _send_callback_notification(...)
├── 功能:发送任务完成回调通知
├── 输入:回调URL、任务结果等
├── 流程:
│ ├── 构建回调数据
│ ├── 发送HTTP POST请求
│ └── 处理响应
└── 输出:无
'''
#8. API端点系统(25个端点)
#8.1 系统状态检查
'''
系统状态检查
├── GET /health
│ ├── 功能:健康检查
│ └── 返回:服务状态和各模块就绪情况
│
└── GET /computer_control/availability
├── 功能:获取电脑控制可用性
└── 返回:电脑控制能力和状态
'''
#8.2 任务执行端点
'''
任务执行端点
├── POST /schedule
│ ├── 功能:统一任务调度(主入口)
│ ├── 输入:查询、任务列表、会话信息等
│ ├── 流程:
│ │ ├── 检查调度器就绪
│ │ ├── 创建任务记录
│ │ ├── 异步执行任务
│ │ └── 立即返回接受响应
│ └── 返回:任务ID和调度状态
│
├── POST /analyze_and_execute
│ ├── 功能:意图分析和执行(兼容旧版)
│ ├── 输入:消息列表和会话ID
│ ├── 流程:
│ │ ├── 从消息中提取任务指令
│ │ ├── 同步执行每个任务
│ │ └── 等待所有任务完成
│ └── 返回:执行结果列表
│
└── POST /computer_control/execute
├── 功能:直接执行电脑控制任务
├── 输入:指令字符串
└── 返回:执行结果
'''
#8.3 任务记忆管理
'''
任务记忆管理
├── 任务级别(Task Level)
│ ├── GET /tasks
│ │ ├── 功能:获取所有任务列表
│ │ └── 返回:正在运行的任务列表
│ │
│ ├── GET /tasks/{task_id}
│ │ ├── 功能:获取指定任务状态
│ │ └── 返回:任务详细状态
│ │
│ ├── GET /tasks/{task_id}/memory
│ │ ├── 功能:获取任务记忆摘要
│ │ └── 返回:任务记忆信息
│ │
│ ├── POST /tasks/{task_id}/steps
│ │ ├── 功能:添加任务步骤
│ │ └── 返回:步骤添加结果
│ │
│ └── DELETE /tasks/{task_id}/memory
│ ├── 功能:清除任务记忆
│ └── 返回:清除结果
│
├── 会话级别(Session Level)
│ ├── GET /sessions
│ │ ├── 功能:获取所有会话摘要
│ │ └── 返回:会话列表和总数
│ │
│ ├── GET /sessions/{session_id}/memory
│ │ ├── 功能:获取会话记忆摘要
│ │ └── 返回:会话记忆信息
│ │
│ ├── GET /sessions/{session_id}/compressed_memories
│ │ ├── 功能:获取会话压缩记忆
│ │ └── 返回:压缩记忆列表
│ │
│ ├── GET /sessions/{session_id}/key_facts
│ │ ├── 功能:获取会话关键事实
│ │ └── 返回:关键事实列表
│ │
│ ├── GET /sessions/{session_id}/failed_attempts
│ │ ├── 功能:获取会话失败尝试
│ │ └── 返回:失败记录列表
│ │
│ ├── GET /sessions/{session_id}/tasks
│ │ ├── 功能:获取会话的所有任务
│ │ └── 返回:任务列表
│ │
│ └── DELETE /sessions/{session_id}/memory
│ ├── 功能:清除指定会话记忆
│ └── 返回:清除结果
│
└── 全局级别(Global Level)
├── GET /memory/global
│ ├── 功能:获取全局记忆摘要
│ └── 返回:全局记忆和失败尝试摘要
│
└── DELETE /memory/global
├── 功能:清除全局记忆
└── 返回:清除结果
'''
#8.4 工具管理系统
'''
工具管理系统
├── GET /tools
│ ├── 功能:列出所有可用工具
│ └── 返回:工具列表和总数
│
├── GET /toolkits
│ ├── 功能:列出所有可用工具包
│ └── 返回:工具包列表和总数
│
├── GET /toolkits/{toolkit_name}
│ ├── 功能:获取工具包详细信息
│ └── 返回:工具包详情
│
└── POST /tools/{toolkit_name}/{tool_name}
├── 功能:调用指定工具
└── 返回:工具执行结果
'''
#8.5 文件操作系统
'''
文件操作系统
├── POST /file/edit
│ ├── 功能:编辑文件(SEARCH/REPLACE格式)
│ └── 返回:编辑结果
│
├── POST /file/write
│ ├── 功能:写入文件
│ └── 返回:写入结果
│
├── GET /file/read
│ ├── 功能:读取文件
│ └── 返回:文件内容
│
└── GET /file/list
├── 功能:列出目录文件
└── 返回:文件和目录列表
'''
#9. 程序启动入口
'''
程序启动入口
├── 条件判断:if __name__ == "__main__"
├── 导入uvicorn服务器
├── 导入端口配置
└── 启动Web服务器
├── 服务器:uvicorn
├── 应用:app
├── 主机:0.0.0.0(所有网络接口)
└── 端口:从配置读取
'''
#三、数据流层级结构
#请求处理流程
'''
外部请求
↓
FastAPI路由
↓
API端点函数
├── 参数验证
├── 权限检查
├── 调用内部函数
└── 返回响应
↓
内部处理函数
├── 调用全局模块
├── 执行具体操作
└── 记录日志和记忆
↓
全局模块
├── 意图分析器(理解用户意图)
├── 电脑控制器(执行电脑操作)
└── 任务调度器(管理任务流程)
↓
底层系统
├── 文件系统(读写文件)
├── 操作系统(控制电脑)
└── 网络系统(发送回调)
'''
#记忆系统层级
'''
记忆系统层级
├── 步骤级别(Step Level)
│ ├── 单个操作记录
│ ├── 执行结果
│ └── 错误信息
│
├── 任务级别(Task Level)
│ ├── 任务目的
│ ├── 所有步骤记录
│ ├── 任务状态
│ └── 任务结果
│
├── 会话级别(Session Level)
│ ├── 用户会话上下文
│ ├── 所有任务记录
│ ├── 压缩记忆(精简版)
│ ├── 关键事实(重要信息)
│ └── 失败尝试记录
│
└── 全局级别(Global Level)
├── 所有会话摘要
├── 系统总体统计
└── 全局失败记录
'''
#四、错误处理层级
'''
错误处理结构
├── HTTP异常(HTTPException)
│ ├── 400 Bad Request(客户端错误)
│ ├── 404 Not Found(资源不存在)
│ ├── 503 Service Unavailable(服务不可用)
│ └── 500 Internal Server Error(服务器内部错误)
│
├── 通用异常处理
│ ├── try-except块
│ ├── 记录错误日志
│ └── 返回统一错误格式
│
└── 特定异常处理
├── 已知错误:直接抛出HTTPException
└── 未知错误:捕获并转换为500错误
'''
#五、配置层级结构
'''
配置系统
├── 日志配置
│ ├── 日志级别:INFO
│ ├── 输出格式
│ └── 输出目标
│
├── 服务器配置
│ └── 端口号:从agentserver.config导入
│
├── AI模型配置
│ ├── 模型名称
│ ├── API密钥
│ └── 基础URL
│
└── 模块配置
├── 意图分析器配置
├── 电脑控制器配置
└── 任务调度器配置
'''
agent_manager.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Agent管理器 - 独立的Agent注册和调用系统
支持从配置文件动态加载Agent定义,提供统一的调用接口
第1行:告诉电脑"这个文件要用python3来运行"
第2行:告诉电脑"这个文件里的文字使用UTF-8编码"(这样才能正确显示中文)
"""
import os # 用来操作文件和文件夹
import json # 用来处理JSON格式的数据
import asyncio # 用来处理"异步"操作(可以同时做多件事)
import logging # 用来记录程序运行时的信息
import time # 用来处理时间相关的操作
from pathlib import Path # 更好的处理文件路径
from typing import Dict, Any, Optional, List # 给代码添加类型提示,让代码更清晰
from dataclasses import dataclass, field # 创建数据类,简化代码
from datetime import datetime, timedelta # 处理日期和时间
import re # 处理文本匹配(正则表达式)
#这些就像工具箱里的不同工具,每个工具都有专门的用途。
# 配置日志
logging.basicConfig(level=logging.INFO) # 设置记录"INFO"级别及以上的信息
logger = logging.getLogger("AgentManager") # 创建一个叫"AgentManager"的日志记录器
# 屏蔽HTTP库的DEBUG日志
logging.getLogger("httpcore.http11").setLevel(logging.WARNING)
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore.connection").setLevel(logging.WARNING)
'''
这是什么意思?
日志就像是程序的"日记本"
这里设置只记录重要程度为"INFO"及以上的信息
后面的三行是告诉程序:"不要记录HTTP网络请求的详细信息"
'''
'''
@dataclass是一个"装饰器",它告诉Python:"这个类是用来存储数据的"
加了@dataclass后,这个类会自动生成一些有用的方法
让你不需要写很多重复的代码
dataclass 自动生成的功能:
自动生成__init__()方法(初始化方法)
自动生成__repr__()方法(漂亮的打印格式)
自动生成比较方法(比如==比较)
'''
@dataclass
class AgentConfig:
"""Agent配置类"""
id: str # 模型ID
name: str # Agent名称(中文名)
base_name: str # 基础名称(英文)
system_prompt: str # 系统提示词
max_output_tokens: int = 40000 # 最大输出token数
temperature: float = 0.7 # 温度参数
description: str = "" # 描述信息
model_provider: str = "openai" # 模型提供商
api_base_url: str = "" # API基础URL
api_key: str = "" # API密钥
@dataclass
class AgentSession:
"""Agent会话类"""
timestamp: float = field(default_factory=time.time)
#timestamp: float 表示这个字段是浮点数(带小数点的数字),float是Python中的浮点数类型
'''默认值设置:
= field(default_factory=time.time) 这是特殊的设置
field()是dataclasses模块的函数
default_factory=time.time意思是:"如果不指定值,就调用time.time()函数来获取当前时间"
为什么用 default_factory?
# 如果用普通的默认值:
timestamp: float = time.time() # 错误!这样会在导入时固定时间
# 用 default_factory:
timestamp: float = field(default_factory=time.time) # 正确!每次创建对象时才调用
'''
history: List[Dict[str, str]] = field(default_factory=list)
'''
类型注解解释:
List[Dict[str, str]]
├── List: 列表类型
└── Dict[str, str]: 字典类型,键和值都是字符串
这是什么意思?
这是一个列表
列表里的每个元素都是一个字典
字典有两个字符串键值对
'''
session_id: str = "default_user_session" #类型是str(字符串),默认值是"default_user_session"
class AgentManager:
"""Agent管理器 - 增强版,支持任务规划和多智能体协作"""
def __init__(self, config_dir: str = None):
"""
初始化Agent管理器
Args:
config_dir: Agent配置文件目录(可选,MCP架构下通常不需要)
"""
self.config_dir = Path(config_dir) if config_dir else None
self.agents: Dict[str, AgentConfig] = {}
self.agent_sessions: Dict[str, Dict[str, AgentSession]] = {}
'''
初始化了什么?
config_dir: 配置文件的位置(可能没有)
agents: 空的字典,用来存放所有助手的配置信息
agent_sessions: 空的字典,用来存放所有助手的对话记录
重要:
AgentSession在后面会定义,用来记录对话历史
这是三层嵌套的字典:
第一层:助手名字 → 第二层字典
第二层:对话ID → 对话记录
第三层:具体的对话记录
'''
# 任务规划相关 - 现在通过agentserver的task_scheduler处理
# self.task_planner = None # 删除重复功能
# self.task_executor = None # 删除重复功能
# 从配置文件读取最大历史轮数
try:
from system.config import config, AI_NAME
self.max_history_rounds = config.api.max_history_rounds
except ImportError:
self.max_history_rounds = 10 # 默认值
logger.warning("无法导入配置,使用默认历史轮数设置")
'''
这是什么意思?
尝试从另一个配置文件读取设置
如果找不到那个文件,就用默认值(10轮对话历史)
max_history_rounds:最多保留多少轮对话记录
'''
self.context_ttl_hours = 24 # 上下文TTL(小时) #context_ttl_hours = 24:对话记录保存24小时
self.debug_mode = True # debug_mode = True:开启调试模式(会记录更多信息)
# 条件加载配置
# 只在指定了config_dir时才创建目录和加载配置
if self.config_dir:
# 确保配置目录存在
self.config_dir.mkdir(exist_ok=True)
# 加载Agent配置
self._load_agent_configs()
else:
logger.info("AgentManager使用MCP架构,跳过外部配置文件加载")
'''
逻辑判断:
如果提供了配置文件目录:创建目录并加载配置
否则:跳过加载,使用MCP架构(另一种管理方式)
'''
# 启动定期清理任务(只在事件循环中启动)
try:
loop = asyncio.get_running_loop()
asyncio.create_task(self._periodic_cleanup())
except RuntimeError:
# 没有运行的事件循环,跳过定期清理任务
pass
'''
这是什么?
启动一个"自动清理工"
这个清理工会每小时检查一次,删除超过24小时的旧对话记录
try...except:尝试启动,失败了也没关系
'''
# 任务规划器现在通过agentserver的task_scheduler处理
# self._init_task_planner() # 删除重复功能
logger.info(f"AgentManager初始化完成,已加载 {len(self.agents)} 个Agent")
def _load_agent_configs(self): #从JSON配置文件中加载所有助手的设置
"""从配置文件加载Agent定义"""
# 检查config_dir是否存在
if not self.config_dir:
logger.info("未指定配置文件目录,跳过Agent配置加载")
return
# 扫描配置文件目录
config_files = list(self.config_dir.glob("*.json")) #glob("*.json"):找出这个文件夹里所有.json文件
for config_file in config_files: #打开一个JSON文件
try:
with open(config_file, 'r', encoding='utf-8') as f:
config_data = json.load(f) #打开一个JSON文件,config_data里就保存了文件的所有内容
# 解析Agent配置
for agent_key, agent_data in config_data.items():
if self._validate_agent_config(agent_data):
agent_config = AgentConfig(
id=agent_data.get('model_id', ''),
name=agent_data.get('name', agent_key),
base_name=agent_data.get('base_name', agent_key),
system_prompt=agent_data.get('system_prompt', f'You are a helpful AI assistant named {agent_data.get("name", agent_key)}.'),
max_output_tokens=agent_data.get('max_output_tokens', 40000),
temperature=agent_data.get('temperature', 0.7),
description=agent_data.get('description', f'Assistant {agent_data.get("name", agent_key)}.'),
model_provider=agent_data.get('model_provider', 'openai'),
api_base_url=agent_data.get('api_base_url', ''),
api_key=agent_data.get('api_key', '')
)
self.agents[agent_key] = agent_config
'''
这个过程:
检查配置是否有效(_validate_agent_config)
用配置数据创建AgentConfig对象
把这个对象存到self.agents字典里
.get()方法的好处:
agent_data.get('name', agent_key)意思是:
如果配置里有name字段,就用它
如果没有,就用agent_key(键名)代替
'''
logger.info(f"已加载Agent: {agent_key} ({agent_config.name})")
except Exception as e:
logger.error(f"加载配置文件 {config_file} 失败: {e}")
#验证配置的方法
def _validate_agent_config(self, config: Dict[str, Any]) -> bool:
"""验证Agent配置"""
required_fields = ['model_id', 'name']#必须要有model_id和name这两个字段,如果缺少任何一个,就返回False(不通过)
for field in required_fields:
if field not in config or not config[field]:
logger.warning(f"Agent配置缺少必需字段: {field}")
return False
return True
def get_agent_session_history(self, agent_name: str, session_id: str = 'default_user_session') -> List[Dict[str, str]]:
"""获取Agent会话历史"""
if agent_name not in self.agent_sessions:
self.agent_sessions[agent_name] = {}
agent_sessions = self.agent_sessions[agent_name]
if session_id not in agent_sessions or self._is_context_expired(agent_sessions[session_id].timestamp):
agent_sessions[session_id] = AgentSession(session_id=session_id)
return agent_sessions[session_id].history
'''
这个方法做什么?
获取某个助手、某个对话的历史记录。
逻辑流程:
如果这个助手没有对话记录 → 创建一个空的记录字典
如果对话记录不存在或已过期 → 创建新的对话记录
返回对话历史
'''
def update_agent_session_history(self, agent_name: str, user_message: str, assistant_message: str, session_id: str = 'default_user_session'):
"""更新Agent会话历史,把新的一轮对话添加到历史记录中。"""
if agent_name not in self.agent_sessions:
self.agent_sessions[agent_name] = {}
agent_sessions = self.agent_sessions[agent_name]
if session_id not in agent_sessions or self._is_context_expired(agent_sessions[session_id].timestamp):
agent_sessions[session_id] = AgentSession(session_id=session_id)
session_data = agent_sessions[session_id]
session_data.history.extend([ #每次对话包含两条记录:
{"role": "user", "content": user_message}, #用户说的话
{"role": "assistant", "content": assistant_message} #助手说的话
])
session_data.timestamp = time.time()
# 限制历史消息数量
max_messages = self.max_history_rounds * 2
if len(session_data.history) > max_messages:
session_data.history = session_data.history[-max_messages:] #[-max_messages:]:取最后max_messages条记录
def _is_context_expired(self, timestamp: float) -> bool:
"""检查上下文是否过期"""
return (time.time() - timestamp) > (self.context_ttl_hours * 3600)
'''
计算逻辑:
time.time():现在的时间(秒数)
timestamp:上次对话的时间(秒数)
两者相减得到过了多少秒
如果超过24小时(24×3600秒)→ 返回True(已过期)
'''
async def _periodic_cleanup(self):
"""定期清理过期的会话上下文"""
while True:
try:
await asyncio.sleep(3600) # 每小时清理一次
if self.debug_mode:
logger.debug("执行定期上下文清理...")
'''
这是一个无限循环:
等待1小时(3600秒)
执行清理
回到第1步
'''
for agent_name, sessions in list(self.agent_sessions.items()):#list()的作用:创建列表副本,避免在循环中修改字典出错
for session_id, session_data in list(sessions.items()):
if self._is_context_expired(session_data.timestamp):
sessions.pop(session_id, None)
if self.debug_mode:
logger.debug(f"清理过期上下文: {agent_name}, session {session_id}")
if not sessions:
self.agent_sessions.pop(agent_name, None)
except Exception as e:
logger.error(f"定期清理任务出错: {e}")
def _replace_placeholders(self, text: str, agent_config: AgentConfig) -> str:
"""替换提示词中的占位符,支持Agent配置和环境变量"""
if not text:
return ""
processed_text = str(text)#把文本中的"模板变量"替换成真实的值,把{{AgentName}} 替换成 "小助手",把{{CurrentTime}} 替换成 "14:30:00"
# Agent配置相关的占位符替换
if agent_config:
# 基础Agent信息
processed_text = processed_text.replace("{{AgentName}}", agent_config.name)
processed_text = processed_text.replace("{{MaidName}}", agent_config.name)
processed_text = processed_text.replace("{{BaseName}}", agent_config.base_name)
processed_text = processed_text.replace("{{Description}}", agent_config.description)
processed_text = processed_text.replace("{{ModelId}}", agent_config.id)
# 配置参数
processed_text = processed_text.replace("{{Temperature}}", str(agent_config.temperature))
processed_text = processed_text.replace("{{MaxTokens}}", str(agent_config.max_output_tokens))
processed_text = processed_text.replace("{{ModelProvider}}", agent_config.model_provider)
'''
可以替换的占位符:
{{AgentName}} / {{MaidName}} → 助手的中文名
{{BaseName}} → 助手的英文名
{{Description}} → 助手的描述
{{ModelId}} → 模型ID
{{Temperature}} → 温度参数
{{MaxTokens}} → 最大输出字数
{{ModelProvider}} → 模型提供商
'''
# 环境变量占位符替换
import os
import re
# 匹配 {{ENV_VAR_NAME}} 格式的环境变量,环境变量是电脑系统里的设置,比如API_KEY="sk-abc123",{{API_KEY}}会被替换成"sk-abc123"
env_pattern = r'\{\{([A-Z_][A-Z0-9_]*)\}\}' #r'\{\{([A-Z_][A-Z0-9_]*)\}\}' 匹配 {{大写字母_数字}},比如:{{API_KEY}}、{{DATABASE_URL}}
for match in re.finditer(env_pattern, processed_text):
env_var_name = match.group(1)
env_value = os.getenv(env_var_name, '')
processed_text = processed_text.replace(f"{{{{{env_var_name}}}}}", env_value)
# 时间相关占位符
from datetime import datetime
now = datetime.now()
processed_text = processed_text.replace("{{CurrentTime}}", now.strftime("%H:%M:%S"))
processed_text = processed_text.replace("{{CurrentDate}}", now.strftime("%Y-%m-%d"))
processed_text = processed_text.replace("{{CurrentDateTime}}", now.strftime("%Y-%m-%d %H:%M:%S"))
'''
时间格式:
{{CurrentTime}} → "14:30:00"(小时:分钟:秒)
{{CurrentDate}} → "2024-01-01"(年-月-日)
{{CurrentDateTime}} → "2024-01-01 14:30:00"(完整时间)
'''
return processed_text
def _build_system_message(self, agent_config: AgentConfig) -> Dict[str, str]:
"""构建系统消息,包含Agent的身份、行为、风格等"""
# 处理系统提示词中的占位符
processed_system_prompt = self._replace_placeholders(agent_config.system_prompt, agent_config)
return {
"role": "system",
"content": processed_system_prompt
}
'''
这是什么?系统消息告诉AI助手"你是谁",比如:
{
"role": "system",
"content": "你是一个帮助人的AI助手,名字叫小助手。"
}
'''
def _build_user_message(self, prompt: str, agent_config: AgentConfig) -> Dict[str, str]:
"""构建用户消息,处理用户输入"""
# 处理用户提示词中的占位符
processed_prompt = self._replace_placeholders(prompt, agent_config)
return {
"role": "user",
"content": processed_prompt
}
'''
用户消息的例子:
{
"role": "user",
"content": "你好,现在时间是{{CurrentTime}}"
}
'''
def _build_assistant_message(self, content: str) -> Dict[str, str]:
"""构建助手消息"""
return {
"role": "assistant",
"content": content
}
'''
助手消息的例子:
{
"role": "assistant",
"content": "你好!现在是14:30:00。"
}
'''
def _validate_messages(self, messages: List[Dict[str, str]]) -> bool:
"""验证消息序列的有效性"""
if not messages: #检查消息是否为空
return False
# 检查消息格式
for msg in messages:
if not isinstance(msg, dict):
return False
if 'role' not in msg or 'content' not in msg:
return False
if msg['role'] not in ['system', 'user', 'assistant']:
return False
if not isinstance(msg['content'], str):
return False
# 检查系统消息是否在开头
if messages[0]['role'] != 'system':
return False
return True
'''
为什么需要系统消息在第一?
这是OpenAI API的要求
系统消息最先告诉AI"你的角色是什么"
'''
#call_agent 方法,这是整个文件最重要的方法,用来调用AI助手。
async def call_agent(self, agent_name: str, prompt: str, session_id: str = None) -> Dict[str, Any]:
"""
调用指定的Agent
Args:
agent_name: Agent名称
prompt: 用户提示词
session_id: 会话ID
Returns:
Dict[str, Any]: 调用结果
"""
# 检查Agent是否存在
if agent_name not in self.agents:
available_agents = list(self.agents.keys())
error_msg = f"请求的Agent '{agent_name}' 未找到或未正确配置。" #如果要找的助手不存在 → 返回错误信息
if available_agents: #错误信息里会告诉你哪些助手可用
error_msg += f" 当前已加载的Agent有: {', '.join(available_agents)}。"
else:
error_msg += " 当前没有加载任何Agent。请检查配置文件。"
error_msg += " 请确认您请求的Agent名称是否准确。"
logger.error(f"Agent调用失败: {error_msg}")
return {"status": "error", "error": error_msg}
agent_config = self.agents[agent_name]
# 生成会话ID
if not session_id:
session_id = f"agent_{agent_config.base_name}_default_user_session"
try: #构建完整的消息序列
# 获取会话历史
history = self.get_agent_session_history(agent_name, session_id)
# 构建完整的消息序列
messages = []
# 1. 系统消息:设定Agent的身份、行为、风格等
system_message = self._build_system_message(agent_config)
messages.append(system_message)
# 2. 历史消息:保留多轮对话的上下文
messages.extend(history)
# 3. 当前用户输入:本次要处理的任务内容
user_message = self._build_user_message(prompt, agent_config)
messages.append(user_message)
'''
消息序列的组成:
[
{系统消息: "你是助手A"}, # 第一条必须是系统消息
{用户: "你好"}, # 历史对话的第一轮
{助手: "你好!"}, # 历史对话的第一轮
{用户: "今天天气怎么样"}, # 历史对话的第二轮
{助手: "今天晴天"}, # 历史对话的第二轮
{用户: "现在是什么时间"} # 当前的新问题
]
'''
# 验证消息序列
if not self._validate_messages(messages):
return {"status": "error", "error": "消息序列格式无效"}
# 记录调试信息
if self.debug_mode:
logger.debug(f"Agent调用消息序列:")
for i, msg in enumerate(messages):
logger.debug(f" [{i}] {msg['role']}: {msg['content'][:100]}...")
# 调用LLM API
response = await self._call_llm_api(agent_config, messages)
if response.get("status") == "success":
assistant_response = response.get("result", "")
# 更新会话历史
self.update_agent_session_history(
agent_name, user_message['content'], assistant_response, session_id
)
return {"status": "success", "result": assistant_response}
else:
return response
'''
处理流程:
调用AI模型API
如果成功 → 保存对话历史 → 返回结果
如果失败 → 直接返回错误
'''
except Exception as e:
error_msg = f"调用Agent '{agent_name}' 时发生错误: {str(e)}"
logger.error(f"Agent调用异常: {error_msg}")
return {"status": "error", "error": error_msg}
# 调用LLM API的核心方法
async def _call_llm_api(self, agent_config: AgentConfig, messages: List[Dict[str, str]]) -> Dict[str, Any]:
"""调用LLM API,使用Agent配置中的参数"""
try:
# 使用新版本的OpenAI API
from openai import AsyncOpenAI
# 记录调试信息
if self.debug_mode:
logger.debug(f"调用LLM API - Agent: {agent_config.name}")
logger.debug(f" 模型: {agent_config.id}")
logger.debug(f" 提供商: {agent_config.model_provider}")
logger.debug(f" API URL: {agent_config.api_base_url}")
logger.debug(f" 温度: {agent_config.temperature}")
logger.debug(f" 最大Token: {agent_config.max_output_tokens}")
logger.debug(f" 消息数量: {len(messages)}")
# 验证必要的配置参数
if not agent_config.id:
return {"status": "error", "error": "Agent配置缺少模型ID"}
if not agent_config.api_key:
return {"status": "error", "error": "Agent配置缺少API密钥"}
# 创建客户端,使用Agent配置中的参数
client = AsyncOpenAI(
api_key=agent_config.api_key, #api_key: 访问API的密码
#base_url: API的网址,如果没配置,默认用DeepSeek的API
base_url=agent_config.api_base_url or "https://api.deepseek.com/v1"
)
# 准备API调用参数
api_params = {
"model": agent_config.id, #model: 使用哪个AI模型
"messages": messages, #messages: 对话消息列表
"max_tokens": agent_config.max_output_tokens, #max_tokens: 最多生成多少字
"temperature": agent_config.temperature, #temperature: 创造力参数(0.7是中等创造力)
"stream": False #stream: False: 不要流式传输(一次性返回结果)
}
# 记录API调用参数(调试模式)
if self.debug_mode:
logger.debug(f"API调用参数: {api_params}")
# 调用API
response = await client.chat.completions.create(**api_params)
# 提取响应内容
assistant_content = response.choices[0].message.content
#response.choices[0].message.content 就是AI的回答
# 记录响应信息(调试模式)
if self.debug_mode:
usage = response.usage
logger.debug(f"API响应成功:")
logger.debug(f" 使用Token: {usage.prompt_tokens} (输入) + {usage.completion_tokens} (输出) = {usage.total_tokens} (总计)")
logger.debug(f" 响应长度: {len(assistant_content)} 字符")
return {"status": "success", "result": assistant_content}
except Exception as e:
error_msg = f"LLM API调用失败: {str(e)}"
logger.error(f"Agent '{agent_config.name}' API调用失败: {error_msg}")
# 记录详细的错误信息(调试模式)
if self.debug_mode:
import traceback
logger.debug(f"详细错误信息:")
logger.debug(traceback.format_exc())
return {"status": "error", "error": error_msg}
#其他功能方法
def get_available_agents(self) -> List[Dict[str, Any]]:
"""获取所有可用的Agent列表"""
return [
{
"name": agent_config.name,
"base_name": agent_config.base_name,
"description": agent_config.description,
"model_id": agent_config.id,
"temperature": agent_config.temperature,
"max_output_tokens": agent_config.max_output_tokens
}
for agent_config in self.agents.values()
]
'''
返回格式:
[
{
"name": "助手A",
"base_name": "assistant_a",
"description": "这是一个助手",
"model_id": "gpt-4",
"temperature": 0.7,
"max_output_tokens": 40000
}
]
'''
def get_agent_info(self, agent_name: str) -> Optional[Dict[str, Any]]:
"""获取指定Agent的详细信息"""
if agent_name not in self.agents:
return None
agent_config = self.agents[agent_name]
return {
"name": agent_config.name,
"base_name": agent_config.base_name,
"description": agent_config.description,
"model_id": agent_config.id,
"temperature": agent_config.temperature,
"max_output_tokens": agent_config.max_output_tokens,
"system_prompt": agent_config.system_prompt,
"model_provider": agent_config.model_provider
}
def reload_configs(self):
"""重新加载Agent配置,修改配置文件后,不用重启程序,调用这个方法就能重新加载。"""
self.agents.clear()
self._load_agent_configs()
logger.info("Agent配置已重新加载")
def _register_agent_from_manifest(self, agent_name: str, agent_config: Dict[str, Any]):
"""从manifest注册Agent
Args:
agent_name: Agent名称
agent_config: Agent配置字典
"""
try:
# 验证配置
if not self._validate_agent_config(agent_config):
logger.warning(f"Agent配置验证失败: {agent_name}")
return False
# 创建AgentConfig对象
agent_config_obj = AgentConfig(
id=agent_config.get('model_id', ''),
name=agent_config.get('name', agent_name),
base_name=agent_config.get('base_name', agent_name),
system_prompt=agent_config.get('system_prompt', f'You are a helpful AI assistant named {agent_config.get("name", agent_name)}.'),
max_output_tokens=agent_config.get('max_output_tokens', 8192),
temperature=agent_config.get('temperature', 0.7),
description=agent_config.get('description', f'Assistant {agent_config.get("name", agent_name)}.'),
model_provider=agent_config.get('model_provider', 'openai'),
api_base_url=agent_config.get('api_base_url', ''),
api_key=agent_config.get('api_key', '')
)
# 注册到agents字典
self.agents[agent_name] = agent_config_obj
logger.info(f"已从manifest注册Agent: {agent_name} ({agent_config_obj.name})")
return True
except Exception as e:
logger.error(f"从manifest注册Agent失败 {agent_name}: {e}")
return False
async def call_agent_by_action(self, agent_name: str, action_args: Dict[str, Any]) -> str:
"""根据动作调用Agent,不直接给提示词,而是给"动作",比如:
Args:
agent_name: Agent名称
action_args: 动作参数,包含action和具体参数
Returns:
str: Agent执行结果
"""
try:
# 检查Agent是否存在
if agent_name not in self.agents:
return f"Agent '{agent_name}' 未找到或未正确配置"
agent_config = self.agents[agent_name]
action = action_args.get('action', '')
# 构建用户提示词
user_prompt = self._build_action_prompt(action, action_args)
# 调用Agent
result = await self.call_agent(agent_name, user_prompt)
if result.get("status") == "success":
return result.get("result", "")
else:
return f"Agent调用失败: {result.get('error', '未知错误')}"
except Exception as e:
logger.error(f"Agent动作调用失败 {agent_name}: {e}")
return f"Agent动作调用异常: {str(e)}"
def _build_action_prompt(self, action: str, action_args: Dict[str, Any]) -> str:
"""构建动作提示词
Args:
action: 动作名称
action_args: 动作参数
Returns:
str: 构建的提示词
"""
# 移除不需要的参数
clean_args = {k: v for k, v in action_args.items()
if k not in ['service_name', 'action']}
# 构建提示词
if clean_args:
args_str = ", ".join([f"{k}: {v}" for k, v in clean_args.items()])
return f"请执行动作 '{action}',参数: {args_str}"
else:
return f"请执行动作 '{action}'"
# 删除重复的任务规划器初始化方法
# 现在通过agentserver的task_scheduler处理任务规划
async def process_intelligent_task(self, query: str, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""
智能任务处理 - 核心方法,通过agentserver的task_scheduler处理
处理更复杂的任务,可以分解成多个步骤。
Args:
query: 用户查询
context: 上下文信息
Returns:
Dict[str, Any]: 处理结果
"""
try:
# 通过agentserver的task_scheduler处理智能任务
from agentserver.task_scheduler import get_task_scheduler
task_scheduler = get_task_scheduler()
# 创建任务并调度
import uuid #用来标识这个任务
task_id = str(uuid.uuid4())
# 注册任务到调度器
task_scheduler.task_registry[task_id] = {
"id": task_id,
"type": "processor",
"status": "queued",
"params": {"query": query},
"context": context
}
# 调度并行执行,把任务交给"任务调度器"去执行,可以并行处理多个任务。
tasks = [{
"type": "processor",
"params": {"query": query},
"session_id": context.get("session_id") if context else None
}]
results = await task_scheduler.schedule_parallel_execution(tasks)
if results and len(results) > 0:
result = results[0]
return {
"status": "success" if result.get("success") else "error",
"result": result.get("result"),
"error": result.get("error"),
"task_id": task_id
}
else:
return {
"status": "error",
"error": "任务调度失败"
}
except Exception as e:
logger.error(f"智能任务处理失败: {e}")
# 降级到传统Agent调用,如果复杂任务处理失败,就改用简单的调用方式
return await self.call_agent("default", query)
# 全局Agent管理器实例
_AGENT_MANAGER = None
'''
这是什么?
创建一个全局变量_AGENT_MANAGER
初始值是None(空)
变量名前面的下划线_是约定,表示"这是内部变量"
'''
def get_agent_manager() -> AgentManager:
"""获取全局Agent管理器实例"""
global _AGENT_MANAGER
if _AGENT_MANAGER is None:
_AGENT_MANAGER = AgentManager()
return _AGENT_MANAGER
'''
这是一个"单例模式"的设计:
逻辑流程:
第一次调用 get_agent_manager():
_AGENT_MANAGER 是 None → 创建新的 AgentManager → 返回它
第二次调用 get_agent_manager():
_AGENT_MANAGER 已经有值了 → 直接返回这个值
为什么这样设计?
整个程序只需要一个AgentManager实例
避免了重复创建,节省内存
所有地方都使用同一个实例
'''
# 便捷函数
async def call_agent(agent_name: str, prompt: str, session_id: str = None) -> Dict[str, Any]:
"""便捷的Agent调用函数,这是一个"包装函数":
它让调用AI助手变得非常简单:"""
manager = get_agent_manager()
return await manager.call_agent(agent_name, prompt, session_id)
def list_agents() -> List[Dict[str, Any]]:
"""便捷的Agent列表获取函数"""
manager = get_agent_manager() #返回所有可用助手的列表,比如:
return manager.get_available_agents()
'''
agents = list_agents()
# agents 会是:
# [
# {"name": "助手A", "description": "...", ...},
# {"name": "助手B", "description": "...", ...}
# ]
'''
def get_agent_info(agent_name: str) -> Optional[Dict[str, Any]]:
"""便捷的Agent信息获取函数, 获取助手详细信息的函数"""
manager = get_agent_manager()
return manager.get_agent_info(agent_name)
'''
获取某个助手的详细信息,比如:
info = get_agent_info("助手A")
# info 会是:
# {
# "name": "助手A",
# "base_name": "assistant_a",
# "description": "这是一个助手",
# "system_prompt": "你是一个助手",
# ...
# }
注意:如果助手不存在,返回None,Optional[Dict[str, Any]]表示:要么返回字典,要么返回None
'''
# 智能任务处理函数 - 通过agentserver处理
async def process_intelligent_task(query: str, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""便捷的智能任务处理函数 - 通过agentserver的task_scheduler处理,让复杂的任务处理变得简单。"""
manager = get_agent_manager()
return await manager.process_intelligent_task(query, context)
# 任务管理函数 - 通过agentserver的task_scheduler处理
async def get_task_status(task_id: str) -> Optional[Dict[str, Any]]:
"""便捷的任务状态获取函数 - 通过agentserver处理"""
try:
from agentserver.task_scheduler import get_task_scheduler
task_scheduler = get_task_scheduler()
return await task_scheduler.get_task_status(task_id)
except Exception as e:
logger.error(f"获取任务状态失败: {e}")
return None
'''
使用示例:
status = await get_task_status("123e4567-e89b-12d3-a456-426614174000")
# status 可能是:
# {
# "id": "123e4567...",
# "status": "running", # 运行中
# "progress": 50, # 进度50%
# "result": None
# }
'''
#这个函数忽略了status_filter参数,总是返回所有运行中的任务。
async def get_task_list(status_filter: Optional[str] = None) -> List[Dict[str, Any]]:
"""便捷的任务列表获取函数 - 通过agentserver处理"""
try:
from agentserver.task_scheduler import get_task_scheduler
task_scheduler = get_task_scheduler()
return await task_scheduler.get_running_tasks()
except Exception as e:
logger.error(f"获取任务列表失败: {e}")
return []
async def get_execution_stats() -> Dict[str, Any]:
"""便捷的执行统计获取函数 - 通过agentserver处理"""
try:
from agentserver.task_scheduler import get_task_scheduler
task_scheduler = get_task_scheduler()
return {
"total_tasks": len(task_scheduler.task_registry),
"running_tasks": len([t for t in task_scheduler.task_registry.values() if t.get("status") == "running"]),
"queued_tasks": len([t for t in task_scheduler.task_registry.values() if t.get("status") == "queued"])
}
except Exception as e:
logger.error(f"获取执行统计失败: {e}")
return {}
'''
这个函数返回:
{
"total_tasks": 10, // 总任务数
"running_tasks": 3, // 运行中的任务数
"queued_tasks": 7 // 排队中的任务数
}
统计逻辑:
total_tasks: 任务注册表中所有任务的数量
running_tasks: 状态是"running"的任务数量
queued_tasks: 状态是"queued"的任务数量
'''
async def cancel_task(task_id: str) -> bool:
"""便捷的任务取消函数 - 通过agentserver处理"""
try:
from agentserver.task_scheduler import get_task_scheduler
task_scheduler = get_task_scheduler()
if task_id in task_scheduler.task_registry:
task_scheduler.task_registry[task_id]["status"] = "cancelled"
return True
return False
except Exception as e:
logger.error(f"取消任务失败: {e}")
return False
'''
1. 检查任务是否存在
2. 如果存在 → 把状态改成"cancelled" → 返回True
3. 如果不存在 → 返回False'''
# ==========为什么没有主函数入口===========
#1. 这是模块(Module),不是脚本(Script)
#Python文件的两种类型:
'''
类型 | 用途 | 是否有if __name__ == "__main__" | 例子
模块 | 被其他文件导入使用 | 通常没有 | math.py, random.py, agent_manager.py
脚本 | 直接运行执行任务 | 必须有 | main.py, run_server.py
这个agent_manager.py是模块:它提供功能,但不直接运行,其他文件导入它,使用它的功能
'''
#AgentManager 模块完整层级结构
'''
agent_manager.py
├── 模块头部(文件配置)
│ ├── #!/usr/bin/env python3 # 执行环境声明
│ ├── # -*- coding: utf-8 -*- # 编码声明
│ └── """Agent管理器 - 独立的Agent注册和调用系统""" # 模块文档
│
├── 导入模块
│ ├── 系统模块
│ │ ├── os # 操作系统交互
│ │ ├── json # JSON数据处理
│ │ ├── asyncio # 异步编程支持
│ │ ├── logging # 日志记录
│ │ └── time # 时间处理
│ │
│ ├── 路径处理
│ │ └── from pathlib import Path # 现代化路径操作
│ │
│ ├── 类型提示
│ │ └── from typing import Dict, Any, Optional, List # 类型注解
│ │
│ ├── 数据类
│ │ └── from dataclasses import dataclass, field # 简化类定义
│ │
│ ├── 日期时间
│ │ └── from datetime import datetime, timedelta # 时间日期操作
│ │
│ └── 正则表达式
│ └── import re # 字符串模式匹配
│
├── 日志配置
│ ├── logging.basicConfig(level=logging.INFO) # 基础日志配置
│ ├── logger = logging.getLogger("AgentManager") # 获取日志记录器
│ └── 屏蔽HTTP库DEBUG日志(3行) # 减少日志噪音
│
├── 数据类定义(Data Classes)
│ ├── AgentConfig(Agent配置模板)
│ │ ├── id: str # 模型ID(必需)
│ │ ├── name: str # Agent名称-中文(必需)
│ │ ├── base_name: str # 基础名称-英文(必需)
│ │ ├── system_prompt: str # 系统提示词(必需)
│ │ ├── max_output_tokens: int = 40000 # 最大输出token数
│ │ ├── temperature: float = 0.7 # 温度参数
│ │ ├── description: str = "" # 描述信息
│ │ ├── model_provider: str = "openai" # 模型提供商
│ │ ├── api_base_url: str = "" # API基础URL
│ │ └── api_key: str = "" # API密钥
│ │
│ └── AgentSession(Agent会话记录)
│ ├── timestamp: float = field(default_factory=time.time) # 时间戳
│ ├── history: List[Dict[str, str]] = field(default_factory=list) # 历史记录
│ └── session_id: str = "default_user_session" # 会话ID
│
├── 核心管理器类(AgentManager)
│ ├── 初始化方法 __init__
│ │ ├── 参数: config_dir: str = None # 配置文件目录(可选)
│ │ ├── 属性初始化
│ │ │ ├── self.config_dir # 配置目录路径
│ │ │ ├── self.agents # Agent配置字典
│ │ │ └── self.agent_sessions # Agent会话字典(三层嵌套)
│ │ │
│ │ ├── 配置读取
│ │ │ ├── self.max_history_rounds # 最大历史轮数
│ │ │ ├── self.context_ttl_hours = 24 # 上下文有效期(小时)
│ │ │ └── self.debug_mode = True # 调试模式开关
│ │ │
│ │ ├── 条件加载配置
│ │ │ ├── if self.config_dir # 有配置目录时
│ │ │ │ ├── self.config_dir.mkdir() # 创建目录
│ │ │ │ └── self._load_agent_configs() # 加载配置
│ │ │ └── else # 无配置目录时
│ │ │ └── logger.info() # 记录MCP架构提示
│ │ │
│ │ ├── 定期清理任务
│ │ │ ├── try: asyncio.get_running_loop() # 获取事件循环
│ │ │ ├── asyncio.create_task() # 创建清理任务
│ │ │ └── except RuntimeError: pass # 无事件循环时跳过
│ │ │
│ │ └── 初始化完成日志
│ │
│ ├── 配置管理方法
│ │ ├── _load_agent_configs() # 加载Agent配置
│ │ │ ├── 检查目录存在性
│ │ │ ├── 扫描JSON配置文件
│ │ │ ├── 读取并解析JSON
│ │ │ ├── 验证配置有效性
│ │ │ ├── 创建AgentConfig对象
│ │ │ └── 注册到agents字典
│ │ │
│ │ ├── _validate_agent_config() # 验证Agent配置
│ │ │ ├── 检查必需字段: ['model_id', 'name']
│ │ │ └── 返回验证结果: bool
│ │ │
│ │ ├── _register_agent_from_manifest() # 从manifest注册Agent
│ │ │ ├── 验证配置
│ │ │ ├── 创建AgentConfig对象
│ │ │ ├── 注册到agents字典
│ │ │ └── 返回注册结果: bool
│ │ │
│ │ └── reload_configs() # 重新加载配置
│ │ ├── 清空现有agents
│ │ └── 重新加载配置
│ │
│ ├── 会话管理方法
│ │ ├── get_agent_session_history() # 获取Agent会话历史
│ │ │ ├── 检查Agent是否存在
│ │ │ ├── 检查会话是否存在
│ │ │ ├── 检查会话是否过期
│ │ │ └── 返回历史记录列表
│ │ │
│ │ ├── update_agent_session_history() # 更新Agent会话历史
│ │ │ ├── 添加新对话记录
│ │ │ ├── 更新时间戳
│ │ │ └── 限制历史消息数量
│ │ │
│ │ ├── _is_context_expired() # 检查上下文是否过期
│ │ │ └── 判断时间差是否超过TTL
│ │ │
│ │ └── _periodic_cleanup() # 定期清理过期的会话
│ │ ├── 每小时执行一次
│ │ ├── 遍历所有Agent会话
│ │ ├── 删除过期会话
│ │ └── 清理空会话字典
│ │
│ ├── 消息构建方法
│ │ ├── _replace_placeholders() # 替换提示词占位符
│ │ │ ├── Agent配置替换(9种占位符)
│ │ │ ├── 环境变量替换(正则匹配)
│ │ │ └── 时间相关替换(3种格式)
│ │ │
│ │ ├── _build_system_message() # 构建系统消息
│ │ │ ├── 处理系统提示词占位符
│ │ │ └── 返回{"role": "system", "content": ...}
│ │ │
│ │ ├── _build_user_message() # 构建用户消息
│ │ │ ├── 处理用户提示词占位符
│ │ │ └── 返回{"role": "user", "content": ...}
│ │ │
│ │ ├── _build_assistant_message() # 构建助手消息
│ │ │ └── 返回{"role": "assistant", "content": ...}
│ │ │
│ │ └── _validate_messages() # 验证消息序列
│ │ ├── 检查消息非空
│ │ ├── 检查消息格式正确性
│ │ ├── 检查消息角色合法性
│ │ └── 检查系统消息在首位
│ │
│ ├── Agent调用方法
│ │ ├── call_agent() # 调用指定Agent
│ │ │ ├── 检查Agent是否存在
│ │ │ ├── 生成会话ID(如果未提供)
│ │ │ ├── 构建完整消息序列(系统+历史+当前)
│ │ │ ├── 验证消息序列
│ │ │ ├── 记录调试信息(如果debug模式)
│ │ │ ├── 调用LLM API
│ │ │ ├── 更新会话历史(如果成功)
│ │ │ └── 返回调用结果
│ │ │
│ │ ├── _call_llm_api() # 调用LLM API
│ │ │ ├── 导入AsyncOpenAI
│ │ │ ├── 记录调试信息
│ │ │ ├── 验证必需参数(model_id, api_key)
│ │ │ ├── 创建API客户端
│ │ │ ├── 准备API参数
│ │ │ ├── 调用chat.completions.create()
│ │ │ ├── 提取响应内容
│ │ │ └── 返回成功结果或错误
│ │ │
│ │ ├── call_agent_by_action() # 根据动作调用Agent
│ │ │ ├── 检查Agent是否存在
│ │ │ ├── 提取动作参数
│ │ │ ├── 构建动作提示词
│ │ │ ├── 调用Agent
│ │ │ └── 返回执行结果
│ │ │
│ │ └── _build_action_prompt() # 构建动作提示词
│ │ ├── 清理不需要的参数
│ │ ├── 格式化参数字符串
│ │ └── 返回格式化提示词
│ │
│ ├── 信息查询方法
│ │ ├── get_available_agents() # 获取所有可用Agent列表
│ │ │ └── 返回包含基本信息的字典列表
│ │ │
│ │ └── get_agent_info() # 获取指定Agent详细信息
│ │ ├── 检查Agent是否存在
│ │ └── 返回包含完整信息的字典
│ │
│ └── 智能任务处理方法
│ └── process_intelligent_task() # 智能任务处理
│ ├── 通过agentserver.task_scheduler处理
│ ├── 创建唯一任务ID(UUID)
│ ├── 注册任务到调度器
│ ├── 调度并行执行
│ ├── 处理执行结果
│ └── 降级处理(失败时调用默认Agent)
│
├── 全局实例管理
│ ├── 全局变量: _AGENT_MANAGER = None # 单例实例
│ │
│ └── 获取实例函数: get_agent_manager()
│ ├── 声明使用全局变量
│ ├── 检查实例是否存在
│ ├── 不存在时创建新实例
│ └── 返回实例
│
└── 便捷函数(Convenience Functions)
├── 调用相关
│ ├── call_agent() # 便捷调用函数
│ ├── list_agents() # 便捷列表获取函数
│ ├── get_agent_info() # 便捷信息获取函数
│ └── process_intelligent_task() # 便捷智能任务处理函数
│
└── 任务管理相关(通过agentserver.task_scheduler)
├── get_task_status() # 获取任务状态
├── get_task_list() # 获取任务列表
├── get_execution_stats() # 获取执行统计
└── cancel_task() # 取消任务
'''
#模块使用流程总结
'''
使用流程:
1. 导入模块函数
→ from agent_manager import call_agent, list_agents
2. 获取Agent列表
→ agents = list_agents()
3. 选择并调用Agent
→ result = await call_agent("助手名称", "用户消息", "session_id")
4. 处理结果
→ if result["status"] == "success": 使用 result["result"]
→ else: 处理错误 result["error"]'''
task_scheduler.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""通用任务调度器 - 融合智能记忆管理的任务调度系统"""
import asyncio # 异步支持 #
import uuid # 任务ID #
import json # JSON处理 #
import logging # 日志 #
import time # 时间处理 #
from typing import Any, Dict, List, Optional, Union # 类型标注 #
from dataclasses import dataclass, field # 数据类 #
from datetime import datetime, timedelta # 时间处理 #
# 配置日志
logger = logging.getLogger(__name__)
#创建了一个“日志记录器”,名字是当前模块的名字,想象成一个专门的记事本,用来记录程序运行情况
@dataclass
class TaskStep:
"""任务步骤数据类"""
step_id: str #step_id - 步骤ID(唯一编号)
task_id: str #task_id - 属于哪个任务
purpose: str # 步骤目的,这步要做什么
content: str # 步骤内容
output: str = "" # 步骤输出,输出结果(默认是空字符串)
analysis: Optional[Dict[str, Any]] = None # 分析结果
timestamp: float = field(default_factory=time.time) #时间戳(自动记录当前时间)
success: bool = True # 是否成功
error: Optional[str] = None # 错误信息
@dataclass #这个类用来存储压缩后的记忆,就像把很多经验总结成几条要点。
class CompressedMemory:
"""压缩记忆数据类"""
memory_id: str #记忆ID
key_findings: List[str] # 关键发现(列表)
failed_attempts: List[str] # 失败尝试(列表)
current_status: str # 当前状态
next_steps: List[str] # 建议步骤
source_steps: int # 来源步骤数
timestamp: float = field(default_factory=time.time)
#注意:类名以 _ 开头,这是一个约定,表示"内部类"或"私有类"
class _TaskScheduler: #负责管理和协调所有任务
"""通用任务调度器 - 融合智能记忆管理"""
def __init__(self, config: Optional[Any] = None) -> None: #这是类的"构造函数",创建对象时会自动调用:
# 导入配置
if config is None: #如果没传入配置,就从 agentserver.config 获取,像设定游戏的初始参数一样
from agentserver.config import get_task_scheduler_config
config = get_task_scheduler_config()
self.config = config
# 基础任务管理
#task_registry:一个字典,记录所有任务(像花名册)
self.task_registry: Dict[str, Dict[str, Any]] = {} # 任务注册表
#_lock:一把"锁",防止多个任务同时修改数据(像厕所的门锁)
self._lock = asyncio.Lock() # 并发锁
# 智能记忆管理
self.max_steps = config.max_steps # max_steps:最多保存多少步骤
self.compression_threshold = config.compression_threshold # 压缩阈值,compression_threshold:达到多少步骤时压缩
self.keep_last_steps = config.keep_last_steps # 压缩后保留步骤数
#数据存储容器
self.task_steps: Dict[str, List[TaskStep]] = {} # 任务步骤历史,task_steps:每个任务的所有步骤
self.compressed_memories: List[CompressedMemory] = [] # 压缩记忆,compressed_memories:压缩后的记忆
self.key_facts: Dict[str, str] = {} # 关键事实存储,key_facts:重要的发现(像书签)
self.failed_attempts: Dict[str, int] = {} # 失败尝试计数,failed_attempts:失败的次数统计
self.llm_config: Optional[Dict[str, Any]] = None # LLM配置,llm_config:AI模型配置(还没设置)
# 会话级别的记忆管理 - 新增功能,这是扩展功能,支持"会话"概念:
self.session_memories: Dict[str, Dict[str, Any]] = {} # 会话记忆:session_id -> {tasks, compressed_memories, key_facts}
#session_memories:按会话组织记忆(像文件夹)
self.session_task_mapping: Dict[str, str] = {} # 任务ID到会话ID的映射
#session_task_mapping:任务属于哪个会话
self.analysis_session_mapping: Dict[str, str] = {} # 分析会话ID到原始会话ID的映射
#analysis_session_mapping:分析会话和原会话的关系
#设置LLM配置的方法,很简单,就是设置AI模型配置
def set_llm_config(self, config: Dict[str, Any]) -> None:
"""设置LLM配置用于智能压缩"""
self.llm_config = config
#创建任务的方法 create_task,这是非常重要的方法,创建一个新任务:
async def create_task(self, task_id: str, purpose: str, session_id: Optional[str] = None,
analysis_session_id: Optional[str] = None) -> str:
"""创建新任务 - 应用与MCP服务器相同的会话管理逻辑,并关联会话记忆"""
#上锁和注册
async with self._lock: #async with self._lock::先锁上,保证安全
self.task_registry[task_id] = { #在 task_registry 中注册新任务,包含基本信息
"id": task_id,
"purpose": purpose,
"session_id": session_id,
"analysis_session_id": analysis_session_id,
"status": "created",
"created_at": time.time(),
"steps_count": 0
}
# 初始化任务步骤列表
if task_id not in self.task_steps:
self.task_steps[task_id] = [] #为这个任务创建一个空步骤列表
# 会话级别的记忆管理,如果有会话ID,建立各种映射关系
if session_id:
# 建立任务到会话的映射
self.session_task_mapping[task_id] = session_id
# 建立分析会话到原始会话的映射
if analysis_session_id:
self.analysis_session_mapping[analysis_session_id] = session_id
# 初始化会话记忆(如果不存在),如果这个会话是新的,就初始化它的记忆仓库
if session_id not in self.session_memories:
self.session_memories[session_id] = {
"tasks": [],
"compressed_memories": [],
"key_facts": {},
"failed_attempts": {},
"created_at": time.time(),
"last_activity": time.time()
}
# 将会话记忆与任务关联
self.session_memories[session_id]["tasks"].append(task_id) #把任务ID加入会话的任务列表
self.session_memories[session_id]["last_activity"] = time.time() #更新会话最后活动时间
logger.info(f"[任务创建] 创建任务: {task_id}, 目的: {purpose}, 会话: {session_id}, 分析会话: {analysis_session_id}")
return task_id
async def add_task_step(self, task_id: str, step: TaskStep) -> None:
"""添加任务步骤到历史记录,并更新会话级别的记忆管理"""
async with self._lock:
if task_id not in self.task_steps: #安全检查
self.task_steps[task_id] = [] #添加步骤到对应任务的步骤列表
self.task_steps[task_id].append(step)
# 提取关键事实,调用下面的 _extract_key_facts 方法
self._extract_key_facts(step)
# 记录失败尝试
if not step.success: #如果步骤失败,记录这个失败内容
self.failed_attempts[step.content] = self.failed_attempts.get(step.content, 0) + 1 #使用 .get() 避免键不存在
# 会话级别的记忆管理
session_id = self.session_task_mapping.get(task_id)
if session_id and session_id in self.session_memories: #获取任务所属会话,如果有会话,更新该会话的记忆
# 更新会话记忆中的关键事实
fact_key = f"task:{task_id}:step:{step.step_id}"
if fact_key in self.key_facts:
self.session_memories[session_id]["key_facts"][fact_key] = self.key_facts[fact_key]
# 更新会话记忆中的失败尝试
if not step.success:
session_failed = self.session_memories[session_id]["failed_attempts"]
session_failed[step.content] = session_failed.get(step.content, 0) + 1
# 更新会话活动时间
self.session_memories[session_id]["last_activity"] = time.time()
# 检查是否需要压缩记忆
if len(self.task_steps[task_id]) >= self.compression_threshold: #如果步骤数达到阈值,就压缩记忆
await self._compress_memory(task_id)
def _extract_key_facts(self, step: TaskStep) -> None:
"""从步骤中提取关键事实"""
# 提取关键命令和结果
if step.content and step.output:
#如果有内容和输出,就提取,output_summary:截断输出,只保留一部分,生成格式化的关键事实
output_summary = step.output[:self.config.output_summary_length] + ("..." if len(step.output) > self.config.output_summary_length else "")
fact_key = f"task:{step.task_id}:step:{step.step_id}"
self.key_facts[fact_key] = f"命令:{step.content}, 结果: {output_summary}"
# 提取分析结论,如果有分析结果且包含重要词,就保存
if step.analysis and "analysis" in step.analysis:
analysis = step.analysis["analysis"]
if "关键发现" in analysis or "重要" in analysis:
fact_key = f"analysis:{hash(analysis)}"
self.key_facts[fact_key] = analysis
async def _compress_memory(self, task_id: str) -> None:
"""压缩任务记忆,这是最复杂的方法,使用AI压缩记忆"""
if not self.llm_config or task_id not in self.task_steps: #必须有LLM配置和任务步骤
return
logger.info(f"开始压缩任务 {task_id} 的记忆...")
# 构建压缩提示,调用另一个方法构建提示
prompt = self._build_compression_prompt(task_id)
try:
# 调用LLM进行压缩,调用AI模型压缩记忆
compressed_data = await self._call_llm_compression(prompt)
# 创建压缩记忆对象,用AI返回的数据创建 CompressedMemory 对象,uuid.uuid4() 生成唯一ID
memory = CompressedMemory(
memory_id=str(uuid.uuid4()),
key_findings=compressed_data.get("key_findings", []),
failed_attempts=compressed_data.get("failed_attempts", []),
current_status=compressed_data.get("current_status", "未知状态"),
next_steps=compressed_data.get("next_steps", []),
source_steps=len(self.task_steps[task_id])
)
# 更新失败尝试记录,把压缩后的失败记录加入全局失败记录
for attempt in memory.failed_attempts:
self.failed_attempts[attempt] = self.failed_attempts.get(attempt, 0) + 1
#加入压缩记忆列表
self.compressed_memories.append(memory)
# 会话级别的压缩记忆管理,如果有会话,也加入会话的记忆
session_id = self.session_task_mapping.get(task_id)
if session_id and session_id in self.session_memories:
self.session_memories[session_id]["compressed_memories"].append(memory)
logger.info(f"[会话记忆] 会话 {session_id} 添加压缩记忆: {len(memory.key_findings)}个关键发现")
logger.info(f"记忆压缩成功: 添加了{len(memory.key_findings)}个关键发现")
except Exception as e:
logger.error(f"记忆压缩失败: {e}")
# 创建错误记忆
error_memory = CompressedMemory(
memory_id=str(uuid.uuid4()),
key_findings=[f"压缩失败: {str(e)}"],
failed_attempts=[],
current_status="压缩失败",
next_steps=["重新尝试压缩"],
source_steps=len(self.task_steps[task_id])
)
self.compressed_memories.append(error_memory)
# 清空历史记录,保留最后几步
keep_last = min(self.keep_last_steps, len(self.task_steps[task_id]))
self.task_steps[task_id] = self.task_steps[task_id][-keep_last:]
def _build_compression_prompt(self, task_id: str) -> str:
"""构建压缩提示"""
prompt = """
你是一个专业的任务执行助手,需要压缩任务执行历史记录。请执行以下任务:
1. 识别并提取关键的技术细节和发现
2. 标记已尝试但失败的解决方案
3. 总结当前任务状态和下一步建议
4. 以JSON格式返回以下结构的数据:
{
"key_findings": ["发现1", "发现2"],
"failed_attempts": ["命令1", "命令2"],
"current_status": "当前状态描述",
"next_steps": ["建议1", "建议2"]
}
任务ID: {task_id}
历史记录:
""".format(task_id=task_id)
# 添加关键事实
prompt += "关键事实摘要:\n"
#list(self.key_facts.items()):把字典转换成(键,值)的列表
#[-self.config.key_facts_compression_limit:]:取最近N个(配置决定)
#循环添加每个关键事实到提示中
recent_facts = list(self.key_facts.items())[-self.config.key_facts_compression_limit:] # 最近关键事实
for _, value in recent_facts:
prompt += f"- {value}\n"
# 添加历史步骤
steps = self.task_steps[task_id][-self.compression_threshold:]
for i, step in enumerate(steps): #获取最近N个步骤(压缩阈值决定数量),enumerate(steps):同时获取索引和步骤对象,添加步骤的基本信息
prompt += f"\n步骤 {i+1}:\n"
prompt += f"- 目的: {step.purpose}\n"
prompt += f"- 命令: {step.content}\n"
if step.output: #处理输出和结果,如果有输出,显示部分输出(截断到配置的长度),三元表达式:如果输出太长就加...,否则不加
prompt += f"- 输出: {step.output[:self.config.step_output_display_length]}{'...' if len(step.output) > self.config.step_output_display_length else ''}\n"
if step.analysis: #处理分析和失败,.get("analysis", "无分析"):安全获取,如果不存在返回"无分析",如果步骤失败,显示错误信息
analysis = step.analysis.get("analysis", "无分析")
prompt += f"- 分析: {analysis}\n"
if not step.success:
prompt += f"- 状态: 失败 - {step.error}\n"
return prompt
#调用LLM进行压缩的方法,这个方法实际调用AI模型来压缩记忆:
async def _call_llm_compression(self, prompt: str) -> Dict[str, Any]:
"""调用LLM进行记忆压缩"""
try: #导入litellm库
import litellm #litellm:一个统一的AI模型调用库
litellm.enable_json_schema_validation = True #enable_json_schema_validation:启用JSON格式验证
response = litellm.completion( #使用 litellm.completion 调用AI,参数包括:模型名、API密钥、API地址、消息、最大token数
model=self.llm_config["model"],
api_key=self.llm_config["api_key"],
api_base=self.llm_config["api_base"],
messages=[{"role": "user", "content": prompt}],
max_tokens=1024,
)
#response.choices[0].message.content:获取AI回复内容
json_str = response.choices[0].message.content.strip() #.strip():去掉首尾空格
return json.loads(json_str) #json.loads():把JSON字符串转换成Python字典
except Exception as e: #错误处理
logger.error(f"LLM压缩调用失败: {e}")
return { #如果出错,返回一个默认的错误结构,保证方法总能返回有效的字典
"key_findings": [f"压缩失败: {str(e)}"],
"failed_attempts": [],
"current_status": "压缩失败",
"next_steps": ["检查LLM配置"]
}
#并行执行调度方法
async def schedule_parallel_execution(self, tasks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""并行调度执行给定任务列表,返回结果列表"""
if not tasks: #空检查
return []
# 定义内部任务运行函数
async def _run_task(task: Dict[str, Any]) -> Dict[str, Any]: #定义异步函数来处理单个任务
task_id = task.get("id") or str(uuid.uuid4()) #如果没有提供任务ID,就生成一个
async with self._lock: # 注册任务
self.task_registry[task_id] = {
"id": task_id,
"type": task.get("type") or "processor",
"status": "running",
"params": task.get("params") or {},
"context": task.get("context"),
"created_at": time.time()
}
try:
# 创建任务步骤记录
step = TaskStep(
step_id=str(uuid.uuid4()),
task_id=task_id,
purpose=task.get("purpose", "执行任务"),
content=str(task.get("params", {})),
success=True
)
# 占位执行:此处可接入真实执行器
await asyncio.sleep(0) #await asyncio.sleep(0):让出控制权,不实际等待,这是一个占位,实际应该调用真正的任务执行器
result = {
"success": True,
"result": None,
"task_type": self.task_registry[task_id]["type"],
}
step.output = str(result)
# 记录步骤和结果
await self.add_task_step(task_id, step)
return result
#错误处理
except Exception as e:
# 记录失败的步骤
step = TaskStep(
step_id=str(uuid.uuid4()),
task_id=task_id,
purpose=task.get("purpose", "执行任务"),
content=str(task.get("params", {})),
success=False,
error=str(e)
)
await self.add_task_step(task_id, step)
return {"success": False, "error": str(e)}
# 最终状态更新
finally: #finally:无论成功失败都会执行,更新任务状态为完成
async with self._lock:
entry = self.task_registry.get(task_id)
if entry is not None:
entry["status"] = "completed"
entry["completed_at"] = time.time()
#并行执行所有任务,coros:协程对象列表(每个任务一个)
coros = [_run_task(t) for t in tasks]
#asyncio.gather:同时运行所有协程
return await asyncio.gather(*coros, return_exceptions=False) #return_exceptions=False:如果有异常就抛出,不返回异常
async def get_task_status(self, task_id: str) -> Optional[Dict[str, Any]]:
"""查询指定任务状态"""
async with self._lock:
return self.task_registry.get(task_id)
# 获取任务状态的方法
async def get_running_tasks(self) -> List[Dict[str, Any]]: #简单查询任务注册表,使用 .get() 方法安全获取
"""获取运行中任务列表"""
async with self._lock: #列表推导式:筛选出状态为"running"的任务,.values():获取所有任务字典
return [t for t in self.task_registry.values() if t.get("status") == "running"]
#获取任务记忆摘要的方法
#这个方法生成任务的详细报告:
async def get_task_memory_summary(self, task_id: str, include_key_facts: bool = True) -> str:
"""获取任务记忆摘要"""
async with self._lock:
summary = ""
# 1. 关键事实摘要
if include_key_facts and self.key_facts:
summary += "关键事实:\n"
task_facts = [v for k, v in self.key_facts.items() if f"task:{task_id}" in k]
for fact in task_facts[-self.config.key_facts_summary_limit:]: # 显示最近关键事实
summary += f"- {fact}\n"
summary += "\n"
#筛选出这个任务的关键事实
#if f"task:{task_id}" in k:检查键是否包含任务ID
# 2. 压缩记忆摘要
if self.compressed_memories:
summary += "压缩记忆块:\n"
for i, mem in enumerate(self.compressed_memories[-self.config.compressed_memory_summary_limit:]): # 显示最近压缩块
summary += f"记忆块 #{len(self.compressed_memories)-i}:\n" #len(self.compressed_memories)-i:计算块编号(从后往前)
summary += f"- 状态: {mem.current_status}\n"
#显示关键发现
summary += f"- 关键发现: {', '.join(mem.key_findings[:self.config.key_findings_display_limit])}" #', '.join():用逗号和空格连接列表元素
if len(mem.key_findings) > self.config.key_findings_display_limit: # 如果发现太多,显示"等X项"
summary += f" 等{len(mem.key_findings)}项"
summary += "\n"
if mem.failed_attempts:
summary += f"- 失败尝试: {', '.join(mem.failed_attempts[:self.config.failed_attempts_display_limit])}"
if len(mem.failed_attempts) > self.config.failed_attempts_display_limit:
summary += f" 等{len(mem.failed_attempts)}项"
summary += "\n"
if mem.next_steps:
summary += f"- 建议步骤: {mem.next_steps[0]}\n"
summary += f"- 来源: 基于{mem.source_steps}个历史步骤\n\n"
# 3. 最近详细步骤
if task_id in self.task_steps and self.task_steps[task_id]: #检查任务是否有步骤,逆序显示步骤(最新的在最上面)
summary += "最近详细步骤:\n"
for i, step in enumerate(self.task_steps[task_id]):
step_num = len(self.task_steps[task_id]) - i
summary += f"步骤 {step_num}:\n"
summary += f"- 目的: {step.purpose}\n"
summary += f"- 命令: {step.content}\n"
if step.output:
summary += f"- 输出: {step.output[:self.config.step_output_display_length]}{'...' if len(step.output) > self.config.step_output_display_length else ''}\n"
if step.analysis:
analysis = step.analysis.get("analysis", "无分析")
summary += f"- 分析: {analysis}\n"
if not step.success:
summary += f"- 状态: 失败 - {step.error}\n"
# 显示失败次数
if step.content in self.failed_attempts: #检查这个命令是否失败过,显示总共失败了多少次
summary += f"- 历史失败次数: {self.failed_attempts[step.content]}\n"
summary += "\n"
return summary if summary else "无历史记录"
async def get_global_memory_summary(self) -> str:
"""获取全局记忆摘要"""
async with self._lock:
summary = f"全局任务记忆摘要\n"
summary += f"=" * 50 + "\n"
# 任务统计
total_tasks = len(self.task_registry)
running_tasks = len([t for t in self.task_registry.values() if t.get("status") == "running"])
completed_tasks = len([t for t in self.task_registry.values() if t.get("status") == "completed"])
summary += f"任务统计:\n"
summary += f"- 总任务数: {total_tasks}\n"
summary += f"- 运行中: {running_tasks}\n"
summary += f"- 已完成: {completed_tasks}\n\n"
# 记忆统计
total_steps = sum(len(steps) for steps in self.task_steps.values()) #sum():计算所有任务的步骤总数
total_compressed = len(self.compressed_memories)
total_facts = len(self.key_facts)
total_failures = len(self.failed_attempts)
summary += f"记忆统计:\n"
summary += f"- 总步骤数: {total_steps}\n"
summary += f"- 压缩记忆块: {total_compressed}\n"
summary += f"- 关键事实: {total_facts}\n"
summary += f"- 失败尝试: {total_failures}\n\n"
# 最近活动
if self.compressed_memories:
summary += "最近压缩记忆:\n"
for mem in self.compressed_memories[-self.config.compressed_memory_global_limit:]: # 最近压缩记忆
summary += f"- {mem.current_status} (基于{mem.source_steps}步骤)\n"
if mem.key_findings:
summary += f" 关键发现: {mem.key_findings[0]}\n"
summary += "\n"
return summary
async def get_failed_attempts_summary(self) -> Dict[str, int]: #返回失败尝试的副本(字典)
"""获取失败尝试摘要"""
async with self._lock:
return self.failed_attempts.copy() #.copy():创建副本,避免外部修改原始数据
#返回格式:{"命令1": 失败次数, "命令2": 失败次数}
async def clear_task_memory(self, task_id: str) -> bool:
"""清除指定任务的记忆"""
async with self._lock:
if task_id in self.task_steps: #检查任务是否存在
del self.task_steps[task_id] #del self.task_steps[task_id]:删除该任务的步骤列表
logger.info(f"已清除任务 {task_id} 的记忆")
return True
return False #返回 True 表示成功,False 表示任务不存在
#只清除步骤,不清除其他记忆
async def clear_all_memory(self) -> None:
"""清除所有记忆"""
async with self._lock:
self.task_steps.clear() #.clear():清空整个字典/列表,清除所有7个数据存储容器
self.compressed_memories.clear()
self.key_facts.clear()
self.failed_attempts.clear()
# 清除会话级别的记忆
self.session_memories.clear()
self.session_task_mapping.clear()
self.analysis_session_mapping.clear()
logger.info("已清除所有记忆")
#相当于重置整个系统
# ============ 会话级别的记忆管理方法 ============
async def get_session_memory_summary(self, session_id: str) -> Dict[str, Any]:
"""获取会话记忆摘要"""
async with self._lock: #检查会话是否存在
if session_id not in self.session_memories:
return {"error": f"会话 {session_id} 不存在"}
session_memory = self.session_memories[session_id] #获取会话记忆
# 构建会话摘要
summary = { #包含会话的基本信息和统计
"session_id": session_id,
"created_at": session_memory["created_at"],
"last_activity": session_memory["last_activity"],
"tasks_count": len(session_memory["tasks"]),
"compressed_memories_count": len(session_memory["compressed_memories"]),
"key_facts_count": len(session_memory["key_facts"]),
"failed_attempts_count": len(session_memory["failed_attempts"]),
"tasks": session_memory["tasks"],
"recent_key_facts": list(session_memory["key_facts"].values())[-5:], # list(...)[-5:]:取最后5个值,最近5个关键事实
"recent_compressed_memories": [ # 最近压缩记忆
{
"memory_id": mem.memory_id,
"key_findings": mem.key_findings[:3], # 前3个关键发现,列表推导式生成最近3个压缩记忆的简化信息
"current_status": mem.current_status,
"source_steps": mem.source_steps
}
for mem in session_memory["compressed_memories"][-3:] # 最近3个压缩记忆
],
"failed_attempts": dict(list(session_memory["failed_attempts"].items())[-5:]) # dict(list(...)[-5:]):取最后5个键值对转回字典,最近5个失败尝试
}
return summary
async def get_session_compressed_memories(self, session_id: str) -> List[Dict[str, Any]]:
"""获取会话的压缩记忆"""
async with self._lock: #安全检查
if session_id not in self.session_memories:
return []
session_memory = self.session_memories[session_id] #返回所有压缩记忆的详细信息
return [
{
"memory_id": mem.memory_id,
"key_findings": mem.key_findings,
"failed_attempts": mem.failed_attempts,
"current_status": mem.current_status,
"next_steps": mem.next_steps,
"source_steps": mem.source_steps
}
for mem in session_memory["compressed_memories"]
]
#返回完整的压缩记忆信息
#注意:这里没有限制数量,返回所有
async def get_session_key_facts(self, session_id: str) -> Dict[str, str]:
"""获取会话的关键事实"""
async with self._lock:
if session_id not in self.session_memories:
return {}
return self.session_memories[session_id]["key_facts"].copy()\
#简单返回会话关键事实的副本,格式:{"事实键": "事实内容"}
async def get_session_failed_attempts(self, session_id: str) -> Dict[str, int]:
"""获取会话的失败尝试"""
async with self._lock:
if session_id not in self.session_memories:
return {}
return self.session_memories[session_id]["failed_attempts"].copy()
#返回格式:{"命令": 失败次数}
async def get_session_tasks(self, session_id: str) -> List[Dict[str, Any]]:
"""获取会话的所有任务"""
async with self._lock: # 安全检查
if session_id not in self.session_memories:
return []
tasks = []
for task_id in self.session_memories[session_id]["tasks"]: #遍历会话的所有任务
if task_id in self.task_registry: #检查每个任务是否还在注册表中
task_info = self.task_registry[task_id].copy() #添加步骤计数:len(self.task_steps.get(task_id, []))
task_info["steps_count"] = len(self.task_steps.get(task_id, [])) #.get(task_id, []):安全获取,如果不存在返回空列表
tasks.append(task_info)
return tasks
async def clear_session_memory(self, session_id: str) -> bool:
"""清除指定会话的记忆"""
async with self._lock:
if session_id not in self.session_memories:
return False
# 清除会话相关的任务
for task_id in self.session_memories[session_id]["tasks"]:
if task_id in self.task_steps:
del self.task_steps[task_id]
if task_id in self.task_registry:
del self.task_registry[task_id]
if task_id in self.session_task_mapping:
del self.session_task_mapping[task_id]
# 清除会话记忆
del self.session_memories[session_id]
# 清除分析会话映射
analysis_sessions_to_remove = [
analysis_id for analysis_id, orig_session_id in self.analysis_session_mapping.items()
if orig_session_id == session_id
]
for analysis_id in analysis_sessions_to_remove:
del self.analysis_session_mapping[analysis_id]
logger.info(f"已清除会话 {session_id} 的所有记忆")
return True
async def get_all_sessions(self) -> List[Dict[str, Any]]:
"""获取所有会话的摘要信息"""
async with self._lock:
sessions = []
for session_id, session_memory in self.session_memories.items():
sessions.append({
"session_id": session_id,
"created_at": session_memory["created_at"],
"last_activity": session_memory["last_activity"],
"tasks_count": len(session_memory["tasks"]),
"compressed_memories_count": len(session_memory["compressed_memories"]),
"key_facts_count": len(session_memory["key_facts"]),
"failed_attempts_count": len(session_memory["failed_attempts"])
})
# 按最后活动时间排序
sessions.sort(key=lambda x: x["last_activity"], reverse=True) #sessions.sort():原地排序
#key=lambda x: x["last_activity"]:按最后活动时间排序,reverse=True:降序(最新的在前),lambda:匿名函数,简单函数的一种写法
return sessions
#全局变量 _SCHEDULER
_SCHEDULER: Optional[_TaskScheduler] = None
'''
创建一个全局变量,类型是Optional[_TaskScheduler]
Optional 表示可以是 _TaskScheduler 或 None
初始化为 None(还没创建实例)
这是单例模式的设计:整个程序只有一个调度器实例
'''
def get_task_scheduler(config: Optional[Any] = None) -> _TaskScheduler:
"""获取全局任务调度器单例"""
global _SCHEDULER
if _SCHEDULER is None:
_SCHEDULER = _TaskScheduler(config)
return _SCHEDULER
# ========锁(Lock)的详细解释=========
'''
1.什么是并发问题?
想象一下这个场景:
你有两个朋友同时在一个Excel表格上填写数据
朋友A在B1单元格写了"100"
朋友B也在B1单元格写了"200"
最后B1单元格里的值是什么?可能是100,也可能是200,甚至可能是乱码
这就是并发冲突:多个操作同时修改同一份数据,结果不可预测。
2. 锁的类比
锁就像厕所的门锁:
一个人进去后,把门锁上
其他人想用厕所,只能在门外等
里面的人用完了,开门出来
下一个人才能进去
在代码中,锁保护共享资源(比如字典、列表):
self._lock = asyncio.Lock() # 创建一个锁,就像装了一把门锁
async with self._lock: # 1. 检查锁是否开着 2. 如果开着,进去并锁上门
# 在这里安全地修改共享数据
self.task_registry[task_id] = {...} # 只有我能改这个字典
# 3. 自动开门(释放锁)
3. 如果没有锁会发生什么?
看这个例子:
# 两个协程同时执行这个函数
async def add_task(self, task_id):
# 没有锁保护
if task_id not in self.task_registry:
# 假设执行到这里时,另一个协程也执行到了这里
# 两个协程都认为task_id不存在
self.task_registry[task_id] = {"status": "created"}
# 结果:同一个task_id被创建了两次!数据混乱!
4. 锁的实际工作流程
async def create_task(self, task_id, purpose):
async with self._lock: # ① 获取锁(如果锁被占用就等待)
# ② 进入临界区(只有持有锁的协程能执行这里)
self.task_registry[task_id] = {
"id": task_id,
"purpose": purpose,
"status": "created"
}
# ③ 操作完成
# ④ 自动释放锁,其他等待的协程可以继续
时间线示例:
时间点1: 协程A获得锁,开始修改数据
时间点2: 协程B尝试获取锁,发现锁被占用,进入等待状态
时间点3: 协程A完成操作,释放锁
时间点4: 协程B获得锁,开始执行
5. 为什么用 async with?
async with 是异步上下文管理器
它确保:
进入时自动获取锁
退出时自动释放锁
即使出现异常也会释放锁
相当于:
# async with self._lock: 等价于:
await self._lock.acquire() # 获取锁
try:
# 执行代码
finally:
self._lock.release() # 无论如何都释放锁
'''
# ========安全检查的详细解释==========
'''
1. 什么是安全检查?
安全检查就是在执行操作前先检查条件是否满足,防止出错。
2. 常见的需要安全检查的情况
情况1:访问不存在的键
# 错误的写法:
value = self.task_registry[task_id] # 如果task_id不存在,会抛出KeyError
# 正确的写法(安全检查):
if task_id in self.task_registry: # 先检查
value = self.task_registry[task_id] # 再访问
else:
value = None # 或者返回错误信息
情况2:除零错误
# 错误的写法:
result = a / b # 如果b=0,会抛出ZeroDivisionError
# 正确的写法(安全检查):
if b != 0: # 先检查
result = a / b # 再计算
else:
result = None
3. 代码中的具体例子
clear_task_memory 方法:
async def clear_task_memory(self, task_id: str) -> bool:
async with self._lock: # ① 先获取锁(保证安全访问)
if task_id in self.task_steps: # ② 安全检查:任务是否存在?
del self.task_steps[task_id] # ③ 安全操作
return True
return False # ④ 如果不存在,返回False
为什么需要安全检查?
如果有人试图删除一个不存在的任务
没有检查:del self.task_steps[task_id] 会抛出 KeyError
有检查:优雅地返回 False,告诉调用者"删除失败,因为任务不存在"
get_session_memory_summary 方法:
async def get_session_memory_summary(self, session_id: str) -> Dict[str, Any]:
async with self._lock:
if session_id not in self.session_memories: # 安全检查
return {"error": f"会话 {session_id} 不存在"} # 友好错误信息
# 安全检查通过,继续执行
session_memory = self.session_memories[session_id]
# ... 构建摘要
4. 安全检查的层次
第一层:存在性检查: if key in dictionary: # 键是否存在?
第二层:类型检查: if isinstance(value, dict): # 值是不是字典类型?
第三层:值域检查 if 0 <= index < len(list): # 索引是否在有效范围内?
第四层:业务逻辑检查: if user.has_permission("delete"): # 用户是否有删除权限?
5. 为什么要在锁内做安全检查?
这是一个非常重要的点!看这个例子:
# ❌ 错误的写法(有竞争条件):
async def wrong_method(self, task_id):
if task_id in self.task_steps: # ① 检查(没有锁保护)
# ② 这里可能有其他协程删除了task_id!
await asyncio.sleep(0.1) # 模拟耗时操作
del self.task_steps[task_id] # ③ 可能抛出KeyError!
# ✅ 正确的写法:
async def correct_method(self, task_id):
async with self._lock: # 获取锁
if task_id in self.task_steps: # 在锁的保护下检查
# 其他协程不能修改task_steps,所以这里安全
del self.task_steps[task_id]
关键点:从"检查"到"操作"这段时间,数据不能被别人修改。锁保证了这段时间的独占访问。
'''
# ========为什么我们会感觉安全检查与异常处理很像==========
#安全检查和异常处理确实有相似之处,但它们本质上是不同的编程思想。
'''
安全检查(主动防御)
# 主动:在错误发生前阻止它
if task_id in self.task_steps: # 主动检查
del self.task_steps[task_id] # 安全执行
else:
print("任务不存在") # 优雅处理
异常处理(被动响应)
# 被动:等错误发生后再处理
try:
del self.task_steps[task_id] # 可能出错
except KeyError: # 错误发生了
print("任务不存在") # 处理错误
'''
#例1:访问字典
'''
安全检查方式:
# 方式1:先检查再访问
if key in my_dict:
value = my_dict[key]
else:
value = None
# 方式2:使用get方法(也是一种安全检查)
value = my_dict.get(key, None) # 如果key不存在,返回None
异常处理方式:
try:
value = my_dict[key]
except KeyError:
value = None
'''
#例2:除零操作
'''
安全检查:
if denominator != 0: # 主动检查
result = numerator / denominator
else:
result = None
异常处理:
try:
result = numerator / denominator
except ZeroDivisionError:
result = None
'''
#为什么在并发代码中多用安全检查?
#1. 性能考虑
'''
异常处理有额外开销:
# Python中异常处理的代价
import time
def test_safety_check():
start = time.time()
for i in range(1000000):
if i < 1000000: # 总是为真
pass
return time.time() - start
def test_exception():
start = time.time()
for i in range(1000000):
try:
if i >= 1000000: # 永远不会为真
raise IndexError
except IndexError:
pass
return time.time() - start
print(f"安全检查: {test_safety_check():.4f}秒")
print(f"异常处理: {test_exception():.4f}秒")
# 异常处理通常慢2-10倍
'''
#2. 代码清晰性
'''
# 并发代码中,安全检查更清晰
async def process(self, task_id):
async with self._lock:
# 一系列安全检查
if not self._validate_task(task_id):
return {"error": "任务无效"}
if not self._has_permission():
return {"error": "无权限"}
if not self._check_resources():
return {"error": "资源不足"}
# 执行操作
return await self._execute(task_id)
#如果用异常处理:
async def process(self, task_id):
async with self._lock:
try:
self._validate_task(task_id) # 可能抛异常
self._check_permission() # 可能抛异常
self._check_resources() # 可能抛异常
return await self._execute(task_id)
except ValidationError:
return {"error": "任务无效"}
except PermissionError:
return {"error": "无权限"}
except ResourceError:
return {"error": "资源不足"}
'''
#3. 控制流更简单
'''
在并发代码中,异常可能导致锁的释放问题:
async def risky_method(self):
await self._lock.acquire() # 手动获取锁
try:
# 如果这里抛异常,可能无法正确释放锁
result = await self._do_something()
return result
except Exception as e:
# 处理异常
raise
finally:
self._lock.release() # 但finally块可能会执行
# 使用async with更安全,但它内部也是异常处理
async def safe_method(self):
async with self._lock: # 自动管理锁
return await self._do_something() # 任何异常都会正确释放锁
'''
#想象你在开车:
'''
安全检查就像:
出发前检查油量、轮胎、刹车
看到红灯就停下
限速牌前减速
你在主动避免事故
异常处理就像:
突然爆胎了 → 控制方向盘 → 慢慢靠边停车
刹车失灵了 → 换低速挡 → 拉手刹
发动机故障灯亮了 → 找最近的维修厂
你在被动应对事故
两者的关系:
好的司机会同时使用两种策略
出发前安全检查(预防)
但也要知道如何应对突发情况(处理)
'''
#并发编程中多用安全检查
#=====通用任务调度器 - 完整层级结构========
'''
通用任务调度器系统(内部视角)
├── 第1层:核心数据模型(Data Models)
│ ├── TaskStep(任务步骤)
│ │ ├── step_id: str # 步骤唯一标识
│ │ ├── task_id: str # 所属任务ID
│ │ ├── purpose: str # 步骤目的
│ │ ├── content: str # 步骤内容
│ │ ├── output: str # 步骤输出
│ │ ├── analysis: Dict # 分析结果
│ │ ├── timestamp: float # 时间戳
│ │ ├── success: bool # 是否成功
│ │ └── error: Optional[str] # 错误信息
│ │
│ └── CompressedMemory(压缩记忆)
│ ├── memory_id: str # 记忆唯一标识
│ ├── key_findings: List[str] # 关键发现
│ ├── failed_attempts: List[str] # 失败尝试
│ ├── current_status: str # 当前状态
│ ├── next_steps: List[str] # 建议步骤
│ ├── source_steps: int # 来源步骤数
│ └── timestamp: float # 时间戳
│
├── 第2层:核心调度引擎(_TaskScheduler)
│ ├── 初始化模块
│ │ ├── __init__(config) # 初始化配置和数据存储
│ │ └── set_llm_config(config) # 设置AI模型配置
│ │
│ ├── 任务生命周期管理
│ │ ├── create_task() # 创建新任务(入口点)
│ │ ├── add_task_step() # 添加任务步骤(核心记录)
│ │ ├── get_task_status() # 查询任务状态
│ │ ├── get_running_tasks() # 获取运行中任务
│ │ ├── schedule_parallel_execution() # 并行执行调度
│ │ ├── clear_task_memory() # 清除单个任务记忆
│ │ └── clear_all_memory() # 清除所有记忆
│ │
│ ├── 智能记忆压缩模块
│ │ ├── _extract_key_facts() # 提取关键事实(预处理)
│ │ ├── _compress_memory() # 压缩记忆(主流程)
│ │ ├── _build_compression_prompt() # 构建AI提示
│ │ └── _call_llm_compression() # 调用AI模型
│ │
│ ├── 查询与摘要模块
│ │ ├── get_task_memory_summary() # 获取任务记忆摘要
│ │ ├── get_global_memory_summary() # 获取全局记忆摘要
│ │ └── get_failed_attempts_summary() # 获取失败尝试摘要
│ │
│ └── 会话级别记忆管理(扩展层)
│ ├── get_session_memory_summary() # 获取会话记忆摘要
│ ├── get_session_compressed_memories() # 获取会话压缩记忆
│ ├── get_session_key_facts() # 获取会话关键事实
│ ├── get_session_failed_attempts() # 获取会话失败尝试
│ ├── get_session_tasks() # 获取会话所有任务
│ ├── clear_session_memory() # 清除会话记忆
│ └── get_all_sessions() # 获取所有会话
│
├── 第3层:数据存储结构
│ ├── 任务注册表(task_registry)
│ │ ├── 结构:Dict[str, Dict]
│ │ ├── 键:task_id
│ │ └── 值:任务元数据(id, purpose, status等)
│ │
│ ├── 任务步骤历史(task_steps)
│ │ ├── 结构:Dict[str, List[TaskStep]]
│ │ ├── 键:task_id
│ │ └── 值:该任务的所有步骤列表
│ │
│ ├── 压缩记忆库(compressed_memories)
│ │ ├── 结构:List[CompressedMemory]
│ │ └── 内容:所有压缩后的记忆块
│ │
│ ├── 关键事实存储(key_facts)
│ │ ├── 结构:Dict[str, str]
│ │ ├── 键:格式化的事实键
│ │ └── 值:事实内容
│ │
│ ├── 失败尝试计数(failed_attempts)
│ │ ├── 结构:Dict[str, int]
│ │ ├── 键:失败的操作内容
│ │ └── 值:失败次数
│ │
│ └── 会话级别存储(扩展)
│ ├── session_memories # 会话记忆:session_id -> 记忆数据
│ ├── session_task_mapping # 任务到会话的映射
│ └── analysis_session_mapping # 分析会话到原始会话的映射
│
├── 第4层:配置系统
│ ├── 来源:外部配置模块
│ ├── 配置参数:
│ │ ├── max_steps: int # 最大保存步骤数
│ │ ├── compression_threshold: int # 压缩触发阈值
│ │ ├── keep_last_steps: int # 压缩后保留步骤数
│ │ ├── output_summary_length: int # 输出摘要长度
│ │ ├── step_output_display_length: int # 步骤输出显示长度
│ │ ├── key_facts_compression_limit: int # 关键事实压缩限制
│ │ ├── key_facts_summary_limit: int # 关键事实摘要限制
│ │ ├── compressed_memory_summary_limit: int # 压缩记忆摘要限制
│ │ ├── key_findings_display_limit: int # 关键发现显示限制
│ │ ├── failed_attempts_display_limit: int # 失败尝试显示限制
│ │ └── compressed_memory_global_limit: int # 全局压缩记忆限制
│ │
│ └── LLM配置(可选):
│ ├── model: str # AI模型名称
│ ├── api_key: str # API密钥
│ └── api_base: str # API基础地址
│
├── 第5层:并发安全机制
│ ├── 锁机制(_lock: asyncio.Lock)
│ │ ├── 作用范围:所有修改共享数据的方法
│ │ ├── 使用方式:async with self._lock:
│ │ └── 保护的数据:
│ │ ├── task_registry
│ │ ├── task_steps
│ │ ├── compressed_memories
│ │ ├── key_facts
│ │ ├── failed_attempts
│ │ └── 所有会话相关数据
│ │
│ └── 安全检查策略
│ ├── 存在性检查:if key in dictionary
│ ├── 类型检查:isinstance()
│ ├── 边界检查:len()和索引范围
│ └── 业务逻辑检查:自定义验证
│
├── 第6层:日志与监控
│ ├── 日志记录器(logger)
│ ├── 关键日志点:
│ │ ├── 任务创建/完成
│ │ ├── 记忆压缩开始/结束
│ │ ├── AI调用成功/失败
│ │ ├── 清理操作
│ │ └── 错误和异常
│ │
│ └── 监控指标:
│ ├── 任务数量统计
│ ├── 步骤总数
│ ├── 压缩记忆块数
│ ├── 关键事实数
│ └── 失败尝试数
│
└── 第7层:外部依赖与集成
├── 异步运行时:asyncio
├── AI模型调用:litellm(或类似库)
├── 唯一标识生成:uuid
├── 时间处理:time, datetime
├── 数据序列化:json
└── 类型注解支持:typing
'''
# ========API接口映射(外部视角)=========
'''
API层(假设有Web封装)
├── 任务级别接口
│ ├── POST /tasks → create_task()
│ ├── GET /tasks → 获取所有任务(需要额外实现)
│ ├── GET /tasks/{task_id} → get_task_status()
│ ├── GET /tasks/{task_id}/memory → get_task_memory_summary()
│ ├── POST /tasks/{task_id}/steps → add_task_step()
│ └── DELETE /tasks/{task_id}/memory → clear_task_memory()
│
├── 并行执行接口
│ └── POST /tasks/parallel → schedule_parallel_execution()
│
├── 会话级别接口
│ ├── GET /sessions → get_all_sessions()
│ ├── GET /sessions/{session_id} → get_session_memory_summary()
│ ├── GET /sessions/{session_id}/compressed_memories → get_session_compressed_memories()
│ ├── GET /sessions/{session_id}/key_facts → get_session_key_facts()
│ ├── GET /sessions/{session_id}/failed_attempts → get_session_failed_attempts()
│ ├── GET /sessions/{session_id}/tasks → get_session_tasks()
│ └── DELETE /sessions/{session_id} → clear_session_memory()
│
└── 全局级别接口
├── GET /memory/global → get_global_memory_summary()
├── GET /memory/failed-attempts → get_failed_attempts_summary()
├── GET /memory/running-tasks → get_running_tasks()
└── DELETE /memory/global → clear_all_memory()
'''
# ========映射关系:从代码到API==========
# 让我展示它们之间的映射关系:
'''
代码方法 → RESTful接口
──────────────────
create_task(task_id, purpose) → POST /tasks
Body: {task_id, purpose}
get_task_status(task_id) → GET /tasks/{task_id}
add_task_step(task_id, step) → POST /tasks/{task_id}/steps
Body: {step数据}
get_task_memory_summary(task_id) → GET /tasks/{task_id}/memory
schedule_parallel_execution(tasks) → POST /tasks/parallel
Body: {任务列表}
get_all_sessions() → GET /sessions
get_session_memory_summary(session_id) → GET /sessions/{session_id}
clear_all_memory() → DELETE /memory/global
'''
toolkit_manager.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
#!/usr/bin/env python3 - 告诉电脑用python3来运行这个文件
# -*- coding: utf-8 -*- - 告诉电脑这个文件使用UTF-8编码(可以支持中文)
工具包管理器 - 管理agentserver的工具包
"""
import yaml
import logging
from pathlib import Path
from typing import Dict, Any, List, Optional
from .tools import FileEditToolkit, AsyncBaseToolkit, ToolkitConfig
'''
这些导入都是什么:
yaml - 用来读取YAML格式的配置文件(YAML是一种写配置文件的格式)
logging - 用来记录程序运行时的信息(比如错误、警告)
Path - 用来处理文件路径(比如找到某个文件夹)
typing 里的东西 - 只是用来告诉电脑数据类型,不会真的做什么
从当前文件夹的tools文件里导入三个东西:
FileEditToolkit - 文件编辑工具包
AsyncBaseToolkit - 所有工具包的基础类
ToolkitConfig - 工具包的配置
'''
#日志记录器
logger = logging.getLogger(__name__)
#这是什么:创建一个logger(记录器),用来写日志,__name__ 会自动变成这个文件的名字
class ToolkitManager:
"""工具包管理器"""
#1. 初始化方法 __init__
def __init__(self, config_dir: str = "agentserver/configs"):
self.config_dir = Path(config_dir)
self.toolkits: Dict[str, AsyncBaseToolkit] = {}
self.toolkit_configs: Dict[str, Dict[str, Any]] = {}
# 注册可用的工具包类型
self.toolkit_types = {
"file_edit": FileEditToolkit,
}
# 加载配置
self._load_configs()
'''
这个方法是做什么的:
当创建一个ToolkitManager对象时,这个方法会自动运行
它做了4件事:
1.设置配置文件夹路径
2.创建两个空字典(就像两个空盒子):
toolkits:放已经创建好的工具包
toolkit_configs:放从配置文件读取的配置信息
3.注册可用的工具包类型(现在只有一种:file_edit)
4.调用_load_configs()方法加载配置
'''
#加载配置方法
def _load_configs(self):
"""加载工具包配置"""
#1.检查文件夹是否存在
if not self.config_dir.exists(): #先检查配置文件夹是否存在
logger.warning(f"配置目录不存在: {self.config_dir}") #如果不存在,记录警告信息,然后直接返回(不继续执行)
return
#2.遍历所有YAML文件
for config_file in self.config_dir.glob("*.yaml"): #找到文件夹里所有以.yaml结尾的文件,对每个文件都做下面的操作
try:
#3.读取文件内容
with open(config_file, 'r', encoding='utf-8') as f: #打开文件('r'表示只读)
config = yaml.safe_load(f) #用yaml库读取内容,转成Python能理解的数据
#保存配置信息
toolkit_name = config.get('name', config_file.stem) #获取配置里的名字,如果没有就用文件名(去掉.yaml的部分)
self.toolkit_configs[toolkit_name] = config #把配置保存到toolkit_configs字典里
logger.info(f"加载工具包配置: {toolkit_name}")
#5.错误处理
except Exception as e:
logger.error(f"加载配置文件失败 {config_file}: {e}") #如果上面任何一步出错了,就记录错误信息,但程序不会崩溃
'''
这个代码文件创建了一个工具包管理器,它:从指定文件夹读取配置文件(YAML格式),管理不同类型的工具包,目前只支持一种工具包:文件编辑工具包
它里面有哪些函数:__init__ - 初始化函数(自动运行),_load_configs - 私有方法(内部使用),加载配置
用了哪些外部函数:Path.exists() - 检查路径是否存在,Path.glob() - 查找匹配的文件,open() - 打开文件,yaml.safe_load() - 安全地读取YAML文件,各种日志函数:warning(), info(), error()
'''
def get_toolkit(self, name: str) -> Optional[AsyncBaseToolkit]:
#这个方法是做什么的:根据名字获取一个工具包,如果工具包已经存在,直接返回,如果不存在,根据配置创建一个新的
"""获取工具包实例"""
#1.检查是否已经创建过
if name in self.toolkits: #先检查self.toolkits字典里有没有这个工具包
return self.toolkits[name] #如果有,直接返回(不再创建新的)
#检查配置是否存在
if name not in self.toolkit_configs: #如果配置里也没有这个工具包,记录错误并返回None
logger.error(f"工具包配置不存在: {name}")
return None
#3.获取配置信息
config_data = self.toolkit_configs[name] #从配置字典里取出这个工具包的配置
toolkit_type = config_data.get('mode', 'builtin') #获取工具包的模式(默认是'builtin',也就是内置的)
#4.创建内置工具包
if toolkit_type == 'builtin': #如果是内置模式,从toolkit_types字典里找到对应的类
toolkit_class = self.toolkit_types.get(name)
if not toolkit_class: #如果找不到,记录错误并返回
logger.error(f"未找到工具包类型: {name}")
return None
#5.创建配置对象
toolkit_config = ToolkitConfig( #创建一个ToolkitConfig配置对象
config=config_data.get('config', {}), #创建一个ToolkitConfig配置对象
name=name
)
#设置激活的工具列表
toolkit_config.activated_tools = config_data.get('activated_tools')
#6.创建工具包并保存
toolkit = toolkit_class(toolkit_config) #用配置对象创建工具包实例
self.toolkits[name] = toolkit #保存到self.toolkits字典里(下次就不用再创建了)
logger.info(f"创建工具包实例: {name}") #记录日志并返回
return toolkit
#处理其他模式
else:
logger.error(f"不支持的工具包模式: {toolkit_type}") #如果不是内置模式,记录错误并返回
return None
def get_all_tools(self) -> List[Dict[str, Any]]: #获取所有工具包中的所有工具
"""获取所有工具包的工具列表"""
all_tools = [] #创建一个空列表
for toolkit_name in self.toolkit_configs.keys(): #遍历所有工具包,遍历配置字典中的所有工具包名字
toolkit = self.get_toolkit(toolkit_name) #获取工具包实例,用刚才的get_toolkit方法获取工具包,如果获取成功,继续执行
if toolkit:
tools = toolkit.get_tools_list() #调用工具包的get_tools_list()方法获取工具列表
for tool in tools:
tool['toolkit'] = toolkit_name #给每个工具添加一个toolkit字段,记录它是哪个工具包的
all_tools.extend(tools) #把这些工具添加到总列表中
return all_tools #返回一个包含所有工具的列表
async def call_tool(self, toolkit_name: str, tool_name: str, arguments: Dict[str, Any]) -> str:
#这个方法做什么:调用一个具体的工具,这是一个异步方法(注意async和await)
"""调用工具(异步方法)"""
#1.获取工具包
toolkit = self.get_toolkit(toolkit_name) #先获取工具包实例
if not toolkit:
return f"工具包不存在: {toolkit_name}" #如果没有,返回错误信息
#尝试调用工具
try:
return await toolkit.call_tool(tool_name, arguments) #尝试异步调用工具包的call_tool方法,传入工具名字和参数
#3.错误处理
except Exception as e: #如果调用失败,记录错误日志
logger.error(f"调用工具失败 {toolkit_name}.{tool_name}: {e}")
return f"调用工具失败: {str(e)}" #返回错误信息
def list_toolkits(self) -> List[str]:
#这个方法做什么:列出所有可用的工具包名字,非常简单,就是把配置字典的所有键(key)转成列表
"""列出所有可用的工具包"""
return list(self.toolkit_configs.keys())
def get_toolkit_info(self, name: str) -> Optional[Dict[str, Any]]:
"""获取工具包详细信息"""
#1.检查配置是否存在
if name not in self.toolkit_configs:
return None #如果配置里没有这个工具包,返回None
#2.获取配置和工具包
config = self.toolkit_configs[name]
toolkit = self.get_toolkit(name)
#3.创建基本信息字典
info = {
"name": name,
"mode": config.get('mode', 'builtin'),
"config": config.get('config', {}),
"tools": []
}
#4.添加工具列表(如果有)
if toolkit:
info["tools"] = toolkit.get_tools_list()
#5.返回信息
return info
# 全局工具包管理器实例
toolkit_manager = ToolkitManager()
# =====完整的层级结构图=====
'''
工具包管理器 (ToolkitManager)
├── 初始化配置
│ ├── 设置配置目录路径
│ ├── 初始化存储容器
│ │ ├── 工具包实例字典 (toolkits)
│ │ └── 工具包配置字典 (toolkit_configs)
│ ├── 注册可用工具包类型
│ │ └── file_edit → FileEditToolkit
│ └── 自动加载配置 (_load_configs)
│ ├── 检查配置目录是否存在
│ ├── 遍历所有YAML配置文件
│ ├── 读取并解析YAML文件
│ └── 存储配置到字典
│
├── 工具包实例管理
│ ├── 获取工具包 (get_toolkit)
│ │ ├── 检查是否已实例化 (缓存检查)
│ │ ├── 检查配置是否存在
│ │ ├── 创建工具包流程
│ │ │ ├── 获取配置数据
│ │ │ ├── 确定工具包类型
│ │ │ ├── 创建配置对象 (ToolkitConfig)
│ │ │ │ ├── 设置基础配置
│ │ │ │ └── 设置激活工具列表
│ │ │ ├── 实例化工具包类
│ │ │ └── 缓存实例供下次使用
│ │ └── 错误处理
│ │ ├── 配置不存在错误
│ │ ├── 类型未找到错误
│ │ └── 模式不支持错误
│ │
│ ├── 列出所有工具包 (list_toolkits)
│ │ └── 返回配置字典的所有键
│ │
│ └── 获取工具包信息 (get_toolkit_info)
│ ├── 检查配置存在性
│ ├── 构建信息字典
│ │ ├── 基本信息
│ │ │ ├── 名称 (name)
│ │ │ ├── 模式 (mode)
│ │ │ └── 配置 (config)
│ │ └── 工具列表
│ │ └── 从工具包实例获取工具列表
│ └── 返回完整信息
│
├── 工具管理
│ ├── 获取所有工具 (get_all_tools)
│ │ ├── 遍历所有工具包配置
│ │ ├── 获取每个工具包的工具列表
│ │ ├── 标记工具所属工具包
│ │ │ └── 添加 toolkit 字段
│ │ └── 合并所有工具到列表
│ │
│ └── 调用工具 (call_tool) [异步]
│ ├── 获取工具包实例
│ ├── 调用具体工具
│ │ ├── 异步调用工具包方法
│ │ │ └── await toolkit.call_tool()
│ │ └── 传入参数
│ │ ├── 工具名称 (tool_name)
│ │ └── 参数字典 (arguments)
│ └── 错误处理
│ ├── 工具包不存在错误
│ └── 调用失败错误
│
├── 外部依赖
│ ├── 文件操作
│ │ ├── Path (路径处理)
│ │ │ ├── exists() - 检查存在
│ │ │ └── glob() - 模式匹配文件
│ │ └── open() - 文件读取
│ │
│ ├── 配置解析
│ │ └── yaml.safe_load() - YAML解析
│ │
│ ├── 日志系统
│ │ ├── logger.info() - 信息记录
│ │ ├── logger.warning() - 警告记录
│ │ └── logger.error() - 错误记录
│ │
│ └── 类型系统
│ ├── Dict - 字典类型
│ ├── List - 列表类型
│ ├── Any - 任意类型
│ └── Optional - 可选类型
│
└── 全局访问
└── 全局管理器实例 (toolkit_manager)
└── 单例模式,供其他模块导入使用
'''
config.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Agent Server配置文件 - 重构版
提供客观、实用的配置管理
"""
from dataclasses import dataclass #dataclass:一个方便的工具,可以快速创建数据类
from typing import Optional #typing.Optional:一个类型提示工具(这里实际没用到)
# ============ 服务器配置 ============
# 从主配置读取端口
try: #try-except:逻辑判断,意思是“试试看,不行就用备选方案”
from system.config import get_server_port #先尝试从其他地方获取端口号
AGENT_SERVER_PORT = get_server_port("agent_server")
except ImportError:
AGENT_SERVER_PORT = 8001 # 回退默认值 #如果找不到(ImportError),就用默认的 8001
# ============ 任务调度器配置 ============
@dataclass
class TaskSchedulerConfig: #创建了一个叫“任务调度器配置”的设置类
"""任务调度器配置"""
# 记忆管理阈值
max_steps: int = 15 # 最大保存步骤数
compression_threshold: int = 7 # 压缩触发阈值
keep_last_steps: int = 4 # 压缩后保留的详细步骤数
# 显示相关阈值
key_facts_compression_limit: int = 5 # 压缩提示中的关键事实数量
key_facts_summary_limit: int = 10 # 摘要中的关键事实数量
compressed_memory_summary_limit: int = 3 # 任务摘要中的压缩记忆数量
compressed_memory_global_limit: int = 2 # 全局摘要中的压缩记忆数量
key_findings_display_limit: int = 3 # 关键发现显示数量
failed_attempts_display_limit: int = 3 # 失败尝试显示数量
# 输出长度限制
output_summary_length: int = 256 # 关键事实中的输出摘要长度
step_output_display_length: int = 512 # 步骤显示中的输出长度
# 性能配置
enable_auto_compression: bool = True # 是否启用自动压缩
compression_timeout: int = 30 # 压缩超时时间(秒)
max_compression_retries: int = 3 # 最大压缩重试次数
# 默认任务调度器配置实例
DEFAULT_TASK_SCHEDULER_CONFIG = TaskSchedulerConfig()
#用上面的配置类创建了一个实际可用的配置对象,就像用模具做了一个具体的蛋糕
# ============ Agent管理器配置 ============
@dataclass
class AgentManagerConfig: #AgentManagerConfig:管理代理(agent)的设置
"""Agent管理器配置"""
# 会话管理
default_session_timeout: int = 3600 # 默认会话超时时间(秒)
max_session_history: int = 100 # 最大会话历史记录数
# 任务执行
max_concurrent_agents: int = 5 # 最大并发Agent数
agent_timeout: int = 300 # Agent执行超时时间(秒)
# 缓存配置
enable_agent_cache: bool = True # 是否启用Agent缓存
cache_ttl: int = 1800 # 缓存生存时间(秒)
# 默认Agent管理器配置实例
DEFAULT_AGENT_MANAGER_CONFIG = AgentManagerConfig()
# ============ 电脑控制Agent配置 ============
@dataclass
class ComputerControlConfig: #ComputerControlConfig:控制电脑的设置
"""电脑控制Agent配置"""
# 执行配置
max_execution_time: int = 60 # 最大执行时间(秒)
enable_screenshot: bool = True # 是否启用截图功能
screenshot_quality: int = 80 # 截图质量(1-100)
# 安全配置
enable_safety_checks: bool = True # 是否启用安全检查
blocked_commands: list = None # 被阻止的命令列表
def __post_init__(self):
if self.blocked_commands is None:
self.blocked_commands = [
"rm -rf /", "format c:", "del /f /s /q c:\\",
"shutdown", "reboot", "halt"
]
# 默认电脑控制配置实例
DEFAULT_COMPUTER_CONTROL_CONFIG = ComputerControlConfig()
# ============ 全局配置管理 ============
@dataclass
class AgentServerConfig: #这是总的配置,包含所有子配置
"""Agent服务器全局配置"""
# 服务器配置
host: str = "0.0.0.0"
port: int = None
# 子模块配置
task_scheduler: TaskSchedulerConfig = None
agent_manager: AgentManagerConfig = None
computer_control: ComputerControlConfig = None
# 日志配置
log_level: str = "INFO"
enable_debug_logs: bool = False
def __post_init__(self): #这是一个特殊函数,会在类创建后自动运行
# 设置默认端口
if self.port is None:
self.port = AGENT_SERVER_PORT
# 设置默认子配置
if self.task_scheduler is None:
self.task_scheduler = DEFAULT_TASK_SCHEDULER_CONFIG
if self.agent_manager is None:
self.agent_manager = DEFAULT_AGENT_MANAGER_CONFIG
if self.computer_control is None:
self.computer_control = DEFAULT_COMPUTER_CONTROL_CONFIG
# 全局配置实例
config = AgentServerConfig()
# ============ 配置访问函数 ============
def get_task_scheduler_config() -> TaskSchedulerConfig:
"""获取任务调度器配置"""
return config.task_scheduler
def get_agent_manager_config() -> AgentManagerConfig:
"""获取Agent管理器配置"""
return config.agent_manager
def get_computer_control_config() -> ComputerControlConfig:
"""获取电脑控制配置"""
return config.computer_control
def update_config(**kwargs):
"""更新配置"""
for key, value in kwargs.items():
if hasattr(config, key):
setattr(config, key, value)
else:
raise ValueError(f"未知配置项: {key}")
# ============ 向后兼容 ============
# 保持向后兼容的配置常量
AGENT_SERVER_HOST = config.host
AGENT_SERVER_PORT = config.port
'''
代码最后创建了几个函数:
get_task_scheduler_config():获取任务调度器的配置
get_agent_manager_config():获取代理管理器的配置
get_computer_control_config():获取电脑控制的配置
update_config(**kwargs):更新配置的函数,可以修改设置
'''
README.md
一、 二次开发核心入口
1. 添加新的意图处理逻辑
- 修改文件:
task_planner.py - 关键函数:
evaluate_and_plan() - 添加新任务类型:
# 在task_planner.py中添加新任务类型判断
if "你的新意图关键词" in user_query:
return {
"type": "your_new_processor",
"params": {...},
"executor": "agent" # 或 "mcp"
}
2. 扩展任务执行器
- 修改文件:
task_scheduler.py - 添加新的processor:
async def _execute_task(self, task):
if task["type"] == "your_new_processor":
return await self._execute_your_processor(task["params"])
async def _execute_your_processor(self, params):
# 实现你的业务逻辑
pass
3. 自定义API接口
- 修改文件:
agent_server.py - 添加新端点:
@app.post("/your_custom_endpoint")
async def custom_handler(request_data: dict):
# 直接调用任务规划器
plan = await task_planner.evaluate_and_plan(request_data["query"])
# 调度执行
result = await task_scheduler.schedule_execution(plan)
return result
二、 单独调试步骤
1. 启动独立测试服务
# 进入项目目录
cd agentserver
# 方式1:直接运行(查看详细日志)
python -m uvicorn agent_server:app --reload --port 8001 --log-level debug
# 方式2:使用提供的脚本(如果有)
python dev_server.py
2. 测试API接口
# 测试意图分析(使用curl)
curl -X POST http://localhost:8001/analyze_and_execute \
-H "Content-Type: application/json" \
-d '{
"messages": [{"role": "user", "content": "帮我打开计算器"}],
"session_id": "test_session"
}'
# 查看任务状态
curl http://localhost:8001/tasks?session_id=test_session
3. 单元调试模块
# 创建测试脚本 test_debug.py
import asyncio
from task_planner import TaskPlanner
from task_scheduler import TaskScheduler
async def test_planner():
planner = TaskPlanner()
# 直接测试规划器
plan = await planner.evaluate_and_plan("打开记事本和浏览器")
print("生成的计划:", plan)
async def test_scheduler():
scheduler = TaskScheduler()
# 测试直接调度
task = {"type": "processor", "params": {"query": "测试"}}
result = await scheduler.schedule_execution(task)
print("执行结果:", result)
if __name__ == "__main__":
asyncio.run(test_planner())
4. 修改配置快速生效
- 配置文件:
config.py - 常用修改项:
# 关闭某些功能进行调试
INTENT_ANALYSIS_ENABLED = False # 跳过意图分析直接测试
TASK_TIMEOUT = 30 # 调小超时时间快速失败
MAX_CONCURRENT_TASKS = 1 # 限制并发数便于调试
三、 核心开发场景
场景1:添加新的电脑控制能力
- 在
task_scheduler.py中添加执行函数 - 在
task_planner.py中注册意图映射 - 测试:直接调用新函数验证功能
场景2:修改任务调度策略
# 修改调度逻辑(task_scheduler.py)
class CustomTaskScheduler(TaskScheduler):
async def schedule_parallel_execution(self, tasks):
# 重写并发策略
# 例如:按优先级排序、依赖关系处理
pass
场景3:集成新的AI模型
# 替换或扩展意图分析模型
# 在task_planner.py中修改
async def analyze_intent(self, query):
# 使用新的LLM API
response = await call_your_llm_api(query)
return self._parse_to_task(response)
四、 调试技巧
1. 日志追踪
# 在代码中添加调试日志
import logging
logger = logging.getLogger(__name__)
logger.debug(f"任务参数: {params}") # 开发时用debug级别
logger.info(f"开始执行任务: {task_id}")
2. 断点调试
# 使用debugpy进行远程调试
python -m debugpy --listen 0.0.0.0:5678 --wait-for-client agent_server.py
3. 测试数据模拟
# 创建模拟请求进行测试
test_data = {
"messages": [{"role": "user", "content": "测试指令"}],
"session_id": "debug_" + str(uuid.uuid4())
}
# 直接调用内部函数,绕过HTTP层
result = await app.state.planner.evaluate_and_plan(test_data["messages"][0]["content"])
五、 快速验证清单
- 修改生效了吗?
- 重启服务:
Ctrl+C然后重新启动 - 检查日志:确认没有导入错误
- 重启服务:
- 接口工作正常吗?
- # 健康检查:curl http://localhost:8001/health
- 任务执行成功吗?
- 查看任务状态接口
- 检查任务注册表:
task_scheduler.task_registry
- 需要回滚吗?
- 备份原文件:
cp agent_server.py agent_server.py.bak - 使用Git临时提交:
git stash
- 备份原文件:
六、 重要提醒
- 热重载:启动时加
--reload参数,修改代码自动重启 - 端口冲突:确保8001端口未被占用,或修改
config.py中的端口 - 依赖安装:如果添加新依赖,更新
requirements.txt - 并发安全:修改调度器时注意线程/协程安全问题
最简开发循环:
- 修改
task_planner.py或task_scheduler.py - 重启服务:
python -m uvicorn agent_server:app --reload --port 8001 - 发送测试请求:
curl -X POST ... - 查看日志确认结果
直接改这几个核心文件,就能实现大部分二次开发需求。



