本文最后更新于62 天前,其中的信息可能已经过时,如有错误请发送邮件到184874483@qq.com
computer_control_agent
"""
电脑控制主Agent - 协调各个组件,提供统一的电脑控制接口
"""
import asyncio
import logging
from typing import Dict, Any, Optional
import json #用于把数据转换成JSON格式(网络传输常用)
from .computer_use_adapter import ComputerUseAdapter
from .visual_analyzer import VisualAnalyzer
from .action_executor import ActionExecutor, ActionResult
'''
computer_use_adapter.py(电脑控制适配器)
action_executor.py(动作执行器)
visual_analyzer.py(视觉分析器 - 我们还没看)
'''
# 配置日志
logger = logging.getLogger(__name__)
class ComputerControlAgent:
"""电脑控制主Agent,负责协调各个组件"""
def __init__(self):
    """Wire up the three collaborating components of the control agent.

    The adapter drives the mouse/keyboard, the analyzer inspects
    screenshots, and the executor coordinates both when running actions.
    """
    self.adapter = ComputerUseAdapter()   # low-level mouse/keyboard control
    self.analyzer = VisualAnalyzer()      # screen-content analysis
    self.executor = ActionExecutor(       # action orchestration over both
        computer_adapter=self.adapter,
        visual_analyzer=self.analyzer,
    )
    logger.info("电脑控制Agent初始化完成")
async def handle_handoff(self, task: dict) -> str:
    """Entry point for computer-control tasks.

    Extracts action/target/parameters from the task dict and routes the
    request to the matching handler; unknown actions fall through to the
    generic handler. Always returns a JSON string.
    """
    try:
        logger.info(f"收到电脑控制任务: {task}")
        action = task.get("action", "")
        target = task.get("target", "")
        parameters = task.get("parameters", {})
        # Dispatch table instead of an if/elif chain.
        routes = {
            "click": self._handle_click,
            "click_ai": self._handle_click_ai,
            "type": self._handle_type,
            "screenshot": self._handle_screenshot,
            "find_element": self._handle_find_element,
            "locate_ai": self._handle_locate_ai,
            "automate_task": self._handle_automate_task,
            "coordinate_info": self._handle_coordinate_info,
        }
        handler = routes.get(action)
        if handler is not None:
            return await handler(target, parameters)
        return await self._handle_generic_task(action, target, parameters)
    except Exception as e:
        logger.error(f"处理电脑控制任务失败: {e}")
        return json.dumps({
            "success": False,
            "error": str(e),
            "message": f"任务处理失败: {str(e)}"
        }, ensure_ascii=False)
# 点击处理函数(两种点击方式)
async def _handle_click(self, target: str, parameters: Dict[str, Any]) -> str:
    """Click handler supporting AI-located and coordinate-based clicks."""
    try:
        ai_requested = parameters.get("use_ai", False)
        description = parameters.get("ai_description", target)
        if ai_requested and description:
            # AI path: the adapter locates the element from a description.
            clicked = await self.adapter.click_with_ai_location(
                description, parameters.get("button", "left")
            )
            if clicked:
                payload = {
                    "success": True,
                    "message": f"AI定位点击成功: {description}",
                    "data": {"method": "ai_location", "target": description},
                }
            else:
                payload = {
                    "success": False,
                    "error": "AI定位失败",
                    "message": f"AI定位点击失败: {description}",
                }
            return json.dumps(payload, ensure_ascii=False)
        # Coordinate path: delegate to the executor with an action dict.
        outcome = await self.executor.execute_action(
            {"action": "click", "target": target, "parameters": parameters}
        )
        if outcome.success:
            payload = {"success": True, "message": f"成功点击: {target}", "data": outcome.data}
        else:
            payload = {"success": False, "error": outcome.error, "message": outcome.message}
        return json.dumps(payload, ensure_ascii=False)
    except Exception as e:
        return json.dumps({
            "success": False,
            "error": str(e),
            "message": f"点击操作失败: {str(e)}"
        }, ensure_ascii=False)
async def _handle_type(self, target: str, parameters: Dict[str, Any]) -> str:
    """Run a typing action through the executor and return JSON."""
    try:
        outcome = await self.executor.execute_action(
            {"action": "type", "target": target, "parameters": parameters}
        )
        if outcome.success:
            payload = {"success": True, "message": f"成功输入: {target}", "data": outcome.data}
        else:
            payload = {"success": False, "error": outcome.error, "message": outcome.message}
        return json.dumps(payload, ensure_ascii=False)
    except Exception as e:
        return json.dumps({
            "success": False,
            "error": str(e),
            "message": f"输入操作失败: {str(e)}"
        }, ensure_ascii=False)
async def _handle_screenshot(self, target: str, parameters: Dict[str, Any]) -> str:
    """Run a screenshot action through the executor and return JSON."""
    try:
        outcome = await self.executor.execute_action(
            {"action": "screenshot", "target": target, "parameters": parameters}
        )
        if outcome.success:
            payload = {"success": True, "message": f"截图成功: {target}", "data": outcome.data}
        else:
            payload = {"success": False, "error": outcome.error, "message": outcome.message}
        return json.dumps(payload, ensure_ascii=False)
    except Exception as e:
        return json.dumps({
            "success": False,
            "error": str(e),
            "message": f"截图操作失败: {str(e)}"
        }, ensure_ascii=False)
async def _handle_find_element(self, target: str, parameters: Dict[str, Any]) -> str:
    """Run an element-lookup action through the executor and return JSON."""
    try:
        outcome = await self.executor.execute_action(
            {"action": "find_element", "target": target, "parameters": parameters}
        )
        if outcome.success:
            payload = {"success": True, "message": f"找到元素: {target}", "data": outcome.data}
        else:
            payload = {"success": False, "error": outcome.error, "message": outcome.message}
        return json.dumps(payload, ensure_ascii=False)
    except Exception as e:
        return json.dumps({
            "success": False,
            "error": str(e),
            "message": f"元素查找失败: {str(e)}"
        }, ensure_ascii=False)
async def _handle_automate_task(self, target: str, parameters: Dict[str, Any]) -> str:
    """Run a natural-language automation task directly through the adapter."""
    try:
        logger.info(f"开始自动化任务: {target}")
        exec_result = await self.adapter.run_instruction(target)
        ok = isinstance(exec_result, dict) and bool(exec_result.get("success"))
        return json.dumps({
            "success": ok,
            "message": "任务执行完成" if ok else exec_result.get("error", "任务执行失败"),
            "data": exec_result
        }, ensure_ascii=False)
    except Exception as e:
        return json.dumps({
            "success": False,
            "error": str(e),
            "message": f"自动化任务失败: {str(e)}"
        }, ensure_ascii=False)
async def _handle_generic_task(self, action: str, target: str, parameters: Dict[str, Any]) -> str:
    """Fallback handler: forward any unrecognized action to the executor."""
    try:
        outcome = await self.executor.execute_action(
            {"action": action, "target": target, "parameters": parameters}
        )
        if outcome.success:
            payload = {
                "success": True,
                "message": f"任务执行成功: {action} - {target}",
                "data": outcome.data,
            }
        else:
            payload = {"success": False, "error": outcome.error, "message": outcome.message}
        return json.dumps(payload, ensure_ascii=False)
    except Exception as e:
        return json.dumps({
            "success": False,
            "error": str(e),
            "message": f"通用任务执行失败: {str(e)}"
        }, ensure_ascii=False)
def get_capabilities(self) -> Dict[str, Any]:
    """Report which components are available plus supported capabilities.

    Bug fix: the original unconditionally dereferenced ``self.planner``,
    but ``__init__`` never creates a planner, so this method always
    raised AttributeError. The planner entry now reports ``False`` when
    no planner component exists, keeping the key for existing callers.
    """
    planner = getattr(self, "planner", None)
    return {
        "enabled": True,
        "adapter": self.adapter.is_available(),
        "analyzer": self.analyzer.is_available(),
        # Report False instead of crashing when the optional planner is absent.
        "planner": planner.is_available() if planner is not None else False,
        "executor": self.executor.is_available(),
        "capabilities": [
            "鼠标点击",
            "键盘输入",
            "屏幕截图",
            "元素查找",
            "任务自动化",
            "视觉分析"
        ]
    }
async def _handle_click_ai(self, target: str, parameters: Dict[str, Any]) -> str:
    """Click an element described in natural language via AI location."""
    try:
        clicked = await self.adapter.click_with_ai_location(
            target, parameters.get("button", "left")
        )
        if clicked:
            payload = {
                "success": True,
                "message": f"AI定位点击成功: {target}",
                "data": {"method": "ai_location", "target": target},
            }
        else:
            payload = {
                "success": False,
                "error": "AI定位失败",
                "message": f"AI定位点击失败: {target}",
            }
        return json.dumps(payload, ensure_ascii=False)
    except Exception as e:
        return json.dumps({
            "success": False,
            "error": str(e),
            "message": f"AI定位点击失败: {str(e)}"
        }, ensure_ascii=False)
