agentserver\tools
本文最后更新于61 天前,其中的信息可能已经过时,如有错误请发送邮件到184874483@qq.com

file_edit_toolkit.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
文件编辑工具包 - 适配agentserver架构
提供文件读写、编辑功能,支持SEARCH/REPLACE格式的diff操作
"""

import re # 正则表达式,用来搜索和替换文字
import shutil # 文件操作工具,用来复制文件
import logging # 记录日志(就像写日记,记录发生了什么)
from datetime import datetime # 获取时间
from pathlib import Path # 处理文件路径
#前5行是导入Python自带的工具

from .base_toolkit import AsyncBaseToolkit, register_tool, ToolkitConfig
#最后一行是导入自己写的其他代码(从当前文件夹的base_toolkit.py导入)

logger = logging.getLogger(__name__)


#创建一个叫FileEditToolkit的类
#它继承自AsyncBaseToolkit(意思是它拥有父类的所有功能)
class FileEditToolkit(AsyncBaseToolkit):
"""文件编辑工具包"""

def __init__(self, config: ToolkitConfig = None) -> None: #它继承自AsyncBaseToolkit(意思是它拥有父类的所有功能)
super().__init__(config)
workspace_root = self.config.config.get("workspace_root", "/tmp/")
self.setup_workspace(workspace_root)

self.default_encoding = self.config.config.get("default_encoding", "utf-8")
self.backup_enabled = self.config.config.get("backup_enabled", False)
logger.info(
f"FileEditToolkit初始化完成 - 工作目录: {self.work_dir}, 编码: {self.default_encoding}"
)
'''
做的事情:
先调用父类的设置方法
设置工作目录(文件存放的地方)
设置文件编码(默认用UTF-8,可以显示中文)
设置是否备份文件
记录一条日志说初始化完成了
'''

def setup_workspace(self, workspace_root: str): #创建一个文件夹作为工作空间
"""设置工作空间"""
self.work_dir = Path(workspace_root).resolve() #把传入的路径变成Path对象
self.work_dir.mkdir(parents=True, exist_ok=True) #创建这个文件夹(如果不存在的话)

def _sanitize_filename(self, filename: str) -> str: #把文件名中的特殊字符(除了字母、数字、下划线、横杠、点)都变成下划线
"""清理文件名,移除不安全字符"""
safe = re.sub(r"[^\w\-.]", "_", filename) #例子:test&file.txt → test_file.txt
return safe

def _resolve_filepath(self, file_path: str) -> Path:
"""解析文件路径,处理相对路径和安全检查"""
path_obj = Path(file_path)
if not path_obj.is_absolute(): #如果是相对路径,就加上工作目录变成绝对路径
path_obj = self.work_dir / path_obj

sanitized_filename = self._sanitize_filename(path_obj.name) #清理文件名
path_obj = path_obj.parent / sanitized_filename
resolved_path = path_obj.resolve()
self._create_backup(resolved_path) #如果需要,创建备份
return resolved_path #返回处理后的路径

def _create_backup(self, file_path: Path) -> None:
"""创建文件备份""" #在修改文件前先备份
if not self.backup_enabled or not file_path.exists(): #只有当backup_enabled为True且文件存在时才备份
return

timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_path = file_path.parent / f"{file_path.name}.{timestamp}.bak" #份文件命名:原文件名 + 时间 + .bak
shutil.copy2(file_path, backup_path)
logger.info(f"创建备份文件: {backup_path}")

@register_tool
async def edit_file(self, path: str, diff: str) -> str:
"""编辑文件,使用SEARCH/REPLACE格式的diff

参数:
path (str): 要编辑的文件路径
diff (str): SEARCH/REPLACE格式的diff内容,格式如下:
```
<<<<<<< SEARCH
[要查找的确切内容]
=======
[要替换的新内容]
>>>>>>> REPLACE
```
"""
try:
resolved_path = self._resolve_filepath(path)

with open(resolved_path, encoding=self.default_encoding) as f:
content = f.read()
modified_content = content
pattern = r"<<<<<<< SEARCH\n(.*?)\n=======\n(.*?)\n>>>>>>> REPLACE"
matches = re.findall(pattern, diff, re.DOTALL)
if not matches:
return "错误!在提供的diff中未找到有效的SEARCH/REPLACE块"

# 应用每个搜索/替换对
for search_text, replace_text in matches:
if search_text in modified_content:
modified_content = modified_content.replace(search_text, replace_text)
else:
logger.warning(f"在文件中未找到搜索文本: {search_text[:50]}...")

with open(resolved_path, "w", encoding=self.default_encoding) as f:
f.write(modified_content)
return f"成功编辑文件: {resolved_path}"
except Exception as e:
logger.error(f"编辑文件失败: {e}")
return f"编辑文件失败: {str(e)}"

@register_tool
async def write_file(self, path: str, file_text: str) -> str:
"""写入文本内容到文件

