本文最后更新于62 天前,其中的信息可能已经过时,如有错误请发送邮件到184874483@qq.com
file_edit_toolkit.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
文件编辑工具包 - 适配agentserver架构
提供文件读写、编辑功能,支持SEARCH/REPLACE格式的diff操作
"""
import re # 正则表达式,用来搜索和替换文字
import shutil # 文件操作工具,用来复制文件
import logging # 记录日志(就像写日记,记录发生了什么)
from datetime import datetime # 获取时间
from pathlib import Path # 处理文件路径
#前5行是导入Python自带的工具
from .base_toolkit import AsyncBaseToolkit, register_tool, ToolkitConfig
#最后一行是导入自己写的其他代码(从当前文件夹的base_toolkit.py导入)
logger = logging.getLogger(__name__)
#创建一个叫FileEditToolkit的类
#它继承自AsyncBaseToolkit(意思是它拥有父类的所有功能)
class FileEditToolkit(AsyncBaseToolkit):
"""文件编辑工具包"""
def __init__(self, config: ToolkitConfig = None) -> None: #它继承自AsyncBaseToolkit(意思是它拥有父类的所有功能)
super().__init__(config)
workspace_root = self.config.config.get("workspace_root", "/tmp/")
self.setup_workspace(workspace_root)
self.default_encoding = self.config.config.get("default_encoding", "utf-8")
self.backup_enabled = self.config.config.get("backup_enabled", False)
logger.info(
f"FileEditToolkit初始化完成 - 工作目录: {self.work_dir}, 编码: {self.default_encoding}"
)
'''
做的事情:
先调用父类的设置方法
设置工作目录(文件存放的地方)
设置文件编码(默认用UTF-8,可以显示中文)
设置是否备份文件
记录一条日志说初始化完成了
'''
def setup_workspace(self, workspace_root: str): #创建一个文件夹作为工作空间
"""设置工作空间"""
self.work_dir = Path(workspace_root).resolve() #把传入的路径变成Path对象
self.work_dir.mkdir(parents=True, exist_ok=True) #创建这个文件夹(如果不存在的话)
def _sanitize_filename(self, filename: str) -> str: #把文件名中的特殊字符(除了字母、数字、下划线、横杠、点)都变成下划线
"""清理文件名,移除不安全字符"""
safe = re.sub(r"[^\w\-.]", "_", filename) #例子:test&file.txt → test_file.txt
return safe
def _resolve_filepath(self, file_path: str) -> Path:
"""解析文件路径,处理相对路径和安全检查"""
path_obj = Path(file_path)
if not path_obj.is_absolute(): #如果是相对路径,就加上工作目录变成绝对路径
path_obj = self.work_dir / path_obj
sanitized_filename = self._sanitize_filename(path_obj.name) #清理文件名
path_obj = path_obj.parent / sanitized_filename
resolved_path = path_obj.resolve()
self._create_backup(resolved_path) #如果需要,创建备份
return resolved_path #返回处理后的路径
def _create_backup(self, file_path: Path) -> None:
"""创建文件备份""" #在修改文件前先备份
if not self.backup_enabled or not file_path.exists(): #只有当backup_enabled为True且文件存在时才备份
return
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_path = file_path.parent / f"{file_path.name}.{timestamp}.bak" #份文件命名:原文件名 + 时间 + .bak
shutil.copy2(file_path, backup_path)
logger.info(f"创建备份文件: {backup_path}")
@register_tool
async def edit_file(self, path: str, diff: str) -> str:
"""编辑文件,使用SEARCH/REPLACE格式的diff
参数:
path (str): 要编辑的文件路径
diff (str): SEARCH/REPLACE格式的diff内容,格式如下:
```
<<<<<<< SEARCH
[要查找的确切内容]
=======
[要替换的新内容]
>>>>>>> REPLACE
```
"""
try:
resolved_path = self._resolve_filepath(path)
with open(resolved_path, encoding=self.default_encoding) as f:
content = f.read()
modified_content = content
pattern = r"<<<<<<< SEARCH\n(.*?)\n=======\n(.*?)\n>>>>>>> REPLACE"
matches = re.findall(pattern, diff, re.DOTALL)
if not matches:
return "错误!在提供的diff中未找到有效的SEARCH/REPLACE块"
# 应用每个搜索/替换对
for search_text, replace_text in matches:
if search_text in modified_content:
modified_content = modified_content.replace(search_text, replace_text)
else:
logger.warning(f"在文件中未找到搜索文本: {search_text[:50]}...")
with open(resolved_path, "w", encoding=self.default_encoding) as f:
f.write(modified_content)
return f"成功编辑文件: {resolved_path}"
except Exception as e:
logger.error(f"编辑文件失败: {e}")
return f"编辑文件失败: {str(e)}"
@register_tool
async def write_file(self, path: str, file_text: str) -> str:
"""写入文本内容到文件
参数:
path (str): 要写入的文件路径
file_text (str): 要写入的完整文本内容
"""
try:
path_obj = self._resolve_filepath(path)
path_obj.parent.mkdir(parents=True, exist_ok=True)
path_obj.write_text(file_text, encoding=self.default_encoding)
return f"成功写入文件: {path_obj}"
except Exception as e:
logger.error(f"写入文件失败: {e}")
return f"写入文件失败: {str(e)}"
@register_tool
async def read_file(self, path: str) -> str:
"""读取并返回文件内容
参数:
path (str): 要读取的文件路径
"""
try:
path_obj = self._resolve_filepath(path)
if not path_obj.exists():
return f"文件不存在: {path_obj}"
return path_obj.read_text(encoding=self.default_encoding)
except Exception as e:
logger.error(f"读取文件失败: {e}")
return f"读取文件失败: {str(e)}"
@register_tool
async def list_files(self, directory: str = ".") -> str:
"""列出目录中的文件
参数:
directory (str): 要列出的目录路径,默认为当前目录
"""
try:
if directory == ".":
target_dir = self.work_dir
else:
target_dir = self.work_dir / directory
if not target_dir.exists():
return f"目录不存在: {target_dir}"
files = []
for item in target_dir.iterdir():
if item.is_file():
files.append(f"文件: {item.name}")
elif item.is_dir():
files.append(f"目录: {item.name}/")
if not files:
return f"目录为空: {target_dir}"
return f"目录 {target_dir} 内容:\n" + "\n".join(files)
except Exception as e:
logger.error(f"列出文件失败: {e}")
return f"列出文件失败: {str(e)}"
@register_tool
async def create_directory(self, path: str) -> str:
"""创建目录
参数:
path (str): 要创建的目录路径
"""
try:
path_obj = self.work_dir / path
path_obj.mkdir(parents=True, exist_ok=True)
return f"成功创建目录: {path_obj}"
except Exception as e:
logger.error(f"创建目录失败: {e}")
return f"创建目录失败: {str(e)}"
@register_tool
async def delete_file(self, path: str) -> str:
"""删除文件
参数:
path (str): 要删除的文件路径
"""
try:
path_obj = self._resolve_filepath(path)
if not path_obj.exists():
return f"文件不存在: {path_obj}"
path_obj.unlink()
return f"成功删除文件: {path_obj}"
except Exception as e:
logger.error(f"删除文件失败: {e}")
return f"删除文件失败: {str(e)}"
base_toolkit.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
基础工具包类 - 适配agentserver架构
"""
import logging
from collections.abc import Callable #Callable:表示"可以被调用的东西",比如函数
from typing import Any, Dict, Optional
'''
typing模块:用来做类型提示的,告诉Python每个变量应该是什么类型
Any:可以是任何类型
Dict:字典类型
Optional:可选类型(可能有值,可能没有)
'''
logger = logging.getLogger(__name__)
#这是一个用来存储配置信息的类。
class ToolkitConfig:
"""工具包配置类"""
def __init__(self, config: Dict[str, Any] = None, name: str = None):
self.config = config or {}
self.name = name or self.__class__.__name__
self.mode = "builtin"
self.activated_tools = None
'''
这个类有4个属性(可以理解为4个盒子):
1.self.config - 配置字典
如果传入了配置,就用传入的
如果没有传入,就用空字典{}
2.self.name - 名字
如果传入了名字,就用传入的
如果没有,就用类的名字("ToolkitConfig")
3.self.mode - 模式
固定为"builtin"(内置模式)
4.self.activated_tools - 激活的工具
初始为None(空)
用来决定哪些工具可以用
'''
class AsyncBaseToolkit:
"""工具包基类 - 适配agentserver架构"""
def __init__(self, config: ToolkitConfig | dict | None = None):
# 看看传入的config是不是ToolkitConfig类型,如果不是:就创建一个新的ToolkitConfig对象,如果是:就直接使用
if not isinstance(config, ToolkitConfig):
config = config or {}
config = ToolkitConfig(config=config, name=self.__class__.__name__)
self.config: ToolkitConfig = config #self.config:存储配置对象
self._tools_map: dict[str, Callable] = None #self._tools_map:存储工具映射(初始为None)
@property
# 这个属性会自动收集所有被@register_tool装饰过的方法
def tools_map(self) -> dict[str, Callable]: #返回类型:dict[str, Callable](字典,键是字符串,值是可调用对象)
#注意:因为有 @property,所以实际上是个"属性"
"""懒加载工具映射 - 收集通过@register_tool注册的工具"""
if self._tools_map is None #检查 self._tools_map 是否为 None.如果已经是字典,说明已经收集过了,不用再收集
self._tools_map = {} #如果还没收集过:创建一个空字典,self._tools_map 是私有属性,存储工具映射
# 遍历类方法,找到带有@register_tool装饰器的方法
for attr_name in dir(self): #dir(self):获取对象所有属性和方法的名称列表,循环:遍历每个属性和方法的名称
#例子:如果对象有 edit_file, read_file 方法,dir(self) 会返回包含这些名字的列表
attr = getattr(self, attr_name) #getattr(self, attr_name):根据名称获取实际的属性或方法.如果 attr_name 是 "edit_file",那么 attr 就是 edit_file 方法本身
#这是关键判断条件,分两部分:
if callable(attr) and getattr(attr, "_is_tool", False):
'''
第一部分:callable(attr)
检查 attr 是否"可调用"
什么是"可调用"?就是可以加括号执行的东西
函数 √ 可调用
方法 √ 可调用
字符串 × 不可调用
整数 × 不可调用
第二部分:getattr(attr, "_is_tool", False)
getattr(attr, "_is_tool", False):尝试获取 attr 的 _is_tool 属性
三个参数:
attr:要检查的对象
"_is_tool":要获取的属性名
False:如果属性不存在,返回 False
作用:检查这个函数是否被 @register_tool 装饰过
组合起来:
如果是可调用的(是函数/方法)
并且有 _is_tool 属性(值为 True)
那么说明这是一个工具方法
'''
self._tools_map[attr._tool_name] = attr #如果通过了上面的检查:把工具名作为键:attr._tool_name,把工具函数作为值:attr,存入字典 self._tools_map
return self._tools_map
'''
如何工作?
1.第一次访问tools_map时,self._tools_map是None
2然后它开始搜索:
遍历这个类的所有属性和方法
检查每个属性是否"可调用"(是函数)并且有_is_tool标记
如果有,就把它添加到工具映射中
'''
#定义一个方法,返回字典(工具名 -> 工具函数)
def get_tools_map_func(self) -> dict[str, Callable]:
"""获取工具映射,根据配置过滤激活的工具"""
if self.config.activated_tools: #检查配置中是否有 activated_tools,如果有,说明需要过滤(只返回指定的工具),如果没有,返回所有工具
#这是最难理解的部分,我们拆开看:all():检查所有条件是否都为 True
#里面的表达式:tool_name in self.tools_map for tool_name in self.config.activated_tools,意思:对于配置中的每个工具名,检查它是否在 tools_map 中存在
#返回值:如果所有工具名都存在,返回True;否则返回False
'''assert 语句:
assert 条件, "错误信息"
作用:如果条件为 False,就报错,显示错误信息\这里:如果配置的工具名有不存在于实际工具中的,就报错
'''
assert all(tool_name in self.tools_map for tool_name in self.config.activated_tools), (
f"错误配置激活工具: {self.config.activated_tools}! 可用工具: {self.tools_map.keys()}" #格式化字符串,显示:配置了哪些工具,实际有哪些工具可用
)
tools_map = {tool_name: self.tools_map[tool_name] for tool_name in self.config.activated_tools}#快速创建字典的方法
'''
拆解:
# 伪代码理解:
tools_map = {}
for tool_name in self.config.activated_tools:
tools_map[tool_name] = self.tools_map[tool_name]
'''
#结果:创建一个新字典,只包含配置中指定的工具
else: #否则(如果没有配置 activated_tools),返回所有工具
tools_map = self.tools_map
return tools_map
'''
功能解释:
获取工具映射
如果配置了activated_tools(激活的工具列表),就只返回这些工具
如果没有配置,就返回所有工具
'''
def get_tools_list(self) -> list[Dict[str, Any]]: #定义一个方法,返回列表,列表中的每个元素是字典
"""获取工具列表 - 适配agentserver格式"""
tools_map = self.get_tools_map_func() #调用刚才的方法,获取工具映射(可能已过滤)
tools = [] #创建一个空列表,准备存储工具信息
for tool_name, tool_func in tools_map.items(): #遍历工具字典的每一项
#每次循环得到:tool_name:工具名(字符串),tool_func:工具函数(可调用对象)
tools.append({ #这是向列表添加一个字典:字典包含三个键:
"name": tool_name,
"function": tool_func,
"description": tool_func.__doc__ or f"工具: {tool_name}" #tool_func.__doc__ 获取函数的文档字符串
#or 运算符:如果左边为真(有文档),就用左边的,如果左边为假(空字符串或None),就用右边的
})
return tools
'''
功能解释:
把工具转换成列表格式,每个工具包含:
1.名字(name)
2.函数(function)
3.描述(description)
用函数的文档字符串(__doc__)
如果没有文档,就显示"工具: 工具名"
为什么需要这个?
有些系统需要工具以特定格式提供
这个格式让其他系统知道每个工具是干什么的
'''
async def call_tool(self, name: str, arguments: dict) -> str:
"""通过名称调用工具"""
tools_map = self.get_tools_map_func() #获取工具映射
if name not in tools_map:
raise ValueError(f"工具 {name} 未找到") #如果不存在,抛出错误
tool = tools_map[name]
#这是关键调用部分:
return await tool(**arguments) #**arguments:** 是"解包"运算符,把字典解包成关键字参数
'''
工作步骤:
获取工具映射
检查要调用的工具是否存在
如果不存在,报错
如果存在,调用这个工具
把参数传给工具
返回结果
'''
#这是一个装饰器,用来标记哪些方法是工具
def register_tool(name: str = None):
'''
使用方法:
1.最简单的方式:@register_tool
使用函数本身的名字作为工具名
2.使用括号:@register_tool()
同上,使用函数名
3.自定义名字:@register_tool("custom_name")
使用自定义名字作为工具名
'''
"""装饰器:将方法注册为工具
用法:
@register_tool # 使用方法名
@register_tool() # 使用方法名
@register_tool("custom_name") # 使用自定义名称
参数:
name (str, optional): 工具名称
"""
def decorator(func: Callable):
if isinstance(name, str):
tool_name = name
else:
tool_name = func.__name__
func._is_tool = True # 标记这个函数是工具
func._tool_name = tool_name # 设置工具名
return func
'''
关键点:
1.给函数添加_is_tool = True标记
2.给函数添加_tool_name属性
3.这样tools_map属性就能识别这个函数是工具了
比喻理解:
这个装饰器就像给工具贴标签:
贴上_is_tool标签:表示这是个工具
贴上_tool_name标签:写清楚工具叫什么名字
贴了标签的工具才能被工具箱识别和收集
'''
if callable(name): #处理装饰器的不同使用方式。
return decorator(name)
return decorator
'''
装饰器的两种调用方式:
无括号:装饰器直接接收函数
有括号:装饰器先接收参数,返回另一个函数,再接收函数
'''