async def _handle_locate_ai(self, target: str, parameters: Dict[str, Any]) -> str:
    """Locate an element with AI and return its coordinates as JSON."""
    try:
        screenshot = await self.adapter.take_screenshot()
        if not screenshot:
            return json.dumps({
                "success": False,
                "error": "截图失败",
                "message": "无法获取屏幕截图"
            }, ensure_ascii=False)
        location = await self.analyzer.locate_element_with_ai(
            target,
            screenshot,
            self.adapter.screen_width,
            self.adapter.screen_height,
        )
        if not location:
            return json.dumps({
                "success": False,
                "error": "AI定位失败",
                "message": f"无法定位元素: {target}"
            }, ensure_ascii=False)
        x, y = location
        return json.dumps({
            "success": True,
            "message": f"AI定位成功: {target}",
            "data": {
                "target": target,
                "coordinates": {"x": x, "y": y},
                "method": "ai_location"
            }
        }, ensure_ascii=False)
    except Exception as e:
        return json.dumps({
            "success": False,
            "error": str(e),
            "message": f"AI定位失败: {str(e)}"
        }, ensure_ascii=False)
async def _handle_coordinate_info(self, target: str, parameters: Dict[str, Any]) -> str:
    """Return the adapter's coordinate-system description as JSON."""
    try:
        info = self.adapter.get_coordinate_info()
        return json.dumps({
            "success": True,
            "message": "坐标系统信息获取成功",
            "data": info
        }, ensure_ascii=False)
    except Exception as e:
        return json.dumps({
            "success": False,
            "error": str(e),
            "message": f"获取坐标系统信息失败: {str(e)}"
        }, ensure_ascii=False)
def get_status(self) -> Dict[str, Any]:
    """Snapshot of agent identity, capabilities, components and upgrades."""
    status: Dict[str, Any] = {
        "agent_name": "ComputerControlAgent",
        "version": "2.0.0",
        "status": "running",
        "capabilities": self.get_capabilities(),
    }
    status["components"] = {
        "adapter": "ComputerUseAdapter",
        "analyzer": "VisualAnalyzer",
        "planner": "TaskPlanner",
        "executor": "ActionExecutor",
    }
    status["upgrades"] = [
        "AI坐标定位",
        "坐标标准化系统",
        "多格式坐标支持",
        "智能元素定位",
    ]
    return status
# ==========这个文件创造了什么?==========
'''
新的类:ComputerControlAgent(总指挥)
9个处理函数:
handle_handoff(主入口)
_handle_click(点击)
_handle_click_ai(AI点击)
_handle_type(打字)
_handle_screenshot(截图)
_handle_find_element(查找元素)
_handle_automate_task(自动化任务)
_handle_locate_ai(AI定位)
_handle_coordinate_info(坐标信息)
_handle_generic_task(通用任务)
2个状态函数:
get_capabilities(能力查询)
get_status(状态查询)
它调用了哪些外部函数?
adapter.click_with_ai_location():AI点击
adapter.take_screenshot():截图
adapter.run_instruction():执行自然语言指令
adapter.get_coordinate_info():获取坐标信息
analyzer.locate_element_with_ai():AI定位元素
executor.execute_action():执行动作
它做了哪些配置?
日志配置:记录运行过程
团队配置:组建了三个专家
返回格式配置:统一使用JSON格式
'''
#这个文件是整个系统的总入口
action_executor.py
"""
动作执行器 - 执行具体的鼠标键盘操作
提供安全、可靠的动作执行功能
"""
import asyncio # 异步工具:让多个任务可以同时进行
import logging # 日志工具:记录程序运行过程
from typing import Dict, Any, Optional, Tuple, List # 类型提示:让代码更清晰
from dataclasses import dataclass # 数据类工具:简化类的创建
from enum import Enum # 枚举工具:定义固定选项
# 配置日志
logger = logging.getLogger(__name__)
class ActionType(Enum):
    """Enumeration of the action verbs ActionExecutor can dispatch."""
    CLICK = "click"                # mouse click
    TYPE = "type"                  # keyboard text entry
    SCREENSHOT = "screenshot"      # capture the screen
    SCROLL = "scroll"              # mouse-wheel scroll
    DRAG = "drag"                  # press-move-release
    WAIT = "wait"                  # sleep for a duration
    FIND_ELEMENT = "find_element"  # locate a UI element
    ANALYZE = "analyze"            # full-screen visual analysis
@dataclass
class ActionResult:
    """Outcome of a single executed action."""
    success: bool                          # whether the action completed
    message: str                           # human-readable summary
    data: Optional[Dict[str, Any]] = None  # optional structured payload
    error: Optional[str] = None            # error detail when success is False
class ActionExecutor:
"""动作执行器,执行具体的鼠标键盘操作"""
def __init__(self, computer_adapter=None, visual_analyzer=None):
"""初始化动作执行器"""
self.computer_adapter = computer_adapter # 控制电脑的接口
self.visual_analyzer = visual_analyzer # 分析屏幕的工具
self.execution_history = [] # 执行历史记录
self.safety_mode = True # 安全模式开关
self.max_retry_count = 3 # 最多重试3次
async def execute_action(self, action: Dict[str, Any]) -> ActionResult:
    """Validate, safety-check, and dispatch one action dict.

    The dict carries ``action`` (type name), ``target`` and ``parameters``.
    Every attempt — success or failure — is appended to
    ``self.execution_history`` with an event-loop timestamp.
    """
    try:
        action_type = action.get("action", "").lower()
        target = action.get("target", "")
        parameters = action.get("parameters", {})
        logger.info(f"执行动作: {action_type}, 目标: {target}")
        # Safety gate: refuse obviously dangerous actions when enabled.
        if self.safety_mode and not self._is_safe_action(action_type, target):
            return ActionResult(
                success=False,
                message="动作被安全模式阻止",
                error="不安全动作"
            )
        # Dispatch table replaces the original if/elif chain.
        dispatch = {
            ActionType.CLICK.value: self._execute_click,
            ActionType.TYPE.value: self._execute_type,
            ActionType.SCREENSHOT.value: self._execute_screenshot,
            ActionType.SCROLL.value: self._execute_scroll,
            ActionType.DRAG.value: self._execute_drag,
            ActionType.WAIT.value: self._execute_wait,
            ActionType.FIND_ELEMENT.value: self._execute_find_element,
            ActionType.ANALYZE.value: self._execute_analyze,
        }
        runner = dispatch.get(action_type)
        if runner is None:
            return ActionResult(
                success=False,
                message=f"未知动作类型: {action_type}",
                error="未知动作类型"
            )
        return await runner(target, parameters)
    except Exception as e:
        logger.error(f"动作执行失败: {e}")
        return ActionResult(
            success=False,
            message=f"动作执行异常: {str(e)}",
            error=str(e)
        )
    finally:
        # Bug fix: asyncio.get_event_loop() is deprecated inside coroutines
        # (Python 3.10+); get_running_loop() is correct here and never
        # creates a new loop as a side effect.
        self.execution_history.append({
            "action": action,
            "timestamp": asyncio.get_running_loop().time()
        })
async def _execute_click(self, target: str, parameters: Dict[str, Any]) -> ActionResult:
    """Resolve click coordinates (explicit or AI-located) and click."""
    try:
        if not self.computer_adapter:
            return ActionResult(success=False, message="电脑控制适配器未初始化", error="适配器未初始化")
        x = parameters.get("x")
        y = parameters.get("y")
        button = parameters.get("button", "left")
        if x is not None and y is not None:
            # Explicit coordinates were supplied; normalize their format.
            x, y = self._parse_coordinates(x, y)
        else:
            # No coordinates: only attempt AI location when the target
            # reads like something clickable.
            looks_clickable = isinstance(target, str) and (
                target.startswith(('点击', 'click', 'Click'))
                or any(keyword in target.lower() for keyword in ['按钮', 'button', '图标', 'icon'])
            )
            if not looks_clickable:
                return ActionResult(success=False, message="缺少坐标参数", error="缺少坐标参数")
            if not self.visual_analyzer:
                return ActionResult(success=False, message="需要坐标或视觉分析器", error="缺少必要参数")
            screenshot = await self.computer_adapter.take_screenshot()
            location = await self.visual_analyzer.locate_element(target, screenshot)
            if not location:
                return ActionResult(success=False, message=f"AI定位失败: {target}", error="AI定位失败")
            x, y = location
        if await self.computer_adapter.click(x, y, button):
            return ActionResult(
                success=True,
                message=f"成功点击: ({x}, {y})",
                data={"x": x, "y": y, "button": button, "target": target}
            )
        return ActionResult(success=False, message="点击操作失败", error="点击失败")
    except Exception as e:
        return ActionResult(success=False, message=f"点击执行失败: {str(e)}", error=str(e))