参数:
path (str): 要写入的文件路径
file_text (str): 要写入的完整文本内容
"""
try:
path_obj = self._resolve_filepath(path)
path_obj.parent.mkdir(parents=True, exist_ok=True)
path_obj.write_text(file_text, encoding=self.default_encoding)
return f"成功写入文件: {path_obj}"
except Exception as e:
logger.error(f"写入文件失败: {e}")
return f"写入文件失败: {str(e)}"

@register_tool
async def read_file(self, path: str) -> str:
"""读取并返回文件内容

参数:
path (str): 要读取的文件路径
"""
try:
path_obj = self._resolve_filepath(path)
if not path_obj.exists():
return f"文件不存在: {path_obj}"
return path_obj.read_text(encoding=self.default_encoding)
except Exception as e:
logger.error(f"读取文件失败: {e}")
return f"读取文件失败: {str(e)}"

@register_tool
async def list_files(self, directory: str = ".") -> str:
"""列出目录中的文件

参数:
directory (str): 要列出的目录路径,默认为当前目录
"""
try:
if directory == ".":
target_dir = self.work_dir
else:
target_dir = self.work_dir / directory

if not target_dir.exists():
return f"目录不存在: {target_dir}"

files = []
for item in target_dir.iterdir():
if item.is_file():
files.append(f"文件: {item.name}")
elif item.is_dir():
files.append(f"目录: {item.name}/")

if not files:
return f"目录为空: {target_dir}"

return f"目录 {target_dir} 内容:\n" + "\n".join(files)
except Exception as e:
logger.error(f"列出文件失败: {e}")
return f"列出文件失败: {str(e)}"

@register_tool
async def create_directory(self, path: str) -> str:
"""创建目录

参数:
path (str): 要创建的目录路径
"""
try:
path_obj = self.work_dir / path
path_obj.mkdir(parents=True, exist_ok=True)
return f"成功创建目录: {path_obj}"
except Exception as e:
logger.error(f"创建目录失败: {e}")
return f"创建目录失败: {str(e)}"

@register_tool
async def delete_file(self, path: str) -> str:
"""删除文件

参数:
path (str): 要删除的文件路径
"""
try:
path_obj = self._resolve_filepath(path)
if not path_obj.exists():
return f"文件不存在: {path_obj}"
path_obj.unlink()
return f"成功删除文件: {path_obj}"
except Exception as e:
logger.error(f"删除文件失败: {e}")
return f"删除文件失败: {str(e)}"

base_toolkit.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
基础工具包类 - 适配agentserver架构
"""

import logging
from collections.abc import Callable  #Callable:表示"可以被调用的东西",比如函数
from typing import Any, Dict, Optional
'''
typing模块:用来做类型提示的,告诉Python每个变量应该是什么类型
   Any:可以是任何类型
   Dict:字典类型
   Optional:可选类型(可能有值,可能没有)
'''

logger = logging.getLogger(__name__)


#这是一个用来存储配置信息的类。
class ToolkitConfig:
    """工具包配置类"""
    
    def __init__(self, config: Dict[str, Any] = None, name: str = None):
        self.config = config or {}
        self.name = name or self.__class__.__name__
        self.mode = "builtin"
        self.activated_tools = None
        '''
        这个类有4个属性(可以理解为4个盒子):
          1.self.config - 配置字典
             如果传入了配置,就用传入的
             如果没有传入,就用空字典{}
          2.self.name - 名字
             如果传入了名字,就用传入的
             如果没有,就用类的名字("ToolkitConfig")
          3.self.mode - 模式
             固定为"builtin"(内置模式)
          4.self.activated_tools - 激活的工具
             初始为None(空)
             用来决定哪些工具可以用
        '''
    