def _parse_coordinates(self, x, y) -> Tuple[int, int]:
"""解析坐标,支持多种格式"""
try:
# 处理字符串坐标
if isinstance(x, str):
x = float(x)
if isinstance(y, str):
y = float(y)
# 处理元组/列表坐标
if isinstance(x, (tuple, list)) and len(x) == 2:
x, y = x[0], x[1]
# 转换为整数
x = int(round(float(x)))
y = int(round(float(y)))
return x, y
except Exception as e:
logger.error(f"坐标解析失败: {e}")
return 0, 0
async def _execute_type(self, target: str, parameters: Dict[str, Any]) -> ActionResult:
    """Type text via the adapter; text defaults to the target string."""
    try:
        if not self.computer_adapter:
            return ActionResult(success=False, message="电脑控制适配器未初始化", error="适配器未初始化")
        text = parameters.get("text", target)
        interval = parameters.get("interval", 0.1)
        typed = await self.computer_adapter.type_text(text, interval)
        if typed:
            return ActionResult(
                success=True,
                message=f"成功输入文本: {text}",
                data={"text": text, "interval": interval}
            )
        return ActionResult(success=False, message="文本输入失败", error="输入失败")
    except Exception as e:
        return ActionResult(success=False, message=f"输入执行失败: {str(e)}", error=str(e))
async def _execute_screenshot(self, target: str, parameters: Dict[str, Any]) -> ActionResult:
    """Grab a screenshot via the adapter and report its byte size."""
    try:
        if not self.computer_adapter:
            return ActionResult(success=False, message="电脑控制适配器未初始化", error="适配器未初始化")
        screenshot = await self.computer_adapter.take_screenshot()
        if not screenshot:
            return ActionResult(success=False, message="截图失败", error="截图失败")
        return ActionResult(
            success=True,
            message="截图成功",
            data={"screenshot_size": len(screenshot)}
        )
    except Exception as e:
        return ActionResult(success=False, message=f"截图执行失败: {str(e)}", error=str(e))
async def _execute_scroll(self, target: str, parameters: Dict[str, Any]) -> ActionResult:
    """Scroll at (x, y) by ``clicks`` notches (defaults: 0, 0, 3)."""
    try:
        if not self.computer_adapter:
            return ActionResult(success=False, message="电脑控制适配器未初始化", error="适配器未初始化")
        x = parameters.get("x", 0)
        y = parameters.get("y", 0)
        clicks = parameters.get("clicks", 3)
        if await self.computer_adapter.scroll(x, y, clicks):
            return ActionResult(
                success=True,
                message=f"成功滚动: ({x}, {y}), 滚动量: {clicks}",
                data={"x": x, "y": y, "clicks": clicks}
            )
        return ActionResult(success=False, message="滚动操作失败", error="滚动失败")
    except Exception as e:
        return ActionResult(success=False, message=f"滚动执行失败: {str(e)}", error=str(e))
async def _execute_drag(self, target: str, parameters: Dict[str, Any]) -> ActionResult:
    """Drag from (start_x, start_y) to (end_x, end_y) over ``duration`` seconds."""
    try:
        if not self.computer_adapter:
            return ActionResult(success=False, message="电脑控制适配器未初始化", error="适配器未初始化")
        start_x = parameters.get("start_x", 0)
        start_y = parameters.get("start_y", 0)
        end_x = parameters.get("end_x", 100)
        end_y = parameters.get("end_y", 100)
        duration = parameters.get("duration", 1.0)
        moved = await self.computer_adapter.drag_to(start_x, start_y, end_x, end_y, duration)
        if moved:
            return ActionResult(
                success=True,
                message=f"成功拖拽: ({start_x}, {start_y}) -> ({end_x}, {end_y})",
                data={"start": (start_x, start_y), "end": (end_x, end_y), "duration": duration}
            )
        return ActionResult(success=False, message="拖拽操作失败", error="拖拽失败")
    except Exception as e:
        return ActionResult(success=False, message=f"拖拽执行失败: {str(e)}", error=str(e))
async def _execute_wait(self, target: str, parameters: Dict[str, Any]) -> ActionResult:
    """Sleep for ``parameters['duration']`` seconds (default 1.0)."""
    try:
        duration = parameters.get("duration", 1.0)
        await asyncio.sleep(duration)
        return ActionResult(
            success=True,
            message=f"等待完成: {duration}秒",
            data={"duration": duration}
        )
    except Exception as e:
        return ActionResult(success=False, message=f"等待执行失败: {str(e)}", error=str(e))
async def _execute_find_element(self, target: str, parameters: Dict[str, Any]) -> ActionResult:
    """Screenshot the screen and ask the visual analyzer to locate ``target``."""
    try:
        if not self.visual_analyzer:
            return ActionResult(success=False, message="视觉分析器未初始化", error="分析器未初始化")
        if not self.computer_adapter:
            return ActionResult(success=False, message="电脑控制适配器未初始化", error="适配器未初始化")
        screenshot = await self.computer_adapter.take_screenshot()
        if not screenshot:
            return ActionResult(success=False, message="截图失败", error="截图失败")
        location = await self.visual_analyzer.locate_element(target, screenshot)
        if location:
            return ActionResult(
                success=True,
                message=f"找到元素: {target} at {location}",
                data={"location": location, "target": target}
            )
        return ActionResult(success=False, message=f"未找到元素: {target}", error="元素未找到")
    except Exception as e:
        return ActionResult(success=False, message=f"元素查找失败: {str(e)}", error=str(e))
#辅助函数
async def _execute_analyze(self, target: str, parameters: Dict[str, Any]) -> ActionResult:
    """Capture the screen and run a full visual analysis on it.

    (The original docstring mislabeled this as a coordinate-parsing
    function; it performs screen analysis.)
    """
    try:
        if not self.visual_analyzer:
            return ActionResult(success=False, message="视觉分析器未初始化", error="分析器未初始化")
        if not self.computer_adapter:
            return ActionResult(success=False, message="电脑控制适配器未初始化", error="适配器未初始化")
        screenshot = await self.computer_adapter.take_screenshot()
        if not screenshot:
            return ActionResult(success=False, message="截图失败", error="截图失败")
        analysis = await self.visual_analyzer.analyze_screen(screenshot)
        if analysis.get("success"):
            return ActionResult(success=True, message="屏幕分析完成", data=analysis)
        return ActionResult(
            success=False,
            message="屏幕分析失败",
            error=analysis.get("error", "分析失败")
        )
    except Exception as e:
        return ActionResult(success=False, message=f"屏幕分析失败: {str(e)}", error=str(e))
def _is_safe_action(self, action_type: str, target: str) -> bool:
"""检查动作是否安全"""
# 危险动作列表
dangerous_actions = ["delete", "remove", "uninstall", "format", "shutdown", "restart"]
dangerous_targets = ["系统", "system", "注册表", "registry", "管理员", "admin"]
# 检查动作类型
if any(danger in action_type.lower() for danger in dangerous_actions):
return False
# 检查目标
if any(danger in target.lower() for danger in dangerous_targets):
return False
return True
async def execute_step_sequence(self, steps: list) -> List[ActionResult]:
    """Run steps in order; dependency failures are recorded but don't abort."""
    results: List[ActionResult] = []
    total = len(steps)
    for index, step in enumerate(steps, start=1):
        logger.info(f"执行步骤 {index}/{total}: {step.get('action', 'unknown')}")
        # A step only runs when all of its declared dependencies succeeded.
        if not self._check_dependencies(step, results):
            results.append(ActionResult(
                success=False,
                message=f"步骤 {index} 依赖检查失败",
                error="依赖检查失败"
            ))
            continue
        outcome = await self.execute_action(step)
        results.append(outcome)
        if not outcome.success:
            # Policy choice: log and continue rather than abort the sequence.
            logger.warning(f"步骤 {index} 执行失败: {outcome.message}")
    return results
def _check_dependencies(self, step: dict, previous_results: List[ActionResult]) -> bool:
"""检查步骤依赖"""
dependencies = step.get("dependencies", [])
for dep in dependencies:
# 查找依赖的步骤结果
dep_result = None
for result in previous_results:
if hasattr(result, 'step_id') and result.step_id == dep:
dep_result = result
break
if not dep_result or not dep_result.success:
return False
return True
def is_available(self) -> Dict[str, Any]:
    """Summarize executor readiness and bookkeeping counters."""
    summary: Dict[str, Any] = {"enabled": True}
    # Ready only when an adapter was injected at construction time.
    summary["ready"] = self.computer_adapter is not None
    summary["safety_mode"] = self.safety_mode
    summary["max_retry_count"] = self.max_retry_count
    summary["execution_history_count"] = len(self.execution_history)
    return summary