class AsyncBaseToolkit:
    """工具包基类 - 适配agentserver架构"""

    def __init__(self, config: ToolkitConfig | dict | None = None):
        # 看看传入的config是不是ToolkitConfig类型,如果不是:就创建一个新的ToolkitConfig对象,如果是:就直接使用
        if not isinstance(config, ToolkitConfig): 
            config = config or {}
            config = ToolkitConfig(config=config, name=self.__class__.__name__)

        self.config: ToolkitConfig = config               #self.config:存储配置对象
        self._tools_map: dict[str, Callable] = None       #self._tools_map:存储工具映射(初始为None)

    @property
    # 这个属性会自动收集所有被@register_tool装饰过的方法
    def tools_map(self) -> dict[str, Callable]:  #返回类型:dict[str, Callable](字典,键是字符串,值是可调用对象)
        #注意:因为有 @property,所以实际上是个"属性"
        """懒加载工具映射 - 收集通过@register_tool注册的工具"""
        if self._tools_map is None #检查 self._tools_map 是否为 None.如果已经是字典,说明已经收集过了,不用再收集
            self._tools_map = {} #如果还没收集过:创建一个空字典,self._tools_map 是私有属性,存储工具映射
            # 遍历类方法,找到带有@register_tool装饰器的方法
            for attr_name in dir(self):  #dir(self):获取对象所有属性和方法的名称列表,循环:遍历每个属性和方法的名称
                #例子:如果对象有 edit_file, read_file 方法,dir(self) 会返回包含这些名字的列表
                attr = getattr(self, attr_name)  #getattr(self, attr_name):根据名称获取实际的属性或方法.如果 attr_name 是 "edit_file",那么 attr 就是 edit_file 方法本身
                #这是关键判断条件,分两部分:
                if callable(attr) and getattr(attr, "_is_tool", False):
                    '''
                    第一部分:callable(attr)
                       检查 attr 是否"可调用"
                       什么是"可调用"?就是可以加括号执行的东西
                             函数 √ 可调用
                             方法 √ 可调用
                             字符串 × 不可调用
                            整数 × 不可调用

                    第二部分:getattr(attr, "_is_tool", False)
                       getattr(attr, "_is_tool", False):尝试获取 attr 的 _is_tool 属性
                       三个参数:
                          attr:要检查的对象
                          "_is_tool":要获取的属性名
                          False:如果属性不存在,返回 False
                      作用:检查这个函数是否被 @register_tool 装饰过

                   组合起来:
                     如果是可调用的(是函数/方法)
                     并且有 _is_tool 属性(值为 True)
                     那么说明这是一个工具方法
                    '''
                    self._tools_map[attr._tool_name] = attr #如果通过了上面的检查:把工具名作为键:attr._tool_name,把工具函数作为值:attr,存入字典 self._tools_map
        return self._tools_map
       '''
        如何工作?
          1.第一次访问tools_map时,self._tools_map是None
          2然后它开始搜索:
             遍历这个类的所有属性和方法
             检查每个属性是否"可调用"(是函数)并且有_is_tool标记
             如果有,就把它添加到工具映射中
        '''
    
    
    #定义一个方法,返回字典(工具名 -> 工具函数)
    def get_tools_map_func(self) -> dict[str, Callable]:
        """获取工具映射,根据配置过滤激活的工具"""
        if self.config.activated_tools: #检查配置中是否有 activated_tools,如果有,说明需要过滤(只返回指定的工具),如果没有,返回所有工具
            #这是最难理解的部分,我们拆开看:all():检查所有条件是否都为 True
            #里面的表达式:tool_name in self.tools_map for tool_name in self.config.activated_tools,意思:对于配置中的每个工具名,检查它是否在 tools_map 中存在
            #返回值:如果所有工具名都存在,返回True;否则返回False
            '''assert 语句:
            assert 条件, "错误信息"
            作用:如果条件为 False,就报错,显示错误信息\这里:如果配置的工具名有不存在于实际工具中的,就报错
            '''
            assert all(tool_name in self.tools_map for tool_name in self.config.activated_tools), (
                f"错误配置激活工具: {self.config.activated_tools}! 可用工具: {self.tools_map.keys()}" #格式化字符串,显示:配置了哪些工具,实际有哪些工具可用
            )
            tools_map = {tool_name: self.tools_map[tool_name] for tool_name in self.config.activated_tools}#快速创建字典的方法
            '''
            拆解:
            # 伪代码理解:
            tools_map = {}
            for tool_name in self.config.activated_tools:
                tools_map[tool_name] = self.tools_map[tool_name]
            '''
            #结果:创建一个新字典,只包含配置中指定的工具
        else: #否则(如果没有配置 activated_tools),返回所有工具
            tools_map = self.tools_map
        return tools_map
    '''
    功能解释:
      获取工具映射
      如果配置了activated_tools(激活的工具列表),就只返回这些工具
      如果没有配置,就返回所有工具
    '''

    def get_tools_list(self) -> list[Dict[str, Any]]: #定义一个方法,返回列表,列表中的每个元素是字典
        """获取工具列表 - 适配agentserver格式"""
        tools_map = self.get_tools_map_func() #调用刚才的方法,获取工具映射(可能已过滤)
        tools = []  #创建一个空列表,准备存储工具信息
        for tool_name, tool_func in tools_map.items(): #遍历工具字典的每一项
            #每次循环得到:tool_name:工具名(字符串),tool_func:工具函数(可调用对象)
            tools.append({   #这是向列表添加一个字典:字典包含三个键:
                "name": tool_name,  
                "function": tool_func,
                "description": tool_func.__doc__ or f"工具: {tool_name}" #tool_func.__doc__ 获取函数的文档字符串
                #or 运算符:如果左边为真(有文档),就用左边的,如果左边为假(空字符串或None),就用右边的
            })
        return tools
    '''
    功能解释:
    把工具转换成列表格式,每个工具包含:
      1.名字(name)
      2.函数(function)
      3.描述(description)
         用函数的文档字符串(__doc__)
         如果没有文档,就显示"工具: 工具名"
    为什么需要这个?
      有些系统需要工具以特定格式提供
      这个格式让其他系统知道每个工具是干什么的
    '''

    async def call_tool(self, name: str, arguments: dict) -> str:
        """通过名称调用工具"""
        tools_map = self.get_tools_map_func() #获取工具映射
        if name not in tools_map:
            raise ValueError(f"工具 {name} 未找到") #如果不存在,抛出错误
        tool = tools_map[name]
        #这是关键调用部分:
        return await tool(**arguments) #**arguments:** 是"解包"运算符,把字典解包成关键字参数
    '''
    工作步骤:
     获取工具映射
     检查要调用的工具是否存在
     如果不存在,报错
     如果存在,调用这个工具
     把参数传给工具
     返回结果
    '''