# ========这个->是什么东西?=========
#箭头 -> 在Python中叫做类型提示(Type Hints),也叫类型注解。
#def is_available(self) -> Dict[str, Any]:
#这个箭头 -> 的意思是:"这个函数会返回一个……",把它读成:"这个函数会返回一个 Dict[str, Any] 类型的东西"
#详细分解:
'''
1. 箭头左边:函数定义
python
def is_available(self)
这部分我们很熟悉,就是定义一个函数,函数名叫 is_available
2. 箭头 ->:表示"返回什么类型"
可以把它想象成一个方向指示箭头,指向函数返回的东西
3. 箭头右边:返回值的类型描述
Dict[str, Any]
Dict:字典类型(花括号 {} 那种)
[str, Any]:方括号里的内容是说明这个字典的键值对类型
str:键(key)是字符串类型
Any:值(value)可以是任何类型
类比理解:
想象一下,你要去超市买东西:
def 买东西(钱: int) -> 购物袋:
# 用钱买东西
return 装满东西的购物袋
钱: int:告诉你需要带整数(比如100元)
-> 购物袋:告诉你函数会返回一个购物袋
'''
#这个代码中的其他例子:
'''
例1:有参数的类型提示
async def execute_action(self, action: Dict[str, Any]) -> ActionResult:
输入:action: Dict[str, Any]
参数名叫 action
类型是字典,键是字符串,值可以是任何类型
输出:-> ActionResult
会返回一个 ActionResult 类型的对象
例2:多个参数的类型提示
def _parse_coordinates(self, x, y) -> Tuple[int, int]:
输出:-> Tuple[int, int]
会返回一个元组,包含两个整数
例3:可选的类型
data: Optional[Dict[str, Any]] = None
Optional[...] 表示"可能是这个类型,也可能是None"
所以这里的意思是:data 可能是字典,也可能是 None
'''
#为什么要有这个?
'''
好处1:像说明书一样
# 没有类型提示,你不知道该传什么
def 计算(参数):
pass
# 有类型提示,很清楚该传什么
def 计算(参数: int) -> float:
"""
参数应该传整数
会返回小数
"""
pass
好处2:IDE(编程工具)能帮你
编辑器会提示你该传什么类型的参数
如果你传错了类型,编辑器会警告你
代码补全会更准确
好处3:让别人更容易理解你的代码
就像你写的"使用说明书"
'''
#实际运行时会发生什么
#重要:这些类型提示只是提示!Python在运行时会忽略它们!
'''
看这个例子:
def 加法(a: int, b: int) -> int:
return a + b
# 下面这些都能运行,不会报错!
print(加法(1, 2)) # ✅ 正确:3
print(加法("hello", " world")) # ✅ 也能运行:"hello world"
print(加法([1], [2])) # ✅ 也能运行:[1, 2]
类型提示只是给程序员和编程工具看的,Python解释器不检查它们。
'''
#这个文件中出现的所有类型提示:
'''
# 函数参数的类型提示
def 函数名(参数: 类型, 参数2: 类型)
# 函数返回值的类型提示
def 函数名(...) -> 返回类型:
# 变量类型提示
变量名: 类型 = 初始值
'''
#常见类型:
'''
Dict[str, Any] # 字典,键是字符串,值是任何类型
Optional[...] # 可能是...,也可能是None
Tuple[int, int] # 包含两个整数的元组
List[ActionResult] # ActionResult类型的列表
bool # 布尔值(True/False)
str # 字符串
int # 整数
float # 小数
'''
#来,复习一下
'''
def 获取学生信息(学号: str) -> Optional[Dict[str, Any]]:
"""
输入学号(字符串)
返回学生信息(字典,可能是None)
"""
答案:
函数名:获取学生信息
需要传一个参数:学号,类型是字符串
返回值:可能是一个字典(键是字符串,值是任何类型),也可能是 None
'''
#总结:
#1.-> 是类型提示箭头,表示函数返回什么类型
#2.它只是提示,Python运行时不检查
#3.它让代码更清晰,更容易理解
#4.现代Python编程中推荐使用
computer_use_adapter.py
"""
电脑控制适配器 - 基于博弈论的ComputerUseAdapter实现
提供鼠标键盘控制、屏幕截图、视觉分析等核心功能
"""
import io # 输入输出工具,处理数据流
import time # 时间工具,用于等待和计时
import platform # 系统平台工具,识别是Windows/Mac/Linux
import logging # 日志工具,记录运行过程
from typing import Dict, Any, Optional, Tuple # 类型提示
import sys # 系统工具,管理Python运行环境
#特殊处理 - PIL库
if 'nagaagent_core.vendors.pil' in sys.modules:
del sys.modules['nagaagent_core.vendors.pil']
from PIL import Image
'''
这是什么意思?
检查是否导入过某个特定的PIL库
如果导入过,就删除它(避免冲突)
然后重新导入标准的PIL库(Python的图像处理库)
这就像:先检查书包里有没有旧版本的课本,如果有就先拿出来,然后放新版本的进去。
'''
import asyncio # 异步编程工具,让程序能同时做多件事
# 尝试导入依赖包(可能没有的工具)
try: #尝试导入pyautogui(控制鼠标键盘的核心工具)
import nagaagent_core.vendors.pyautogui as pyautogui
PYAUTOGUI_AVAILABLE = True
except ImportError: # 就标记为不可用
PYAUTOGUI_AVAILABLE = False
pyautogui = None
#如果pyautogui安装了,就能控制鼠标键盘,如果没安装,程序不会崩溃,只是相关功能不能用
try: #尝试导入GUI Agents(AI视觉分析工具)
from gui_agents.s2_5.agents.grounding import OSWorldACI #OSWorldACI:操作系统世界接口,可能用于理解屏幕内容
from gui_agents.s2_5.agents.agent_s import AgentS2_5 #AgentS2_5:一个AI代理,可能用于识别屏幕上的元素
GUI_AGENTS_AVAILABLE = True
except ImportError: #重要变量(标记工具是否可用)
GUI_AGENTS_AVAILABLE = False
OSWorldACI = None
AgentS2_5 = None
'''
这些标记告诉程序:
哪些功能可以使用
哪些功能因为缺少工具而不能用
'''
# 配置日志
logger = logging.getLogger(__name__)
class ComputerUseAdapter:
"""电脑控制适配器,基于博弈论的实现"""
#类的初始化 __init__
def __init__(self):
    """Probe the screen, compute logical/physical scaling, init components."""
    self.last_error: Optional[str] = None  # last failure reason, if any
    self.agent = None                      # reserved for a GUI agent instance
    self.grounding_agent = None            # reserved for a grounding agent
    self.init_ok = False                   # flips True once components initialize
    self._setup_dpi_awareness()            # Windows-only DPI fix for coordinates
    # Defaults for physical and logical (scaled) space; overwritten below
    # when pyautogui can report the real screen size.
    self.screen_width = 1920
    self.screen_height = 1080
    self.scaled_width = 1920
    self.scaled_height = 1080
    self.scale_x = 1.0
    self.scale_y = 1.0
    if PYAUTOGUI_AVAILABLE:
        try:
            self.screen_width, self.screen_height = pyautogui.size()
            logger.info(f"实际屏幕尺寸: {self.screen_width}x{self.screen_height}")
            self.scaled_width, self.scaled_height = self._scale_screen_dimensions(
                self.screen_width, self.screen_height, max_dim_size=1920
            )
            # Factors mapping logical (scaled) coords back to physical pixels.
            self.scale_x = self.screen_width / max(1, self.scaled_width)
            self.scale_y = self.screen_height / max(1, self.scaled_height)
        except Exception as e:
            logger.warning(f"获取屏幕尺寸失败: {e}")
    self._init_components()
def _setup_dpi_awareness(self): #有些电脑屏幕显示有缩放(比如150%显示)告诉Windows系统,这个程序需要精确的坐标,调用Windows的API函数
"""设置DPI感知,提高坐标精度"""
try:
if platform.system().lower() == "windows": # 只在Windows系统上做
import ctypes
try:
ctypes.windll.shcore.SetProcessDpiAwareness(2)
except Exception:
try:
ctypes.windll.user32.SetProcessDPIAware()
except Exception:
pass
except Exception:
pass
#屏幕缩放函数(尺子转换公式)
def _scale_screen_dimensions(self, width: int, height: int, max_dim_size: int = 1920) -> Tuple[int, int]:
    """Scale (width, height) so the larger dimension becomes ``max_dim_size``.

    NOTE(review): the ratio is not capped at 1.0, so screens smaller than
    max_dim_size are upscaled — confirm that is intended.
    """
    ratio = min(max_dim_size / width, max_dim_size / height)
    safe_width = int(width * ratio)
    safe_height = int(height * ratio)
    logger.info(f"屏幕缩放: {width}x{height} -> {safe_width}x{safe_height}, 缩放因子: {ratio:.2f}")
    return safe_width, safe_height
def _init_components(self):
    """Initialize core dependencies; records the failure reason when blocked."""
    try:
        if not PYAUTOGUI_AVAILABLE:
            self.last_error = "pyautogui未安装"
            logger.error("pyautogui未安装,电脑控制功能不可用")
            return
        if not GUI_AGENTS_AVAILABLE:
            self.last_error = "gui-agents未安装"
            logger.error("gui-agents未安装,高级功能不可用")
            return
        self._init_gui_agents()
        self.init_ok = True
        logger.info("电脑控制适配器初始化成功")
    except Exception as e:
        self.last_error = str(e)
        logger.error(f"电脑控制适配器初始化失败: {e}")
def _init_gui_agents(self):
    """Placeholder hook for GUI-agent setup; currently only logs."""
    try:
        # TODO: wire up AgentS2_5 / OSWorldACI here when implemented.
        logger.info("GUI代理初始化完成")
    except Exception as e:
        logger.warning(f"GUI代理初始化失败: {e}")
def is_available(self) -> Dict[str, Any]:
    """Describe readiness, failure reasons, and screen-scaling information."""
    reasons = []
    if not PYAUTOGUI_AVAILABLE:
        reasons.append("pyautogui未安装")
    if not self.init_ok:
        msg = "电脑控制适配器未初始化"
        if self.last_error:
            msg += f": {self.last_error}"
        reasons.append(msg)
    return {
        "enabled": True,
        "ready": not reasons,  # ready exactly when nothing blocked us
        "reasons": reasons,
        "screen_info": {
            "physical_size": f"{self.screen_width}x{self.screen_height}",
            "scaled_size": f"{self.scaled_width}x{self.scaled_height}",
            "scale_factors": f"x={self.scale_x:.2f}, y={self.scale_y:.2f}"
        },
        "platform": platform.system()
    }
async def take_screenshot(self) -> Optional[bytes]:
    """Capture the screen, downscale to logical size, return PNG bytes."""
    if not PYAUTOGUI_AVAILABLE:
        return None
    try:
        shot = pyautogui.screenshot()
        # Normalize into the logical coordinate space the AI works in.
        shot = shot.resize((self.scaled_width, self.scaled_height), Image.LANCZOS)
        buffer = io.BytesIO()
        shot.save(buffer, format="PNG")
        return buffer.getvalue()
    except Exception as e:
        logger.error(f"截取屏幕截图失败: {e}")
        return None
def _scale_coordinates(self, x: int, y: int) -> Tuple[int, int]:
"""缩放坐标,将逻辑坐标转换为物理坐标"""
scaled_x = int(round(x * self.scale_x))
scaled_y = int(round(y * self.scale_y))
return scaled_x, scaled_y
def _create_scaled_pyautogui(self):
    """Build a pyautogui proxy that maps logical coords to physical pixels.

    Bug fix: the original ``scroll`` wrapper reused the generic (x, y)
    argument scaler, which treated ``scroll(clicks, x, y)`` as if
    ``clicks`` were an x-coordinate and scaled the click count. The
    wrapper now scales only the x/y arguments of scroll.
    """
    if not PYAUTOGUI_AVAILABLE:
        return None

    class _ScaledPyAutoGUI:
        """Lightweight proxy scaling logical coordinates to the physical screen."""

        def __init__(self, backend, scale_x: float, scale_y: float):
            self._backend = backend
            self._scale_x = scale_x
            self._scale_y = scale_y

        def __getattr__(self, name):
            # Fall through to the real pyautogui for anything unwrapped.
            return getattr(self._backend, name)

        def _scale_kwargs_xy(self, kwargs):
            """Scale x=/y= keyword coordinates, returning a copy."""
            if 'x' in kwargs and 'y' in kwargs and isinstance(kwargs['x'], (int, float)) and isinstance(kwargs['y'], (int, float)):
                kwargs = dict(kwargs)
                kwargs['x'] = int(round(kwargs['x'] * self._scale_x))
                kwargs['y'] = int(round(kwargs['y'] * self._scale_y))
            return kwargs

        def _scale_xy_from_args(self, args, kwargs):
            """Scale leading (x, y) positionals, an (x, y) pair, or x=/y= kwargs."""
            if len(args) >= 2 and isinstance(args[0], (int, float)) and isinstance(args[1], (int, float)):
                x = int(round(args[0] * self._scale_x))
                y = int(round(args[1] * self._scale_y))
                args = (x, y) + tuple(args[2:])
            elif len(args) >= 1 and isinstance(args[0], (tuple, list)) and len(args[0]) == 2:
                x_raw, y_raw = args[0]
                if isinstance(x_raw, (int, float)) and isinstance(y_raw, (int, float)):
                    x = int(round(x_raw * self._scale_x))
                    y = int(round(y_raw * self._scale_y))
                    args = ((x, y),) + tuple(args[1:])
            else:
                kwargs = self._scale_kwargs_xy(kwargs)
            return args, kwargs

        # Wrapped absolute-coordinate mouse APIs — automatic coordinate scaling.
        def moveTo(self, *args, **kwargs):
            args, kwargs = self._scale_xy_from_args(args, kwargs)
            return self._backend.moveTo(*args, **kwargs)

        def click(self, *args, **kwargs):
            args, kwargs = self._scale_xy_from_args(args, kwargs)
            return self._backend.click(*args, **kwargs)

        def doubleClick(self, *args, **kwargs):
            args, kwargs = self._scale_xy_from_args(args, kwargs)
            return self._backend.doubleClick(*args, **kwargs)

        def rightClick(self, *args, **kwargs):
            args, kwargs = self._scale_xy_from_args(args, kwargs)
            return self._backend.rightClick(*args, **kwargs)

        def dragTo(self, *args, **kwargs):
            args, kwargs = self._scale_xy_from_args(args, kwargs)
            return self._backend.dragTo(*args, **kwargs)

        def scroll(self, *args, **kwargs):
            """Scroll with coordinate scaling; signature is scroll(clicks, x, y)."""
            # Only positions are coordinates — never scale the click count.
            if len(args) >= 3 and isinstance(args[1], (int, float)) and isinstance(args[2], (int, float)):
                args = (args[0],
                        int(round(args[1] * self._scale_x)),
                        int(round(args[2] * self._scale_y))) + tuple(args[3:])
            else:
                kwargs = self._scale_kwargs_xy(kwargs)
            return self._backend.scroll(*args, **kwargs)

    return _ScaledPyAutoGUI(pyautogui, self.scale_x, self.scale_y)
async def click(self, x: int, y: int, button: str = 'left') -> bool:
    """Click the given logical coordinate with the requested button."""
    if not PYAUTOGUI_AVAILABLE:
        return False
    try:
        px, py = self._scale_coordinates(x, y)
        # Unknown button names fall back to a plain left click.
        actions = {
            'right': pyautogui.rightClick,
            'middle': pyautogui.middleClick,
        }
        actions.get(button, pyautogui.click)(px, py)
        logger.info(f"点击坐标: 逻辑({x}, {y}) -> 物理({px}, {py}), 按钮: {button}")
        return True
    except Exception as e:
        logger.error(f"点击操作失败: {e}")
        return False
async def type_text(self, text: str, interval: float = 0.1) -> bool:
    """Type ``text`` one character at a time with the given delay.

    NOTE(review): pyautogui.typewrite only handles ASCII-typable keys;
    CJK text likely needs a clipboard-paste path — confirm expected usage.
    """
    if not PYAUTOGUI_AVAILABLE:
        return False
    try:
        pyautogui.typewrite(text, interval=interval)
        logger.info(f"输入文本: {text}")
        return True
    except Exception as e:
        logger.error(f"输入文本失败: {e}")
        return False
async def press_key(self, key: str) -> bool:
    """Press and release a single named key."""
    if not PYAUTOGUI_AVAILABLE:
        return False
    try:
        pyautogui.press(key)
        logger.info(f"按下按键: {key}")
        return True
    except Exception as e:
        logger.error(f"按键操作失败: {e}")
        return False