#这是一个装饰器,用来标记哪些方法是工具
def register_tool(name: str = None):
'''
 使用方法:
   1.最简单的方式:@register_tool
       使用函数本身的名字作为工具名
   2.使用括号:@register_tool()
       同上,使用函数名
   3.自定义名字:@register_tool("custom_name")
       使用自定义名字作为工具名
'''
    """装饰器:将方法注册为工具
    
    用法:
        @register_tool  # 使用方法名
        @register_tool()  # 使用方法名  
        @register_tool("custom_name")  # 使用自定义名称
        
    参数:
        name (str, optional): 工具名称
    """

    def decorator(func: Callable):
        if isinstance(name, str): 
            tool_name = name
        else:
            tool_name = func.__name__
        func._is_tool = True     # 标记这个函数是工具
        func._tool_name = tool_name     # 设置工具名
        return func
    '''
    关键点:
     1.给函数添加_is_tool = True标记
     2.给函数添加_tool_name属性
    3.这样tools_map属性就能识别这个函数是工具了
   比喻理解:
   这个装饰器就像给工具贴标签:
     贴上_is_tool标签:表示这是个工具
     贴上_tool_name标签:写清楚工具叫什么名字
     贴了标签的工具才能被工具箱识别和收集
    '''

    if callable(name): #处理装饰器的不同使用方式。
        return decorator(name)
    return decorator
    '''
    装饰器的两种调用方式:
      无括号:装饰器直接接收函数
      有括号:装饰器先接收参数,返回另一个函数,再接收函数
      '''
文末附加内容
暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