async def scroll(self, x: int, y: int, clicks: int) -> bool:
    """Scroll by ``clicks`` notches at the given logical position."""
    if not PYAUTOGUI_AVAILABLE:
        return False
    try:
        px, py = self._scale_coordinates(x, y)
        pyautogui.scroll(clicks, px, py)
        logger.info(f"滚动: 逻辑({x}, {y}) -> 物理({px}, {py}), 滚动量: {clicks}")
        return True
    except Exception as e:
        logger.error(f"滚动操作失败: {e}")
        return False
async def drag_to(self, start_x: int, start_y: int, end_x: int, end_y: int, duration: float = 1.0) -> bool:
    """Drag the mouse from (start_x, start_y) to (end_x, end_y).

    Both endpoints are logical coordinates and are scaled to physical
    pixels. *duration* is the drag time in seconds.

    Returns True on success, False when pyautogui is unavailable or
    the drag raises.
    """
    if not PYAUTOGUI_AVAILABLE:
        return False
    try:
        # Scale both endpoints
        scaled_start_x, scaled_start_y = self._scale_coordinates(start_x, start_y)
        scaled_end_x, scaled_end_y = self._scale_coordinates(end_x, end_y)
        # Fix: the scaled start point was previously computed but never
        # used, so the drag began from wherever the cursor happened to
        # be. Move to the start position first so the drag actually
        # starts at (start_x, start_y).
        pyautogui.moveTo(scaled_start_x, scaled_start_y)
        pyautogui.dragTo(scaled_end_x, scaled_end_y, duration=duration)
        logger.info(f"拖拽: 逻辑({start_x}, {start_y})->({end_x}, {end_y}) -> 物理({scaled_start_x}, {scaled_start_y})->({scaled_end_x}, {scaled_end_y})")
        return True
    except Exception as e:
        logger.error(f"拖拽操作失败: {e}")
        return False
async def find_element_by_text(self, text: str, screenshot: Optional[bytes] = None) -> Optional[Tuple[int, int]]:
    """Locate an element by its visible text.

    Placeholder — OCR integration is pending, so this only logs the
    request and always returns None.
    """
    logger.info(f"查找文本元素: {text}")
    return None
async def find_element_by_image(self, image_path: str, screenshot: Optional[bytes] = None) -> Optional[Tuple[int, int]]:
    """Locate an element by template image.

    Placeholder — image matching is pending, so this only logs the
    request and always returns None.
    """
    logger.info(f"查找图像元素: {image_path}")
    return None
async def execute_instruction(self, instruction: str) -> Dict[str, Any]:
    """Parse a control instruction, run it, and report the outcome.

    Returns a dict with a "success" flag; on failure it carries an
    "error" message, on success the parsed "action".
    """
    try:
        logger.info(f"执行指令: {instruction}")
        parsed = self._parse_instruction(instruction)
        if not parsed:
            # Nothing recognizable in the instruction.
            return {
                "success": False,
                "error": "无法解析指令",
                "instruction": instruction
            }
        ok = await self._execute_action(parsed)
        return {
            "success": ok,
            "instruction": instruction,
            "action": parsed
        }
    except Exception as err:
        logger.error(f"执行指令失败: {err}")
        return {
            "success": False,
            "error": str(err),
            "instruction": instruction
        }
async def run_instruction(self, instruction: str, max_iterations: int = 15) -> Dict[str, Any]:
    """Run a natural-language instruction as an observe/act loop.

    Takes up to *max_iterations* screenshots; each iteration is meant
    to ask an AI model for the next action (currently stubbed with
    canned code) and then exec that code with a coordinate-scaled
    pyautogui injected into its globals.

    Returns a dict with "success", and on success "iterations" and
    the accumulated "trajectory" text.

    NOTE(review): exec of model-generated code is a code-injection
    risk — sandbox or whitelist before production use.
    """
    if not self.init_ok:
        return {"success": False, "error": "电脑控制适配器未初始化"}
    try:
        obs = {}
        traj = "任务:\n" + instruction
        for iteration in range(max_iterations):
            logger.info(f"执行迭代 {iteration + 1}/{max_iterations}")
            # Capture the current screen state
            screenshot = await self.take_screenshot()
            if not screenshot:
                return {"success": False, "error": "无法获取屏幕截图"}
            obs["screenshot"] = screenshot
            # An AI model should generate the next action here;
            # for now use simple placeholder logic.
            if iteration == 0:
                # Simulated model-generated code
                code = f"# 执行任务: {instruction}\nprint('任务开始执行')"
            else:
                code = "print('任务完成')"
            logger.info(f"执行代码: {code}")
            # Check completion / wait / next markers in the generated code
            if "完成" in code or "done" in code.lower():
                logger.info("任务完成")
                break
            if "等待" in code or "wait" in code.lower():
                await asyncio.sleep(3)
                continue
            if "下一步" in code or "next" in code.lower():
                continue
            # Execute the code with a coordinate-scaled pyautogui injected
            try:
                exec_env = globals().copy()
                if PYAUTOGUI_AVAILABLE and hasattr(self, 'scale_x') and hasattr(self, 'scale_y'):
                    scaled_pyautogui = self._create_scaled_pyautogui()
                    if scaled_pyautogui:
                        exec_env['pyautogui'] = scaled_pyautogui
                exec(code, exec_env, exec_env)
                await asyncio.sleep(0.5)
            except Exception as e:
                logger.error(f"代码执行失败: {e}")
                continue
            # Append this step to the task trajectory
            traj += f"\n\n迭代 {iteration + 1}:\n{code}"
        return {
            "success": True,
            "message": "任务执行完成",
            "iterations": iteration + 1,
            "trajectory": traj
        }
    except Exception as e:
        logger.error(f"任务执行失败: {e}")
        return {"success": False, "error": str(e)}
def normalize_coordinates(self, x: float, y: float, screen_width: int = None, screen_height: int = None) -> Tuple[int, int]:
    """Map pixel coordinates onto the 0-1000 normalized grid.

    Falls back to the adapter's own screen size when no explicit
    dimensions are supplied; results are clamped into [0, 1000].
    """
    width = screen_width if screen_width is not None else self.screen_width
    height = screen_height if screen_height is not None else self.screen_height
    # Scale into the 0-1000 range, then clamp to the valid interval.
    nx = int(round(x / width * 1000))
    ny = int(round(y / height * 1000))
    return max(0, min(1000, nx)), max(0, min(1000, ny))
def denormalize_coordinates(self, normalized_x: int, normalized_y: int,
                            screen_width: int = None, screen_height: int = None) -> Tuple[int, int]:
    """Convert 0-1000 normalized coordinates back to pixel coordinates.

    Falls back to the adapter's own screen size when no explicit
    dimensions are supplied.
    """
    width = screen_width if screen_width is not None else self.screen_width
    height = screen_height if screen_height is not None else self.screen_height
    # Inverse of normalize_coordinates: fraction of 1000 times screen size.
    return (
        int(round(normalized_x / 1000.0 * width)),
        int(round(normalized_y / 1000.0 * height)),
    )
async def click_with_ai_location(self, target_description: str, button: str = 'left') -> bool:
    """Locate an element described in natural language, then click it.

    Takes a screenshot, asks the visual analyzer's AI grounding for
    the element's pixel position, and clicks there with *button*.
    Returns False when the screenshot or the AI location fails.
    """
    try:
        screenshot = await self.take_screenshot()
        if not screenshot:
            logger.error("无法获取屏幕截图")
            return False
        # Delegate localization to the visual analyzer's AI grounding.
        from .visual_analyzer import VisualAnalyzer
        analyzer = VisualAnalyzer()
        location = await analyzer.locate_element_with_ai(
            target_description,
            screenshot,
            self.screen_width,
            self.screen_height
        )
        if not location:
            logger.error(f"AI定位失败: {target_description}")
            return False
        x, y = location
        return await self.click(x, y, button)
    except Exception as err:
        logger.error(f"AI定位点击失败: {err}")
        return False
def get_coordinate_info(self) -> Dict[str, Any]:
    """Summarize the coordinate system: sizes, scale factors, platform."""
    info = {
        "screen_size": f"{self.screen_width}x{self.screen_height}",
        "scaled_size": f"{self.scaled_width}x{self.scaled_height}",
        "scale_factors": f"x={self.scale_x:.2f}, y={self.scale_y:.2f}",
        "normalization_range": "0-1000",
        "platform": platform.system(),
    }
    return info
def _parse_instruction(self, instruction: str) -> Optional[Dict[str, Any]]:
"""解析指令"""
instruction = instruction.lower().strip()
# 简单的指令解析
if "点击" in instruction or "click" in instruction:
return {"action": "click", "instruction": instruction}
elif "输入" in instruction or "type" in instruction:
return {"action": "type", "instruction": instruction}
elif "截图" in instruction or "screenshot" in instruction:
return {"action": "screenshot", "instruction": instruction}
elif "滚动" in instruction or "scroll" in instruction:
return {"action": "scroll", "instruction": instruction}
else:
return {"action": "unknown", "instruction": instruction}
async def _execute_action(self, action: Dict[str, Any]) -> bool:
    """Dispatch a parsed action dict to the matching low-level operation.

    Click/scroll targets and typed text are placeholders until real
    target extraction is implemented.
    """
    kind = action.get("action")
    instruction = action.get("instruction")  # kept for future target extraction
    if kind == "click":
        # TODO: smarter target localization; for now click the screen center.
        return await self.click(self.screen_width // 2, self.screen_height // 2)
    if kind == "type":
        # TODO: extract the text to type from the instruction.
        return await self.type_text("Hello World")
    if kind == "screenshot":
        return (await self.take_screenshot()) is not None
    if kind == "scroll":
        # TODO: smarter scroll parameters.
        return await self.scroll(self.screen_width // 2, self.screen_height // 2, 3)
    logger.warning(f"未知动作类型: {kind}")
    return False
visual_analyzer.py
"""
视觉分析器 - 基于博弈论的视觉识别功能
提供OCR、图像匹配、元素定位、AI坐标定位等功能
AI坐标定位算法升级
"""
import logging # 记录日志的,像写日记一样记录程序运行情况
from typing import Dict, Any, Optional, Tuple, List # 这是Python的类型提示,告诉你函数返回什么类型
import sys # 系统相关的功能
# 下面这两行是为了防止导入冲突
if 'nagaagent_core.vendors.pil' in sys.modules:
del sys.modules['nagaagent_core.vendors.pil']
from PIL import Image # 处理图片的主要工具
import io # 处理二进制数据(比如图片数据)
import base64 # 把图片转换成文本格式
import re # 正则表达式,处理文本模式匹配
import json # 处理JSON数据格式
# 配置日志
logger = logging.getLogger(__name__)
class VisualAnalyzer:
    """Visual analyzer for screenshots.

    Bundles three optional capabilities, each enabled only if its
    backing library imports successfully at construction time:
      * OCR text extraction (vendored pytesseract)
      * template / contour matching (vendored OpenCV + numpy)
      * AI coordinate grounding via a vision LLM (langchain-openai)
    """

    def __init__(self):
        """Probe optional dependencies and record which features are usable."""
        self.ocr_available = False  # OCR usable?
        self.image_matching_available = False  # template matching usable?
        self.ai_coordinate_available = False  # AI grounding usable?
        # Try to import the OCR library
        try:
            import nagaagent_core.vendors.pytesseract as pytesseract
            self.ocr_available = True
            logger.info("OCR功能已启用")
        except ImportError:
            logger.warning("pytesseract未安装,OCR功能不可用")
        # Try to import the image-matching libraries
        try:
            import nagaagent_core.vendors.cv2 as cv2
            import numpy as np
            self.cv2 = cv2
            self.np = np
            self.image_matching_available = True
            logger.info("图像匹配功能已启用")
        except ImportError:
            logger.warning("opencv-python未安装,图像匹配功能不可用")
        # Try to import the AI coordinate-grounding library
        try:
            from langchain_openai import ChatOpenAI
            self.ai_coordinate_available = True
            logger.info("AI坐标定位功能已启用")
        except ImportError:
            logger.warning("langchain-openai未安装,AI坐标定位功能不可用")

    async def analyze_screenshot(self, screenshot: bytes) -> Dict[str, Any]:
        """Analyze a screenshot: OCR text plus contour-based element detection.

        Returns a dict with image size/mode, OCR fragments and detected
        elements; on failure returns a dict carrying "error" instead.
        """
        try:
            # Load the image from raw PNG/JPEG bytes
            image = Image.open(io.BytesIO(screenshot))
            analysis_result = {
                "image_size": image.size,
                "mode": image.mode,
                "ocr_text": [],
                "elements": [],
                "analysis_time": None  # NOTE(review): never populated anywhere below
            }
            # OCR text recognition (only when pytesseract is available)
            if self.ocr_available:
                ocr_result = await self._extract_text_from_image(image)
                analysis_result["ocr_text"] = ocr_result
            # Element detection (no-op list when OpenCV is unavailable)
            elements = await self._detect_elements(image)
            analysis_result["elements"] = elements
            logger.info(f"屏幕分析完成: 识别到 {len(analysis_result['ocr_text'])} 个文本, {len(elements)} 个元素")
            return analysis_result
        except Exception as e:
            logger.error(f"屏幕分析失败: {e}")
            return {
                "error": str(e),
                "image_size": None,
                "ocr_text": [],
                "elements": []
            }

    async def _extract_text_from_image(self, image: Image.Image) -> List[Dict[str, Any]]:
        """Extract text fragments (with confidence and bbox) via OCR.

        Returns an empty list when OCR is unavailable or extraction fails.
        """
        if not self.ocr_available:
            return []
        try:
            import nagaagent_core.vendors.pytesseract as pytesseract
            # OCR with per-token position data
            data = pytesseract.image_to_data(image, output_type=pytesseract.Output.DICT)
            text_elements = []
            for i in range(len(data['text'])):
                text = data['text'][i].strip()
                if text:  # keep only non-empty tokens
                    text_elements.append({
                        "text": text,
                        "confidence": data['conf'][i],
                        "bbox": {
                            "x": data['left'][i],
                            "y": data['top'][i],
                            "width": data['width'][i],
                            "height": data['height'][i]
                        }
                    })
            return text_elements
        except Exception as e:
            logger.error(f"OCR文本提取失败: {e}")
            return []

    async def _detect_elements(self, image: Image.Image) -> List[Dict[str, Any]]:
        """Detect candidate UI elements as contour bounding boxes (OpenCV).

        Returns an empty list when image matching is unavailable or fails.
        """
        if not self.image_matching_available:
            return []
        try:
            # Convert PIL RGB to OpenCV BGR
            cv_image = self.cv2.cvtColor(self.np.array(image), self.cv2.COLOR_RGB2BGR)
            # Edge detection
            gray = self.cv2.cvtColor(cv_image, self.cv2.COLOR_BGR2GRAY)
            edges = self.cv2.Canny(gray, 50, 150)
            # Find external contours
            contours, _ = self.cv2.findContours(edges, self.cv2.RETR_EXTERNAL, self.cv2.CHAIN_APPROX_SIMPLE)
            elements = []
            for contour in contours:
                # Bounding box of the contour
                x, y, w, h = self.cv2.boundingRect(contour)
                # Skip elements that are too small to be real controls
                if w > 10 and h > 10:
                    elements.append({
                        "type": "contour",
                        "bbox": {"x": x, "y": y, "width": w, "height": h},
                        "area": w * h
                    })
            return elements
        except Exception as e:
            logger.error(f"元素检测失败: {e}")
            return []

    async def find_text_element(self, screenshot: bytes, target_text: str) -> Optional[Tuple[int, int]]:
        """Find an element containing *target_text* (case-insensitive substring).

        Returns the center pixel of the first match, or None.
        """
        try:
            analysis = await self.analyze_screenshot(screenshot)
            for text_element in analysis.get("ocr_text", []):
                if target_text.lower() in text_element["text"].lower():
                    bbox = text_element["bbox"]
                    # Return the center of the matched bounding box
                    center_x = bbox["x"] + bbox["width"] // 2
                    center_y = bbox["y"] + bbox["height"] // 2
                    return (center_x, center_y)
            return None
        except Exception as e:
            logger.error(f"查找文本元素失败: {e}")
            return None

    async def find_image_element(self, screenshot: bytes, template_path: str) -> Optional[Tuple[int, int]]:
        """Find the position of a template image inside the screenshot.

        Uses normalized cross-correlation template matching; requires a
        match score above 0.8. Returns the template's center pixel, or None.
        """
        if not self.image_matching_available:
            return None
        try:
            # Load the template image from disk
            template = self.cv2.imread(template_path)
            if template is None:
                logger.error(f"无法加载模板图像: {template_path}")
                return None
            # Load the screenshot and convert to OpenCV BGR
            screen_image = Image.open(io.BytesIO(screenshot))
            screen_cv = self.cv2.cvtColor(self.np.array(screen_image), self.cv2.COLOR_RGB2BGR)
            # Template matching (normalized correlation coefficient)
            result = self.cv2.matchTemplate(screen_cv, template, self.cv2.TM_CCOEFF_NORMED)
            min_val, max_val, min_loc, max_loc = self.cv2.minMaxLoc(result)
            # Accept only confident matches
            if max_val > 0.8:
                # Return the center of the matched template region
                center_x = max_loc[0] + template.shape[1] // 2
                center_y = max_loc[1] + template.shape[0] // 2
                return (center_x, center_y)
            return None
        except Exception as e:
            logger.error(f"图像匹配失败: {e}")
            return None

    async def get_screen_info(self, screenshot: bytes) -> Dict[str, Any]:
        """Return basic metadata (dimensions, mode, format, byte size) of a screenshot."""
        try:
            image = Image.open(io.BytesIO(screenshot))
            return {
                "width": image.width,
                "height": image.height,
                "mode": image.mode,
                "format": image.format,
                "size_bytes": len(screenshot)
            }
        except Exception as e:
            logger.error(f"获取屏幕信息失败: {e}")
            return {
                "error": str(e),
                "width": 0,
                "height": 0
            }

    async def locate_element_with_ai(self, target_description: str, screenshot: bytes,
                                     screen_width: int = 1920, screen_height: int = 1080) -> Optional[Tuple[int, int]]:
        """Locate a UI element from a natural-language description via a vision LLM.

        Sends the screenshot plus a coordinate-only prompt to the model
        configured in the system config, then parses the reply into a
        pixel position. Returns None when the feature is unavailable or
        the reply cannot be parsed.
        """
        if not self.ai_coordinate_available:
            logger.warning("AI坐标定位功能不可用")
            return None
        try:
            from langchain_openai import ChatOpenAI
            # Read the vision-LLM settings from the unified system config,
            # falling back to the generic API settings when the dedicated
            # computer_control section is absent.
            from system.config import config
            cc = getattr(config, 'computer_control', None)
            model = getattr(cc, 'model', None) or config.api.model
            base_url = getattr(cc, 'model_url', None) or config.api.base_url
            api_key = getattr(cc, 'api_key', None) or config.api.api_key
            # Initialize the LLM client (temperature 0 for deterministic output)
            llm = ChatOpenAI(
                model=model,
                base_url=base_url,
                api_key=api_key,
                temperature=0
            )
            # Encode the screenshot as base64 for the image payload
            screenshot_b64 = base64.b64encode(screenshot).decode('utf-8')
            # Prompt instructing the model to answer with coordinates only
            prompt = f"""
            请分析屏幕截图并定位目标元素: "{target_description}"
            请按以下格式输出坐标:
            1. 如果可能,输出边界框 [x1, y1, x2, y2],坐标范围0-1000
            2. 如果边界框不合适,输出精确坐标 x,y
            3. 不要包含任何解释文字,只输出坐标
            屏幕尺寸: {screen_width}x{screen_height}
            """
            # Ask the model to ground the coordinates on the screenshot
            response = llm.invoke([
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": prompt},
                        {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{screenshot_b64}"}}
                    ]
                }
            ])
            # Parse the model's coordinate answer into pixels
            coordinates = self._parse_ai_coordinates(response.content, screen_width, screen_height)
            if coordinates:
                logger.info(f"AI定位成功: {target_description} -> {coordinates}")
                return coordinates
            else:
                logger.warning(f"AI定位失败: {target_description}")
                return None
        except Exception as e:
            logger.error(f"AI坐标定位失败: {e}")
            return None

    def _parse_ai_coordinates(self, response: str, screen_width: int, screen_height: int) -> Optional[Tuple[int, int]]:
        """Parse the model's coordinate reply into a pixel point.

        Accepts a bounding box "[x1, y1, x2, y2]", a point "x,y", or a
        bare list of numbers. Values <= 1000 are treated as normalized
        (0-1000) coordinates and mapped onto the screen; larger values
        are treated as raw pixels. Returns None when nothing parses.
        """
        try:
            # Trim surrounding whitespace
            response = response.strip()
            # Try the bounding-box form [x1, y1, x2, y2]
            bbox_match = re.search(r'\[([0-9.,\s]+)\]', response)
            if bbox_match:
                coords_str = bbox_match.group(1)
                numbers = re.findall(r'-?\d+\.?\d*', coords_str)
                if len(numbers) >= 4:
                    x1, y1, x2, y2 = [float(n) for n in numbers[:4]]
                    # Values within 0-1000 are normalized coordinates
                    if max(x1, y1, x2, y2) <= 1000:
                        # Use the box center, mapped to pixel space
                        center_x = (x1 + x2) / 2.0
                        center_y = (y1 + y2) / 2.0
                        pixel_x = int(round(center_x / 1000.0 * screen_width))
                        pixel_y = int(round(center_y / 1000.0 * screen_height))
                        return (pixel_x, pixel_y)
                    else:
                        # Already pixel coordinates
                        center_x = (x1 + x2) / 2
                        center_y = (y1 + y2) / 2
                        return (int(center_x), int(center_y))
            # Try the exact-point form x,y
            coord_match = re.search(r'(\d+\.?\d*)\s*,\s*(\d+\.?\d*)', response)
            if coord_match:
                x, y = float(coord_match.group(1)), float(coord_match.group(2))
                # Values within 0-1000 are normalized coordinates
                if x <= 1000 and y <= 1000:
                    pixel_x = int(round(x / 1000.0 * screen_width))
                    pixel_y = int(round(y / 1000.0 * screen_height))
                    return (pixel_x, pixel_y)
                else:
                    # Already pixel coordinates
                    return (int(x), int(y))
            # Fall back to any list of numbers in the reply
            numbers = [int(float(x)) for x in re.findall(r'-?\d+\.?\d*', response)]
            if len(numbers) >= 2:
                x, y = numbers[0], numbers[1]
                if x <= 1000 and y <= 1000:
                    pixel_x = int(round(x / 1000.0 * screen_width))
                    pixel_y = int(round(y / 1000.0 * screen_height))
                    return (pixel_x, pixel_y)
                else:
                    return (x, y)
            return None
        except Exception as e:
            logger.error(f"坐标解析失败: {e}")
            return None

    async def locate_element(self, target: str, screenshot: bytes) -> Optional[Tuple[int, int]]:
        """Locate an element using a layered strategy.

        Tries AI grounding first, then OCR text search, then template
        matching (only when *target* looks like an image path). Returns
        the first hit, or None.
        """
        try:
            # First try AI grounding
            if self.ai_coordinate_available:
                ai_result = await self.locate_element_with_ai(target, screenshot)
                if ai_result:
                    return ai_result
            # Fall back to OCR text search
            if self.ocr_available:
                text_result = await self.find_text_element(screenshot, target)
                if text_result:
                    return text_result
            # Fall back to template matching when target is an image path
            if self.image_matching_available and target.endswith(('.png', '.jpg', '.jpeg')):
                image_result = await self.find_image_element(screenshot, target)
                if image_result:
                    return image_result
            logger.warning(f"无法定位元素: {target}")
            return None
        except Exception as e:
            logger.error(f"元素定位失败: {e}")
            return None

    def is_available(self) -> Dict[str, Any]:
        """Report which capabilities are enabled; "ready" is true if any is."""
        return {
            "ocr_available": self.ocr_available,
            "image_matching_available": self.image_matching_available,
            "ai_coordinate_available": self.ai_coordinate_available,
            "ready": self.ocr_available or self.image_matching_available or self.ai_coordinate_available
        }
# ==========代码里逻辑判断======
#这个文件里的各个功能十分相似,判断逻辑也基本相同,所以下面只重点讲解代码中处处可见、反复出现的几个通用模式。
'''
1. 错误处理逻辑(处处可见)
try:
# 尝试做某件事
do_something()
except Exception as e: # 如果出错了
logger.error(f"出错了: {e}") # 记录错误
return [] # 返回空结果,不让程序崩溃
为什么这样设计?
不能让一个功能出错导致整个程序崩溃
比如OCR功能坏了,图像匹配还能用
2. 功能可用性检查
def is_available(self) -> Dict[str, Any]:
return {
"ocr_available": self.ocr_available,
"image_matching_available": self.image_matching_available,
"ai_coordinate_available": self.ai_coordinate_available,
"ready": self.ocr_available or self.image_matching_available or self.ai_coordinate_available
}
这个函数告诉用户:我现在有哪些工具可用
3. AI坐标解析的智能判断
def _parse_ai_coordinates(self, response: str, screen_width: int, screen_height: int):
AI可能返回两种格式:
[x1, y1, x2, y2] - 一个框的四个角
x,y - 一个点的坐标
程序要能识别出是哪种格式并正确处理
'''
#这个文件的整体逻辑流程图
'''
开始使用视觉分析器
↓
初始化工具箱
├─ 检查OCR工具 ✓
├─ 检查图像匹配工具 ✓
└─ 检查AI工具 ✓
↓
用户想找东西时:
↓
优先用AI定位(如果可用)
↓
如果AI找不到或不可用 → 用文字识别
↓
如果文字找不到 → 用图像匹配
↓
返回找到的位置 或 返回“找不到”
'''


