agentserver\agent_computer_control
本文最后更新于62 天前,其中的信息可能已经过时,如有错误请发送邮件到184874483@qq.com

computer_control_agent

"""
电脑控制主Agent - 协调各个组件,提供统一的电脑控制接口
"""

import asyncio
import logging
from typing import Dict, Any, Optional
import json #用于把数据转换成JSON格式(网络传输常用)

from .computer_use_adapter import ComputerUseAdapter
from .visual_analyzer import VisualAnalyzer
from .action_executor import ActionExecutor, ActionResult
'''
computer_use_adapter.py(电脑控制适配器)
action_executor.py(动作执行器)
visual_analyzer.py(视觉分析器 - 我们还没看)
'''

# 配置日志
logger = logging.getLogger(__name__)

class ComputerControlAgent:
    """电脑控制主Agent,负责协调各个组件"""
    
    def __init__(self):
        """初始化电脑控制Agent"""
        self.adapter = ComputerUseAdapter()   # 控制专家,adapter:负责具体的鼠标键盘控制
        self.analyzer = VisualAnalyzer()      # 视觉专家,analyzer:负责分析屏幕内容
        self.executor = ActionExecutor(       # 执行专家,executor:负责协调执行动作
            computer_adapter=self.adapter,
            visual_analyzer=self.analyzer
        )
        '''
        执行器 (executor)
           ↓
           ├─→ 适配器 (adapter):控制硬件
           └─→ 分析器 (analyzer):分析屏幕
        '''
        
        logger.info("电脑控制Agent初始化完成")
    
    async def handle_handoff(self, task: dict) -> str:
        """处理电脑控制任务"""
        try:
            logger.info(f"收到电脑控制任务: {task}")
            
            # 解析任务参数
            action = task.get("action", "")
            target = task.get("target", "")
            parameters = task.get("parameters", {})
            
            # 根据动作类型处理
            if action == "click":
                return await self._handle_click(target, parameters)
            elif action == "click_ai":
                return await self._handle_click_ai(target, parameters)
            elif action == "type":
                return await self._handle_type(target, parameters)
            elif action == "screenshot":
                return await self._handle_screenshot(target, parameters)
            elif action == "find_element":
                return await self._handle_find_element(target, parameters)
            elif action == "locate_ai":
                return await self._handle_locate_ai(target, parameters)
            elif action == "automate_task":
                return await self._handle_automate_task(target, parameters)
            elif action == "coordinate_info":
                return await self._handle_coordinate_info(target, parameters)
            else:
                return await self._handle_generic_task(action, target, parameters)
            '''
            这是整个系统的"大门":
              1.接收一个任务(字典格式)
              2.解析出三个部分:
                 action:要做什么(点击、打字等)
                 target:目标是什么
                 parameters:额外参数
              3.根据动作类型,分发给专门的处理函数
            '''
                
        except Exception as e:
            logger.error(f"处理电脑控制任务失败: {e}")
            return json.dumps({
                "success": False,
                "error": str(e),
                "message": f"任务处理失败: {str(e)}"
            }, ensure_ascii=False)
    
    # 点击处理函数(两种点击方式)
    async def _handle_click(self, target: str, parameters: Dict[str, Any]) -> str:
        """处理点击任务,支持AI定位和多种坐标格式"""
        try:
            # 检查是否使用AI定位
            use_ai_location = parameters.get("use_ai", False)
            ai_description = parameters.get("ai_description", target)
            
            #特点:告诉AI"点击保存按钮",AI自己找按钮位置
            if use_ai_location and ai_description:
                # 使用AI定位进行点击
                success = await self.adapter.click_with_ai_location(ai_description, parameters.get("button", "left"))
                
                if success:
                    return json.dumps({
                        "success": True,
                        "message": f"AI定位点击成功: {ai_description}",
                        "data": {"method": "ai_location", "target": ai_description}
                    }, ensure_ascii=False)
                else:
                    return json.dumps({
                        "success": False,
                        "error": "AI定位失败",
                        "message": f"AI定位点击失败: {ai_description}"
                    }, ensure_ascii=False)
            else:
                # 使用传统方式执行点击,特点:指定具体坐标位置进行点击
                action = {
                    "action": "click",
                    "target": target,
                    "parameters": parameters
                }
                
                # 执行点击
                result = await self.executor.execute_action(action)
                
                if result.success:
                    return json.dumps({
                        "success": True,
                        "message": f"成功点击: {target}",
                        "data": result.data
                    }, ensure_ascii=False)
                else:
                    return json.dumps({
                        "success": False,             # 是否成功
                        "error": result.error,        # 结果信息
                        "message": result.message     # 额外数据
                    }, ensure_ascii=False)
                
        except Exception as e:
            return json.dumps({
                "success": False,
                "error": str(e),
                "message": f"点击操作失败: {str(e)}"
            }, ensure_ascii=False)
    
    async def _handle_type(self, target: str, parameters: Dict[str, Any]) -> str:
        """处理输入任务"""
        try:
            # 构建输入动作
            action = {
                "action": "type",
                "target": target,
                "parameters": parameters
            }
            
            # 执行输入
            result = await self.executor.execute_action(action)
            
            if result.success:
                return json.dumps({
                    "success": True,
                    "message": f"成功输入: {target}",
                    "data": result.data
                }, ensure_ascii=False)
            else:
                return json.dumps({
                    "success": False,
                    "error": result.error,
                    "message": result.message
                }, ensure_ascii=False)
                
        except Exception as e:
            return json.dumps({
                "success": False,
                "error": str(e),
                "message": f"输入操作失败: {str(e)}"
            }, ensure_ascii=False)
    
    async def _handle_screenshot(self, target: str, parameters: Dict[str, Any]) -> str:
        """处理截图任务"""
        try:
            # 构建截图动作
            action = {
                "action": "screenshot",
                "target": target,
                "parameters": parameters
            }
            
            # 执行截图
            result = await self.executor.execute_action(action)
            
            if result.success:
                return json.dumps({
                    "success": True,
                    "message": f"截图成功: {target}",
                    "data": result.data
                }, ensure_ascii=False)
            else:
                return json.dumps({
                    "success": False,
                    "error": result.error,
                    "message": result.message
                }, ensure_ascii=False)
                
        except Exception as e:
            return json.dumps({
                "success": False,
                "error": str(e),
                "message": f"截图操作失败: {str(e)}"
            }, ensure_ascii=False)
    
    async def _handle_find_element(self, target: str, parameters: Dict[str, Any]) -> str:
        """处理元素查找任务"""
        try:
            # 构建查找动作
            action = {
                "action": "find_element",
                "target": target,
                "parameters": parameters
            }
            
            # 执行查找
            result = await self.executor.execute_action(action)
            
            if result.success:
                return json.dumps({
                    "success": True,
                    "message": f"找到元素: {target}",
                    "data": result.data
                }, ensure_ascii=False)
            else:
                return json.dumps({
                    "success": False,
                    "error": result.error,
                    "message": result.message
                }, ensure_ascii=False)
                
        except Exception as e:
            return json.dumps({
                "success": False,
                "error": str(e),
                "message": f"元素查找失败: {str(e)}"
            }, ensure_ascii=False)
    
    async def _handle_automate_task(self, target: str, parameters: Dict[str, Any]) -> str:
        """处理自动化任务"""
        try:
            logger.info(f"开始自动化任务: {target}")
            # 直接调用电脑控制适配器执行自然语言指令
            exec_result = await self.adapter.run_instruction(target)
            success = bool(exec_result.get("success")) if isinstance(exec_result, dict) else False
            return json.dumps({
                "success": success,
                "message": "任务执行完成" if success else exec_result.get("error", "任务执行失败"),
                "data": exec_result
            }, ensure_ascii=False)
            
        except Exception as e:
            return json.dumps({
                "success": False,
                "error": str(e),
                "message": f"自动化任务失败: {str(e)}"
            }, ensure_ascii=False)
    
    async def _handle_generic_task(self, action: str, target: str, parameters: Dict[str, Any]) -> str:
        """处理通用任务"""
        try:
            # 构建通用动作
            action_dict = {
                "action": action,
                "target": target,
                "parameters": parameters
            }
            
            # 执行动作
            result = await self.executor.execute_action(action_dict)
            
            if result.success:
                return json.dumps({
                    "success": True,
                    "message": f"任务执行成功: {action} - {target}",
                    "data": result.data
                }, ensure_ascii=False)
            else:
                return json.dumps({
                    "success": False,
                    "error": result.error,
                    "message": result.message
                }, ensure_ascii=False)
                
        except Exception as e:
            return json.dumps({
                "success": False,
                "error": str(e),
                "message": f"通用任务执行失败: {str(e)}"
            }, ensure_ascii=False)
    
    def get_capabilities(self) -> Dict[str, Any]:
        """获取电脑控制能力"""
        return {
            "enabled": True,
            "adapter": self.adapter.is_available(),
            "analyzer": self.analyzer.is_available(),
            "planner": self.planner.is_available(),
            "executor": self.executor.is_available(),
            "capabilities": [
                "鼠标点击",
                "键盘输入",
                "屏幕截图",
                "元素查找",
                "任务自动化",
                "视觉分析"
            ]
        }
    
    async def _handle_click_ai(self, target: str, parameters: Dict[str, Any]) -> str:
        """处理AI定位点击任务"""
        try:
            # 使用AI定位进行点击
            success = await self.adapter.click_with_ai_location(target, parameters.get("button", "left"))
            
            if success:
                return json.dumps({
                    "success": True,
                    "message": f"AI定位点击成功: {target}",
                    "data": {"method": "ai_location", "target": target}
                }, ensure_ascii=False)
            else:
                return json.dumps({
                    "success": False,
                    "error": "AI定位失败",
                    "message": f"AI定位点击失败: {target}"
                }, ensure_ascii=False)
                
        except Exception as e:
            return json.dumps({
                "success": False,
                "error": str(e),
                "message": f"AI定位点击失败: {str(e)}"
            }, ensure_ascii=False)
    
    async def _handle_locate_ai(self, target: str, parameters: Dict[str, Any]) -> str:
        """处理AI定位任务"""
        try:
            # 获取屏幕截图
            screenshot = await self.adapter.take_screenshot()
            if not screenshot:
                return json.dumps({
                    "success": False,
                    "error": "截图失败",
                    "message": "无法获取屏幕截图"
                }, ensure_ascii=False)
            
            # 使用AI定位元素
            location = await self.analyzer.locate_element_with_ai(
                target, 
                screenshot, 
                self.adapter.screen_width, 
                self.adapter.screen_height
            )
            
            if location:
                x, y = location
                return json.dumps({
                    "success": True,
                    "message": f"AI定位成功: {target}",
                    "data": {
                        "target": target,
                        "coordinates": {"x": x, "y": y},
                        "method": "ai_location"
                    }
                }, ensure_ascii=False)
            else:
                return json.dumps({
                    "success": False,
                    "error": "AI定位失败",
                    "message": f"无法定位元素: {target}"
                }, ensure_ascii=False)
                
        except Exception as e:
            return json.dumps({
                "success": False,
                "error": str(e),
                "message": f"AI定位失败: {str(e)}"
            }, ensure_ascii=False)
    
    async def _handle_coordinate_info(self, target: str, parameters: Dict[str, Any]) -> str:
        """处理坐标系统信息查询"""
        try:
            # 获取坐标系统信息
            coordinate_info = self.adapter.get_coordinate_info()
            
            return json.dumps({
                "success": True,
                "message": "坐标系统信息获取成功",
                "data": coordinate_info
            }, ensure_ascii=False)
            
        except Exception as e:
            return json.dumps({
                "success": False,
                "error": str(e),
                "message": f"获取坐标系统信息失败: {str(e)}"
            }, ensure_ascii=False)
    
    def get_status(self) -> Dict[str, Any]:
        """获取电脑控制状态"""
        return {
            "agent_name": "ComputerControlAgent",
            "version": "2.0.0",  # 升级版本号
            "status": "running",
            "capabilities": self.get_capabilities(),
            "components": {
                "adapter": "ComputerUseAdapter",
                "analyzer": "VisualAnalyzer", 
                "planner": "TaskPlanner",
                "executor": "ActionExecutor"
            },
            "upgrades": [
                "AI坐标定位",
                "坐标标准化系统",
                "多格式坐标支持",
                "智能元素定位"
            ]
        }

# ==========这个文件创造了什么?==========
'''
 新的类:ComputerControlAgent(总指挥)
9个处理函数:
handle_handoff(主入口)
_handle_click(点击)
_handle_click_ai(AI点击)
_handle_type(打字)
_handle_screenshot(截图)
_handle_find_element(查找元素)
_handle_automate_task(自动化任务)
_handle_locate_ai(AI定位)
_handle_coordinate_info(坐标信息)
_handle_generic_task(通用任务)
2个状态函数:
get_capabilities(能力查询)
get_status(状态查询)
它调用了哪些外部函数?
adapter.click_with_ai_location():AI点击
adapter.take_screenshot():截图
adapter.run_instruction():执行自然语言指令
adapter.get_coordinate_info():获取坐标信息
analyzer.locate_element_with_ai():AI定位元素
executor.execute_action():执行动作
它做了哪些配置?
日志配置:记录运行过程
团队配置:组建了三个专家
返回格式配置:统一使用JSON格式
'''
#这个文件是整个系统的总入口

action_executor.py

"""
动作执行器 - 执行具体的鼠标键盘操作
提供安全、可靠的动作执行功能
"""

import asyncio           # 异步工具:让多个任务可以同时进行
import logging           # 日志工具:记录程序运行过程
from typing import Dict, Any, Optional, Tuple, List  # 类型提示:让代码更清晰
from dataclasses import dataclass        # 数据类工具:简化类的创建
from enum import Enum                    # 枚举工具:定义固定选项

# 配置日志
logger = logging.getLogger(__name__)

class ActionType(Enum):
    """动作类型枚举"""
    CLICK = "click"   # 点击
    TYPE = "type"      # 打字
    SCREENSHOT = "screenshot"   # 截图
    SCROLL = "scroll"      # 滚动
    DRAG = "drag"          # 拖拽
    WAIT = "wait"           # 等待
    FIND_ELEMENT = "find_element"    # 查找元素
    ANALYZE = "analyze"         # 分析

@dataclass
class ActionResult:
    """数据类-动作执行结果"""
    success: bool         # 是否成功
    message: str          # 结果信息
    data: Optional[Dict[str, Any]] = None  # 额外数据
    error: Optional[str] = None    # 错误信息

class ActionExecutor:
    """动作执行器,执行具体的鼠标键盘操作"""
    
    def __init__(self, computer_adapter=None, visual_analyzer=None):
        """初始化动作执行器"""
        self.computer_adapter = computer_adapter   # 控制电脑的接口
        self.visual_analyzer = visual_analyzer     # 分析屏幕的工具
        self.execution_history = []                # 执行历史记录
        self.safety_mode = True                    # 安全模式开关
        self.max_retry_count = 3                   # 最多重试3次
    
    async def execute_action(self, action: Dict[str, Any]) -> ActionResult:
        """执行具体动作"""  
        #这个函数是“总指挥”,它:接收一个动作指令(字典),检查是什么类型的动作,检查是否安全,根据动作类型调用不同的执行函数,记录执行结果
        try:
            action_type = action.get("action", "").lower()
            target = action.get("target", "")
            parameters = action.get("parameters", {})
            
            logger.info(f"执行动作: {action_type}, 目标: {target}")
            
            # 安全检查
            if self.safety_mode and not self._is_safe_action(action_type, target):  # 如果开了安全模式且动作不安全,就停止执行
                return ActionResult(
                    success=False,
                    message="动作被安全模式阻止",
                    error="不安全动作"
                )
            
            # 根据动作类型执行
            if action_type == ActionType.CLICK.value:
                return await self._execute_click(target, parameters)
            elif action_type == ActionType.TYPE.value:
                return await self._execute_type(target, parameters)
            elif action_type == ActionType.SCREENSHOT.value:
                return await self._execute_screenshot(target, parameters)
            elif action_type == ActionType.SCROLL.value:
                return await self._execute_scroll(target, parameters)
            elif action_type == ActionType.DRAG.value:
                return await self._execute_drag(target, parameters)
            elif action_type == ActionType.WAIT.value:
                return await self._execute_wait(target, parameters)
            elif action_type == ActionType.FIND_ELEMENT.value:
                return await self._execute_find_element(target, parameters)
            elif action_type == ActionType.ANALYZE.value:
                return await self._execute_analyze(target, parameters)
            else:
                return ActionResult(
                    success=False,
                    message=f"未知动作类型: {action_type}",
                    error="未知动作类型"
                )
                
        except Exception as e:
            logger.error(f"动作执行失败: {e}")
            return ActionResult(
                success=False,
                message=f"动作执行异常: {str(e)}",
                error=str(e)
            )
        finally:
            # 记录执行历史
            self.execution_history.append({
                "action": action,
                "timestamp": asyncio.get_event_loop().time()
            })
    
    async def _execute_click(self, target: str, parameters: Dict[str, Any]) -> ActionResult:
        """执行点击动作,支持多种坐标格式和AI定位"""
        try:     #检查是否有电脑控制工具
            if not self.computer_adapter:
                return ActionResult(
                    success=False,
                    message="电脑控制适配器未初始化",
                    error="适配器未初始化"
                )
            
            # 获取坐标,支持多种格式
            x = parameters.get("x")
            y = parameters.get("y")
            button = parameters.get("button", "left")
            
            # 处理坐标格式
            if x is not None and y is not None:    
                # 直接坐标
                x, y = self._parse_coordinates(x, y)
            elif isinstance(target, str) and (target.startswith(('点击', 'click', 'Click')) or 
                                           any(keyword in target.lower() for keyword in ['按钮', 'button', '图标', 'icon'])):
                # 尝试AI定位,如果没有坐标,就用AI来寻找
                if self.visual_analyzer:
                    screenshot = await self.computer_adapter.take_screenshot()
                    location = await self.visual_analyzer.locate_element(target, screenshot)
                    if location:
                        x, y = location
                    else:
                        return ActionResult(
                            success=False,
                            message=f"AI定位失败: {target}",
                            error="AI定位失败"
                        )
                else:
                    return ActionResult(
                        success=False,
                        message="需要坐标或视觉分析器",
                        error="缺少必要参数"
                    )
            else:
                return ActionResult(
                    success=False,
                    message="缺少坐标参数",
                    error="缺少坐标参数"
                )
            
            # 执行点击
            success = await self.computer_adapter.click(x, y, button)
            
            if success:
                return ActionResult(
                    success=True,
                    message=f"成功点击: ({x}, {y})",
                    data={"x": x, "y": y, "button": button, "target": target}
                )
            else:
                return ActionResult(
                    success=False,
                    message="点击操作失败",
                    error="点击失败"
                )
                
        except Exception as e:
            return ActionResult(
                success=False,
                message=f"点击执行失败: {str(e)}",
                error=str(e)
            )
    
    def _parse_coordinates(self, x, y) -> Tuple[int, int]:
        """解析坐标,支持多种格式"""
        try:
            # 处理字符串坐标
            if isinstance(x, str):
                x = float(x)
            if isinstance(y, str):
                y = float(y)
            
            # 处理元组/列表坐标
            if isinstance(x, (tuple, list)) and len(x) == 2:
                x, y = x[0], x[1]
            
            # 转换为整数
            x = int(round(float(x)))
            y = int(round(float(y)))
            
            return x, y
            
        except Exception as e:
            logger.error(f"坐标解析失败: {e}")
            return 0, 0
    
    async def _execute_type(self, target: str, parameters: Dict[str, Any]) -> ActionResult:
        """执行输入动作"""
        try:
            if not self.computer_adapter:
                return ActionResult(
                    success=False,
                    message="电脑控制适配器未初始化",
                    error="适配器未初始化"
                )
            
            text = parameters.get("text", target)
            interval = parameters.get("interval", 0.1)
            
            # 执行输入
            success = await self.computer_adapter.type_text(text, interval)
            
            if success:
                return ActionResult(
                    success=True,
                    message=f"成功输入文本: {text}",
                    data={"text": text, "interval": interval}
                )
            else:
                return ActionResult(
                    success=False,
                    message="文本输入失败",
                    error="输入失败"
                )
                
        except Exception as e:
            return ActionResult(
                success=False,
                message=f"输入执行失败: {str(e)}",
                error=str(e)
            )
    
    async def _execute_screenshot(self, target: str, parameters: Dict[str, Any]) -> ActionResult:
        """执行截图动作"""
        try:
            if not self.computer_adapter:
                return ActionResult(
                    success=False,
                    message="电脑控制适配器未初始化",
                    error="适配器未初始化"
                )
            
            # 执行截图
            screenshot = await self.computer_adapter.take_screenshot()
            
            if screenshot:
                return ActionResult(
                    success=True,
                    message="截图成功",
                    data={"screenshot_size": len(screenshot)}
                )
            else:
                return ActionResult(
                    success=False,
                    message="截图失败",
                    error="截图失败"
                )
                
        except Exception as e:
            return ActionResult(
                success=False,
                message=f"截图执行失败: {str(e)}",
                error=str(e)
            )
    
    async def _execute_scroll(self, target: str, parameters: Dict[str, Any]) -> ActionResult:
        """执行滚动动作"""
        try:
            if not self.computer_adapter:
                return ActionResult(
                    success=False,
                    message="电脑控制适配器未初始化",
                    error="适配器未初始化"
                )
            
            x = parameters.get("x", 0)
            y = parameters.get("y", 0)
            clicks = parameters.get("clicks", 3)
            
            # 执行滚动
            success = await self.computer_adapter.scroll(x, y, clicks)
            
            if success:
                return ActionResult(
                    success=True,
                    message=f"成功滚动: ({x}, {y}), 滚动量: {clicks}",
                    data={"x": x, "y": y, "clicks": clicks}
                )
            else:
                return ActionResult(
                    success=False,
                    message="滚动操作失败",
                    error="滚动失败"
                )
                
        except Exception as e:
            return ActionResult(
                success=False,
                message=f"滚动执行失败: {str(e)}",
                error=str(e)
            )
    
    async def _execute_drag(self, target: str, parameters: Dict[str, Any]) -> ActionResult:
        """执行拖拽动作"""
        try:
            if not self.computer_adapter:
                return ActionResult(
                    success=False,
                    message="电脑控制适配器未初始化",
                    error="适配器未初始化"
                )
            
            start_x = parameters.get("start_x", 0)
            start_y = parameters.get("start_y", 0)
            end_x = parameters.get("end_x", 100)
            end_y = parameters.get("end_y", 100)
            duration = parameters.get("duration", 1.0)
            
            # 执行拖拽
            success = await self.computer_adapter.drag_to(start_x, start_y, end_x, end_y, duration)
            
            if success:
                return ActionResult(
                    success=True,
                    message=f"成功拖拽: ({start_x}, {start_y}) -> ({end_x}, {end_y})",
                    data={"start": (start_x, start_y), "end": (end_x, end_y), "duration": duration}
                )
            else:
                return ActionResult(
                    success=False,
                    message="拖拽操作失败",
                    error="拖拽失败"
                )
                
        except Exception as e:
            return ActionResult(
                success=False,
                message=f"拖拽执行失败: {str(e)}",
                error=str(e)
            )
    
    async def _execute_wait(self, target: str, parameters: Dict[str, Any]) -> ActionResult:
        """执行等待动作"""
        try:
            duration = parameters.get("duration", 1.0)
            
            # 执行等待
            await asyncio.sleep(duration)
            
            return ActionResult(
                success=True,
                message=f"等待完成: {duration}秒",
                data={"duration": duration}
            )
            
        except Exception as e:
            return ActionResult(
                success=False,
                message=f"等待执行失败: {str(e)}",
                error=str(e)
            )
    
    async def _execute_find_element(self, target: str, parameters: Dict[str, Any]) -> ActionResult:
        """执行元素查找动作"""
        try:
            if not self.visual_analyzer:
                return ActionResult(
                    success=False,
                    message="视觉分析器未初始化",
                    error="分析器未初始化"
                )
            
            if not self.computer_adapter:
                return ActionResult(
                    success=False,
                    message="电脑控制适配器未初始化",
                    error="适配器未初始化"
                )
            
            # 先截图
            screenshot = await self.computer_adapter.take_screenshot()
            if not screenshot:
                return ActionResult(
                    success=False,
                    message="截图失败",
                    error="截图失败"
                )
            
            # 查找元素
            location = await self.visual_analyzer.locate_element(target, screenshot)
            
            if location:
                return ActionResult(
                    success=True,
                    message=f"找到元素: {target} at {location}",
                    data={"location": location, "target": target}
                )
            else:
                return ActionResult(
                    success=False,
                    message=f"未找到元素: {target}",
                    error="元素未找到"
                )
                
        except Exception as e:
            return ActionResult(
                success=False,
                message=f"元素查找失败: {str(e)}",
                error=str(e)
            )
    
    #辅助函数
    async def _execute_analyze(self, target: str, parameters: Dict[str, Any]) -> ActionResult:
        """坐标解析函数——执行分析动作"""
        try:
            if not self.visual_analyzer:
                return ActionResult(
                    success=False,
                    message="视觉分析器未初始化",
                    error="分析器未初始化"
                )
            
            if not self.computer_adapter:
                return ActionResult(
                    success=False,
                    message="电脑控制适配器未初始化",
                    error="适配器未初始化"
                )
            
            # 先截图
            screenshot = await self.computer_adapter.take_screenshot()
            if not screenshot:
                return ActionResult(
                    success=False,
                    message="截图失败",
                    error="截图失败"
                )
            
            # 分析屏幕
            analysis = await self.visual_analyzer.analyze_screen(screenshot)
            
            if analysis.get("success"):
                return ActionResult(
                    success=True,
                    message="屏幕分析完成",
                    data=analysis
                )
            else:
                return ActionResult(
                    success=False,
                    message="屏幕分析失败",
                    error=analysis.get("error", "分析失败")
                )
                
        except Exception as e:
            return ActionResult(
                success=False,
                message=f"屏幕分析失败: {str(e)}",
                error=str(e)
            )
    
    def _is_safe_action(self, action_type: str, target: str) -> bool:
        """检查动作是否安全"""
        # 危险动作列表
        dangerous_actions = ["delete", "remove", "uninstall", "format", "shutdown", "restart"]
        dangerous_targets = ["系统", "system", "注册表", "registry", "管理员", "admin"]
        
        # 检查动作类型
        if any(danger in action_type.lower() for danger in dangerous_actions):
            return False
        
        # 检查目标
        if any(danger in target.lower() for danger in dangerous_targets):
            return False
        
        return True
    
    async def execute_step_sequence(self, steps: list) -> List[ActionResult]:
        """执行步骤序列"""
        results = []
        
        for i, step in enumerate(steps):  #遍历每个步骤
            logger.info(f"执行步骤 {i+1}/{len(steps)}: {step.get('action', 'unknown')}")
            
            # 检查依赖,检查这个步骤有没有依赖前面的步骤
            if not self._check_dependencies(step, results):
                results.append(ActionResult(
                    success=False,
                    message=f"步骤 {i+1} 依赖检查失败",
                    error="依赖检查失败"
                ))
                continue
            
            # 执行步骤
            result = await self.execute_action(step)
            results.append(result)
            
            # 如果步骤失败,可以选择继续或停止
            if not result.success:
                logger.warning(f"步骤 {i+1} 执行失败: {result.message}")
                # 这里可以选择继续执行或停止
                # break  # 停止执行
                continue  # 继续执行
        
        return results
    
    def _check_dependencies(self, step: dict, previous_results: List[ActionResult]) -> bool:
        """检查步骤依赖"""
        dependencies = step.get("dependencies", [])
        
        for dep in dependencies:
            # 查找依赖的步骤结果
            dep_result = None
            for result in previous_results:
                if hasattr(result, 'step_id') and result.step_id == dep:
                    dep_result = result
                    break
            
            if not dep_result or not dep_result.success:
                return False
        
        return True
    
    def is_available(self) -> Dict[str, Any]:
        """检查动作执行功能是否可用"""
        return {
            "enabled": True,
            "ready": self.computer_adapter is not None,
            "safety_mode": self.safety_mode,
            "max_retry_count": self.max_retry_count,
            "execution_history_count": len(self.execution_history)
        }

# ========这个->是什么东西?=========
#箭头 -> 在Python中叫做类型提示(Type Hints),也叫类型注解。
#def is_available(self) -> Dict[str, Any]:
#这个箭头 -> 的意思是:"这个函数会返回一个……",把它读成:"这个函数会返回一个 Dict[str, Any] 类型的东西"
#详细分解:
'''
1. 箭头左边:函数定义
python
def is_available(self)
这部分我们很熟悉,就是定义一个函数,函数名叫 is_available

2. 箭头 ->:表示"返回什么类型"
可以把它想象成一个方向指示箭头,指向函数返回的东西

3. 箭头右边:返回值的类型描述
Dict[str, Any]
Dict:字典类型(花括号 {} 那种)
[str, Any]:方括号里的内容是说明这个字典的键值对类型
  str:键(key)是字符串类型
  Any:值(value)可以是任何类型

类比理解:
想象一下,你要去超市买东西:
def 买东西(钱: int) -> 购物袋:
    # 用钱买东西
    return 装满东西的购物袋
钱: int:告诉你需要带整数(比如100元)
-> 购物袋:告诉你函数会返回一个购物袋
'''
#这个代码中的其他例子:
'''
例1:有参数的类型提示

async def execute_action(self, action: Dict[str, Any]) -> ActionResult:
输入:action: Dict[str, Any]
  参数名叫 action
  类型是字典,键是字符串,值可以是任何类型
输出:-> ActionResult
  会返回一个 ActionResult 类型的对象

例2:多个参数的类型提示
def _parse_coordinates(self, x, y) -> Tuple[int, int]:
输出:-> Tuple[int, int]
 会返回一个元组,包含两个整数

例3:可选的类型
data: Optional[Dict[str, Any]] = None
Optional[...] 表示"可能是这个类型,也可能是None"
所以这里的意思是:data 可能是字典,也可能是 None
'''
#为什么要有这个?
'''
好处1:像说明书一样

# 没有类型提示,你不知道该传什么
def 计算(参数):
    pass

# 有类型提示,很清楚该传什么
def 计算(参数: int) -> float:
    """
    参数应该传整数
    会返回小数
    """
    pass
    
好处2:IDE(编程工具)能帮你
编辑器会提示你该传什么类型的参数
如果你传错了类型,编辑器会警告你
代码补全会更准确

好处3:让别人更容易理解你的代码
就像你写的"使用说明书"
'''
#实际运行时会发生什么
#重要:这些类型提示只是提示!Python在运行时会忽略它们!
'''
看这个例子:
def 加法(a: int, b: int) -> int:
    return a + b

# 下面这些都能运行,不会报错!
print(加法(1, 2))        # ✅ 正确:3
print(加法("hello", " world"))  # ✅ 也能运行:"hello world"
print(加法([1], [2]))    # ✅ 也能运行:[1, 2]

类型提示只是给程序员和编程工具看的,Python解释器不检查它们。
'''
#这个文件中出现的所有类型提示:
'''
# 函数参数的类型提示
def 函数名(参数: 类型, 参数2: 类型)

# 函数返回值的类型提示  
def 函数名(...) -> 返回类型:

# 变量类型提示
变量名: 类型 = 初始值
'''
#常见类型:
'''
Dict[str, Any]      # 字典,键是字符串,值是任何类型
Optional[...]       # 可能是...,也可能是None
Tuple[int, int]     # 包含两个整数的元组
List[ActionResult]  # ActionResult类型的列表
bool                # 布尔值(True/False)
str                 # 字符串
int                 # 整数
float               # 小数
'''
#来,复习一下
'''
def 获取学生信息(学号: str) -> Optional[Dict[str, Any]]:
    """
    输入学号(字符串)
    返回学生信息(字典,可能是None)
    """
    
答案:
函数名:获取学生信息
需要传一个参数:学号,类型是字符串
返回值:可能是一个字典(键是字符串,值是任何类型),也可能是 None
'''
#总结:
#1.-> 是类型提示箭头,表示函数返回什么类型
#2.它只是提示,Python运行时不检查
#3.它让代码更清晰,更容易理解
#4.现代Python编程中推荐使用

computer_use_adapter.py

"""
电脑控制适配器 - 基于博弈论的ComputerUseAdapter实现
提供鼠标键盘控制、屏幕截图、视觉分析等核心功能
"""

import io           # 输入输出工具,处理数据流
import time         # 时间工具,用于等待和计时
import platform     # 系统平台工具,识别是Windows/Mac/Linux
import logging      # 日志工具,记录运行过程
from typing import Dict, Any, Optional, Tuple  # 类型提示
import sys          # 系统工具,管理Python运行环境
#特殊处理 - PIL库
if 'nagaagent_core.vendors.pil' in sys.modules:
    del sys.modules['nagaagent_core.vendors.pil']
from PIL import Image
'''
这是什么意思?
检查是否导入过某个特定的PIL库
如果导入过,就删除它(避免冲突)
然后重新导入标准的PIL库(Python的图像处理库)
这就像:先检查书包里有没有旧版本的课本,如果有就先拿出来,然后放新版本的进去。
'''
import asyncio # 异步编程工具,让程序能同时做多件事

# 尝试导入依赖包(可能没有的工具)
try:   #尝试导入pyautogui(控制鼠标键盘的核心工具)
    import nagaagent_core.vendors.pyautogui as pyautogui
    PYAUTOGUI_AVAILABLE = True
except ImportError:  # 就标记为不可用
    PYAUTOGUI_AVAILABLE = False
    pyautogui = None
#如果pyautogui安装了,就能控制鼠标键盘,如果没安装,程序不会崩溃,只是相关功能不能用

try:  #尝试导入GUI Agents(AI视觉分析工具)
    from gui_agents.s2_5.agents.grounding import OSWorldACI  #OSWorldACI:操作系统世界接口,可能用于理解屏幕内容
    from gui_agents.s2_5.agents.agent_s import AgentS2_5   #AgentS2_5:一个AI代理,可能用于识别屏幕上的元素
    GUI_AGENTS_AVAILABLE = True
except ImportError:    #重要变量(标记工具是否可用)
    GUI_AGENTS_AVAILABLE = False
    OSWorldACI = None
    AgentS2_5 = None
'''
这些标记告诉程序:
哪些功能可以使用
哪些功能因为缺少工具而不能用
'''

# 配置日志
logger = logging.getLogger(__name__)

class ComputerUseAdapter:
    """电脑控制适配器,基于博弈论的实现"""
    
    #类的初始化 __init__
    def __init__(self):
        """初始化电脑控制适配器"""
        self.last_error: Optional[str] = None  # 记录最后错误
        self.agent = None                      # AI代理
        self.grounding_agent = None            # 基础代理
        self.init_ok = False                   # 是否初始化成功
        
        # 设置DPI感知(Windows)
        self._setup_dpi_awareness()
        
        # 初始化屏幕尺寸 - 基于博弈论的实现
        self.screen_width = 1920  # 默认值
        self.screen_height = 1080  # 默认值
        self.scaled_width = 1920
        self.scaled_height = 1080
        self.scale_x = 1.0
        self.scale_y = 1.0
        #物理尺寸:电脑屏幕的实际分辨率,逻辑尺寸:统一缩放到1920x1080的标准尺寸
        #为什么?不同电脑的屏幕大小不同,统一标准让AI更容易识别,就像把所有照片都缩放到统一大小
        
        if PYAUTOGUI_AVAILABLE:  #用pyautogui获取实际屏幕大小
            try:
                # 动态获取实际屏幕尺寸
                self.screen_width, self.screen_height = pyautogui.size()
                logger.info(f"实际屏幕尺寸: {self.screen_width}x{self.screen_height}")
                
                # 计算缩放尺寸(参考博弈论的scale_screen_dimensions函数)
                self.scaled_width, self.scaled_height = self._scale_screen_dimensions( #调用_scale_screen_dimensions计算标准尺寸
                    self.screen_width, self.screen_height, max_dim_size=1920
                )
                
                # 计算缩放因子(逻辑坐标 -> 物理坐标),计算缩放比例(实际→标准)
                self.scale_x = self.screen_width / max(1, self.scaled_width)
                self.scale_y = self.screen_height / max(1, self.scaled_height)
                
            except Exception as e:
                logger.warning(f"获取屏幕尺寸失败: {e}")
                # 使用默认值
        
        # 初始化组件
        self._init_components()
    
    def _setup_dpi_awareness(self):    #有些电脑屏幕显示有缩放(比如150%显示)告诉Windows系统,这个程序需要精确的坐标,调用Windows的API函数
        """设置DPI感知,提高坐标精度"""
        try:
            if platform.system().lower() == "windows":   # 只在Windows系统上做
                import ctypes
                try:
                    ctypes.windll.shcore.SetProcessDpiAwareness(2)
                except Exception:
                    try:
                        ctypes.windll.user32.SetProcessDPIAware()
                    except Exception:
                        pass
        except Exception:
            pass
    
    #屏幕缩放函数(尺子转换公式)
    def _scale_screen_dimensions(self, width: int, height: int, max_dim_size: int = 1920) -> Tuple[int, int]:
        """缩放屏幕尺寸,基于博弈论的实现"""
        scale_factor = min(max_dim_size / width, max_dim_size / height)
        safe_width = int(width * scale_factor)
        safe_height = int(height * scale_factor)
        logger.info(f"屏幕缩放: {width}x{height} -> {safe_width}x{safe_height}, 缩放因子: {scale_factor:.2f}")
        return safe_width, safe_height
    
    def _init_components(self):
        """初始化核心组件(启动各个部件)"""
        try:
            if not PYAUTOGUI_AVAILABLE:       # 如果没有pyautogui,就报错返回
                self.last_error = "pyautogui未安装"
                logger.error("pyautogui未安装,电脑控制功能不可用")
                return
            
            if not GUI_AGENTS_AVAILABLE:      # 如果没有GUI代理,就报错返回
                self.last_error = "gui-agents未安装"
                logger.error("gui-agents未安装,高级功能不可用")
                return
            
            # 初始化GUI代理
            self._init_gui_agents()
            self.init_ok = True
            logger.info("电脑控制适配器初始化成功")
            
        except Exception as e:
            self.last_error = str(e)
            logger.error(f"电脑控制适配器初始化失败: {e}")
    
    def _init_gui_agents(self):
        """初始化GUI代理"""
        try:
            # 这里可以添加GUI代理的初始化逻辑
            # 暂时使用基础功能
            logger.info("GUI代理初始化完成")
        except Exception as e:
            logger.warning(f"GUI代理初始化失败: {e}")
    
    def is_available(self) -> Dict[str, Any]:
        """检查电脑控制功能是否可用"""
        ok = True
        reasons = []
        
        if not PYAUTOGUI_AVAILABLE:
            ok = False
            reasons.append("pyautogui未安装")
        
        if not self.init_ok:
            ok = False
            msg = "电脑控制适配器未初始化"
            if self.last_error:
                msg += f": {self.last_error}"
            reasons.append(msg)
        
        return {
            "enabled": True,
            "ready": ok,
            "reasons": reasons,
            "screen_info": {
                "physical_size": f"{self.screen_width}x{self.screen_height}",
                "scaled_size": f"{self.scaled_width}x{self.scaled_height}",
                "scale_factors": f"x={self.scale_x:.2f}, y={self.scale_y:.2f}"
            },
            "platform": platform.system()
        }
    
    async def take_screenshot(self) -> Optional[bytes]:
        """截取屏幕截图"""
        if not PYAUTOGUI_AVAILABLE:
            return None
        
        try:
            screenshot = pyautogui.screenshot()
            # 基于博弈论的实现:将截图缩放到逻辑尺寸
            screenshot = screenshot.resize((self.scaled_width, self.scaled_height), Image.LANCZOS)
            buf = io.BytesIO()
            screenshot.save(buf, format="PNG")
            return buf.getvalue()
        except Exception as e:
            logger.error(f"截取屏幕截图失败: {e}")
            return None
    
    def _scale_coordinates(self, x: int, y: int) -> Tuple[int, int]:
        """缩放坐标,将逻辑坐标转换为物理坐标"""
        scaled_x = int(round(x * self.scale_x))
        scaled_y = int(round(y * self.scale_y))
        return scaled_x, scaled_y
    
    def _create_scaled_pyautogui(self):
        """创建缩放版本的pyautogui代理"""
        if not PYAUTOGUI_AVAILABLE:
            return None
        
        class _ScaledPyAutoGUI:
            """
            轻量级代理,将逻辑坐标空间缩放到物理屏幕坐标
            支持多种坐标格式
            """
            def __init__(self, backend, scale_x: float, scale_y: float):
                self._backend = backend
                self._scale_x = scale_x
                self._scale_y = scale_y

            def __getattr__(self, name):
                # 回退到所有其他属性/方法
                return getattr(self._backend, name)

            def _scale_xy_from_args(self, args, kwargs):
                """从参数中缩放x,y坐标,支持多种坐标格式"""
                # 处理 (x, y) 格式
                if len(args) >= 2 and isinstance(args[0], (int, float)) and isinstance(args[1], (int, float)):
                    x = int(round(args[0] * self._scale_x))
                    y = int(round(args[1] * self._scale_y))
                    args = (x, y) + tuple(args[2:])
                # 处理 ((x, y),) 格式
                elif len(args) >= 1 and isinstance(args[0], (tuple, list)) and len(args[0]) == 2:
                    x_raw, y_raw = args[0]
                    if isinstance(x_raw, (int, float)) and isinstance(y_raw, (int, float)):
                        x = int(round(x_raw * self._scale_x))
                        y = int(round(y_raw * self._scale_y))
                        args = ((x, y),) + tuple(args[1:])
                else:
                    # 处理kwargs格式
                    if 'x' in kwargs and 'y' in kwargs and isinstance(kwargs['x'], (int, float)) and isinstance(kwargs['y'], (int, float)):
                        kwargs = dict(kwargs)
                        kwargs['x'] = int(round(kwargs['x'] * self._scale_x))
                        kwargs['y'] = int(round(kwargs['y'] * self._scale_y))
                return args, kwargs

            # 包装的绝对坐标鼠标API - 自动坐标缩放
            def moveTo(self, *args, **kwargs):
                args, kwargs = self._scale_xy_from_args(args, kwargs)
                return self._backend.moveTo(*args, **kwargs)

            def click(self, *args, **kwargs):
                args, kwargs = self._scale_xy_from_args(args, kwargs)
                return self._backend.click(*args, **kwargs)

            def doubleClick(self, *args, **kwargs):
                args, kwargs = self._scale_xy_from_args(args, kwargs)
                return self._backend.doubleClick(*args, **kwargs)

            def rightClick(self, *args, **kwargs):
                args, kwargs = self._scale_xy_from_args(args, kwargs)
                return self._backend.rightClick(*args, **kwargs)

            def dragTo(self, *args, **kwargs):
                args, kwargs = self._scale_xy_from_args(args, kwargs)
                return self._backend.dragTo(*args, **kwargs)
            
            def scroll(self, *args, **kwargs):
                """滚动操作也支持坐标缩放"""
                args, kwargs = self._scale_xy_from_args(args, kwargs)
                return self._backend.scroll(*args, **kwargs)
        
        return _ScaledPyAutoGUI(pyautogui, self.scale_x, self.scale_y)
    
    async def click(self, x: int, y: int, button: str = 'left') -> bool:
        """点击指定坐标"""
        if not PYAUTOGUI_AVAILABLE:
            return False
        
        try:
            # 缩放坐标
            scaled_x, scaled_y = self._scale_coordinates(x, y)
            
            if button == 'left':
                pyautogui.click(scaled_x, scaled_y)
            elif button == 'right':
                pyautogui.rightClick(scaled_x, scaled_y)
            elif button == 'middle':
                pyautogui.middleClick(scaled_x, scaled_y)
            else:
                pyautogui.click(scaled_x, scaled_y)
            
            logger.info(f"点击坐标: 逻辑({x}, {y}) -> 物理({scaled_x}, {scaled_y}), 按钮: {button}")
            return True
        except Exception as e:
            logger.error(f"点击操作失败: {e}")
            return False
    
    async def type_text(self, text: str, interval: float = 0.1) -> bool:
        """输入文本"""
        if not PYAUTOGUI_AVAILABLE:
            return False
        
        try:
            pyautogui.typewrite(text, interval=interval)
            logger.info(f"输入文本: {text}")
            return True
        except Exception as e:
            logger.error(f"输入文本失败: {e}")
            return False
    
    async def press_key(self, key: str) -> bool:
        """按下指定按键"""
        if not PYAUTOGUI_AVAILABLE:
            return False
        
        try:
            pyautogui.press(key)
            logger.info(f"按下按键: {key}")
            return True
        except Exception as e:
            logger.error(f"按键操作失败: {e}")
            return False
    
    async def scroll(self, x: int, y: int, clicks: int) -> bool:
        """滚动鼠标"""
        if not PYAUTOGUI_AVAILABLE:
            return False
        
        try:
            # 缩放坐标
            scaled_x, scaled_y = self._scale_coordinates(x, y)
            pyautogui.scroll(clicks, scaled_x, scaled_y)
            logger.info(f"滚动: 逻辑({x}, {y}) -> 物理({scaled_x}, {scaled_y}), 滚动量: {clicks}")
            return True
        except Exception as e:
            logger.error(f"滚动操作失败: {e}")
            return False
    
    async def drag_to(self, start_x: int, start_y: int, end_x: int, end_y: int, duration: float = 1.0) -> bool:
        """拖拽操作"""
        if not PYAUTOGUI_AVAILABLE:
            return False
        
        try:
            # 缩放坐标
            scaled_start_x, scaled_start_y = self._scale_coordinates(start_x, start_y)
            scaled_end_x, scaled_end_y = self._scale_coordinates(end_x, end_y)
            
            pyautogui.dragTo(scaled_end_x, scaled_end_y, duration=duration)
            logger.info(f"拖拽: 逻辑({start_x}, {start_y})->({end_x}, {end_y}) -> 物理({scaled_start_x}, {scaled_start_y})->({scaled_end_x}, {scaled_end_y})")
            return True
        except Exception as e:
            logger.error(f"拖拽操作失败: {e}")
            return False
    
    async def find_element_by_text(self, text: str, screenshot: Optional[bytes] = None) -> Optional[Tuple[int, int]]:
        """通过文本查找元素位置"""
        # 这里可以集成OCR功能
        # 暂时返回None,后续实现
        logger.info(f"查找文本元素: {text}")
        return None
    
    async def find_element_by_image(self, image_path: str, screenshot: Optional[bytes] = None) -> Optional[Tuple[int, int]]:
        """通过图像查找元素位置"""
        # 这里可以集成图像匹配功能
        # 暂时返回None,后续实现
        logger.info(f"查找图像元素: {image_path}")
        return None
    
    async def execute_instruction(self, instruction: str) -> Dict[str, Any]:
        """执行电脑控制指令"""
        try:
            logger.info(f"执行指令: {instruction}")
            
            # 解析指令
            action = self._parse_instruction(instruction)
            
            if not action:
                return {
                    "success": False,
                    "error": "无法解析指令",
                    "instruction": instruction
                }
            
            # 执行动作
            result = await self._execute_action(action)
            
            return {
                "success": result,
                "instruction": instruction,
                "action": action
            }
            
        except Exception as e:
            logger.error(f"执行指令失败: {e}")
            return {
                "success": False,
                "error": str(e),
                "instruction": instruction
            }
    
    async def run_instruction(self, instruction: str, max_iterations: int = 15) -> Dict[str, Any]:
        """执行自然语言指令"""
        if not self.init_ok:
            return {"success": False, "error": "电脑控制适配器未初始化"}
        
        try:
            obs = {}
            traj = "任务:\n" + instruction
            
            for iteration in range(max_iterations):
                logger.info(f"执行迭代 {iteration + 1}/{max_iterations}")
                
                # 获取屏幕截图
                screenshot = await self.take_screenshot()
                if not screenshot:
                    return {"success": False, "error": "无法获取屏幕截图"}
                
                obs["screenshot"] = screenshot
                
                # 这里应该调用AI模型来生成下一步动作
                # 暂时使用简单的模拟逻辑
                if iteration == 0:
                    # 模拟AI生成的代码
                    code = f"# 执行任务: {instruction}\nprint('任务开始执行')"
                else:
                    code = "print('任务完成')"
                
                logger.info(f"执行代码: {code}")
                
                # 检查任务完成条件
                if "完成" in code or "done" in code.lower():
                    logger.info("任务完成")
                    break
                
                if "等待" in code or "wait" in code.lower():
                    await asyncio.sleep(3)
                    continue
                
                if "下一步" in code or "next" in code.lower():
                    continue
                
                # 执行代码(注入缩放后的pyautogui)
                try:
                    exec_env = globals().copy()
                    if PYAUTOGUI_AVAILABLE and hasattr(self, 'scale_x') and hasattr(self, 'scale_y'):
                        scaled_pyautogui = self._create_scaled_pyautogui()
                        if scaled_pyautogui:
                            exec_env['pyautogui'] = scaled_pyautogui
                    
                    exec(code, exec_env, exec_env)
                    await asyncio.sleep(0.5)
                    
                except Exception as e:
                    logger.error(f"代码执行失败: {e}")
                    continue
                
                # 更新任务轨迹
                traj += f"\n\n迭代 {iteration + 1}:\n{code}"
            
            return {
                "success": True, 
                "message": "任务执行完成",
                "iterations": iteration + 1,
                "trajectory": traj
            }
            
        except Exception as e:
            logger.error(f"任务执行失败: {e}")
            return {"success": False, "error": str(e)}
    
    def normalize_coordinates(self, x: float, y: float, screen_width: int = None, screen_height: int = None) -> Tuple[int, int]:
        """
        坐标标准化
        将任意坐标转换为0-1000范围的标准化坐标
        """
        if screen_width is None:
            screen_width = self.screen_width
        if screen_height is None:
            screen_height = self.screen_height
        
        # 标准化到0-1000范围
        normalized_x = int(round(x / screen_width * 1000))
        normalized_y = int(round(y / screen_height * 1000))
        
        # 确保在有效范围内
        normalized_x = max(0, min(1000, normalized_x))
        normalized_y = max(0, min(1000, normalized_y))
        
        return normalized_x, normalized_y
    
    def denormalize_coordinates(self, normalized_x: int, normalized_y: int, 
                               screen_width: int = None, screen_height: int = None) -> Tuple[int, int]:
        """
        反标准化坐标,将0-1000范围的坐标转换为实际像素坐标
        坐标反标准化
        """
        if screen_width is None:
            screen_width = self.screen_width
        if screen_height is None:
            screen_height = self.screen_height
        
        # 从标准化坐标转换为实际像素坐标
        pixel_x = int(round(normalized_x / 1000.0 * screen_width))
        pixel_y = int(round(normalized_y / 1000.0 * screen_height))
        
        return pixel_x, pixel_y
    
    async def click_with_ai_location(self, target_description: str, button: str = 'left') -> bool:
        """
        使用AI定位进行点击
        支持自然语言描述目标元素
        """
        try:
            # 获取屏幕截图
            screenshot = await self.take_screenshot()
            if not screenshot:
                logger.error("无法获取屏幕截图")
                return False
            
            # 使用AI定位元素
            from .visual_analyzer import VisualAnalyzer
            analyzer = VisualAnalyzer()
            location = await analyzer.locate_element_with_ai(
                target_description, 
                screenshot, 
                self.screen_width, 
                self.screen_height
            )
            
            if location:
                x, y = location
                return await self.click(x, y, button)
            else:
                logger.error(f"AI定位失败: {target_description}")
                return False
                
        except Exception as e:
            logger.error(f"AI定位点击失败: {e}")
            return False
    
    def get_coordinate_info(self) -> Dict[str, Any]:
        """获取坐标系统信息"""
        return {
            "screen_size": f"{self.screen_width}x{self.screen_height}",
            "scaled_size": f"{self.scaled_width}x{self.scaled_height}",
            "scale_factors": f"x={self.scale_x:.2f}, y={self.scale_y:.2f}",
            "normalization_range": "0-1000",
            "platform": platform.system()
        }
    
    def _parse_instruction(self, instruction: str) -> Optional[Dict[str, Any]]:
        """解析指令"""
        instruction = instruction.lower().strip()
        
        # 简单的指令解析
        if "点击" in instruction or "click" in instruction:
            return {"action": "click", "instruction": instruction}
        elif "输入" in instruction or "type" in instruction:
            return {"action": "type", "instruction": instruction}
        elif "截图" in instruction or "screenshot" in instruction:
            return {"action": "screenshot", "instruction": instruction}
        elif "滚动" in instruction or "scroll" in instruction:
            return {"action": "scroll", "instruction": instruction}
        else:
            return {"action": "unknown", "instruction": instruction}
    
    async def _execute_action(self, action: Dict[str, Any]) -> bool:
        """执行具体动作"""
        action_type = action.get("action")
        instruction = action.get("instruction")
        
        if action_type == "click":
            # 这里需要更智能的坐标定位
            # 暂时使用屏幕中心
            x, y = self.screen_width // 2, self.screen_height // 2
            return await self.click(x, y)
        elif action_type == "type":
            # 这里需要提取要输入的文本
            # 暂时使用示例文本
            return await self.type_text("Hello World")
        elif action_type == "screenshot":
            screenshot = await self.take_screenshot()
            return screenshot is not None
        elif action_type == "scroll":
            # 这里需要更智能的滚动参数
            return await self.scroll(self.screen_width // 2, self.screen_height // 2, 3)
        else:
            logger.warning(f"未知动作类型: {action_type}")
            return False

visual_analyzer.py

"""
视觉分析器 - 基于博弈论的视觉识别功能
提供OCR、图像匹配、元素定位、AI坐标定位等功能
AI坐标定位算法升级
"""

import logging  # 记录日志的,像写日记一样记录程序运行情况
from typing import Dict, Any, Optional, Tuple, List  # 这是Python的类型提示,告诉你函数返回什么类型
import sys  # 系统相关的功能

# 下面这两行是为了防止导入冲突
if 'nagaagent_core.vendors.pil' in sys.modules:
    del sys.modules['nagaagent_core.vendors.pil']

from PIL import Image  # 处理图片的主要工具
import io  # 处理二进制数据(比如图片数据)
import base64  # 把图片转换成文本格式
import re  # 正则表达式,处理文本模式匹配
import json  # 处理JSON数据格式

# 配置日志
logger = logging.getLogger(__name__)

class VisualAnalyzer:
    """视觉分析器"""
    
    def __init__(self):
        """初始化视觉分析器"""
        self.ocr_available = False   # OCR功能是否可用
        self.image_matching_available = False    # 图像匹配功能是否可用
        self.ai_coordinate_available = False     # AI定位功能是否可用
        
        # 尝试导入OCR库
        try:
            import nagaagent_core.vendors.pytesseract as pytesseract
            self.ocr_available = True
            logger.info("OCR功能已启用")
        except ImportError:
            logger.warning("pytesseract未安装,OCR功能不可用")
       
       # 尝试加载各个工具 
        # 尝试导入图像匹配库
        try:
            import nagaagent_core.vendors.cv2 as cv2
            import numpy as np
            self.cv2 = cv2
            self.np = np
            self.image_matching_available = True
            logger.info("图像匹配功能已启用")
        except ImportError:
            logger.warning("opencv-python未安装,图像匹配功能不可用")
        
        # 尝试导入AI坐标定位库
        try:
            from langchain_openai import ChatOpenAI
            self.ai_coordinate_available = True
            logger.info("AI坐标定位功能已启用")
        except ImportError:
            logger.warning("langchain-openai未安装,AI坐标定位功能不可用")
    
    async def analyze_screenshot(self, screenshot: bytes) -> Dict[str, Any]:  #分析屏幕截图(主函数)
        """分析屏幕截图"""
        try:
            # 加载图像
            image = Image.open(io.BytesIO(screenshot))
            
            analysis_result = {
                "image_size": image.size,
                "mode": image.mode,
                "ocr_text": [],
                "elements": [],
                "analysis_time": None
            }
            
            # OCR文本识别
            if self.ocr_available:
                ocr_result = await self._extract_text_from_image(image)
                analysis_result["ocr_text"] = ocr_result
            
            # 元素检测
            elements = await self._detect_elements(image)
            analysis_result["elements"] = elements
            
            logger.info(f"屏幕分析完成: 识别到 {len(analysis_result['ocr_text'])} 个文本, {len(elements)} 个元素")
            return analysis_result
            
        except Exception as e:
            logger.error(f"屏幕分析失败: {e}")
            return {
                "error": str(e),
                "image_size": None,
                "ocr_text": [],
                "elements": []
            }
    
    async def _extract_text_from_image(self, image: Image.Image) -> List[Dict[str, Any]]:   #从图片提取文字
        """从图像中提取文本"""
        if not self.ocr_available:
            return []
        
        try:
            import nagaagent_core.vendors.pytesseract as pytesseract
            
            # 使用OCR提取文本和位置信息
            data = pytesseract.image_to_data(image, output_type=pytesseract.Output.DICT)
            
            text_elements = []
            for i in range(len(data['text'])):
                text = data['text'][i].strip()
                if text:  # 只处理非空文本
                    text_elements.append({
                        "text": text,
                        "confidence": data['conf'][i],
                        "bbox": {
                            "x": data['left'][i],
                            "y": data['top'][i],
                            "width": data['width'][i],
                            "height": data['height'][i]
                        }
                    })
            
            return text_elements
            
        except Exception as e:
            logger.error(f"OCR文本提取失败: {e}")
            return []

    async def _detect_elements(self, image: Image.Image) -> List[Dict[str, Any]]:   #检测图像中的元素
        """检测图像中的元素"""
        if not self.image_matching_available:
            return []
        
        try:
            # 转换为OpenCV格式
            cv_image = self.cv2.cvtColor(self.np.array(image), self.cv2.COLOR_RGB2BGR)
            
            # 检测边缘
            gray = self.cv2.cvtColor(cv_image, self.cv2.COLOR_BGR2GRAY)
            edges = self.cv2.Canny(gray, 50, 150)
            
            # 查找轮廓
            contours, _ = self.cv2.findContours(edges, self.cv2.RETR_EXTERNAL, self.cv2.CHAIN_APPROX_SIMPLE)
            
            elements = []
            for contour in contours:
                # 计算边界框
                x, y, w, h = self.cv2.boundingRect(contour)
                
                # 过滤太小的元素
                if w > 10 and h > 10:
                    elements.append({
                        "type": "contour",
                        "bbox": {"x": x, "y": y, "width": w, "height": h},
                        "area": w * h
                    })
            
            return elements
            
        except Exception as e:
            logger.error(f"元素检测失败: {e}")
            return []
    
    async def find_text_element(self, screenshot: bytes, target_text: str) -> Optional[Tuple[int, int]]:
        """查找包含指定文本的元素位置"""
        try:
            analysis = await self.analyze_screenshot(screenshot)
            
            for text_element in analysis.get("ocr_text", []):
                if target_text.lower() in text_element["text"].lower():
                    bbox = text_element["bbox"]
                    # 返回中心坐标
                    center_x = bbox["x"] + bbox["width"] // 2
                    center_y = bbox["y"] + bbox["height"] // 2
                    return (center_x, center_y)
            
            return None
            
        except Exception as e:
            logger.error(f"查找文本元素失败: {e}")
            return None
    
    async def find_image_element(self, screenshot: bytes, template_path: str) -> Optional[Tuple[int, int]]:
        """查找图像模板匹配的元素位置"""
        if not self.image_matching_available:
            return None
        
        try:
            # 加载模板图像
            template = self.cv2.imread(template_path)
            if template is None:
                logger.error(f"无法加载模板图像: {template_path}")
                return None
            
            # 加载屏幕截图
            screen_image = Image.open(io.BytesIO(screenshot))
            screen_cv = self.cv2.cvtColor(self.np.array(screen_image), self.cv2.COLOR_RGB2BGR)
            
            # 模板匹配
            result = self.cv2.matchTemplate(screen_cv, template, self.cv2.TM_CCOEFF_NORMED)
            min_val, max_val, min_loc, max_loc = self.cv2.minMaxLoc(result)
            
            # 如果匹配度足够高
            if max_val > 0.8:
                # 返回模板中心位置
                center_x = max_loc[0] + template.shape[1] // 2
                center_y = max_loc[1] + template.shape[0] // 2
                return (center_x, center_y)
            
            return None
            
        except Exception as e:
            logger.error(f"图像匹配失败: {e}")
            return None
    
    async def get_screen_info(self, screenshot: bytes) -> Dict[str, Any]:
        """获取屏幕信息"""
        try:
            image = Image.open(io.BytesIO(screenshot))
            
            return {
                "width": image.width,
                "height": image.height,
                "mode": image.mode,
                "format": image.format,
                "size_bytes": len(screenshot)
            }
            
        except Exception as e:
            logger.error(f"获取屏幕信息失败: {e}")
            return {
                "error": str(e),
                "width": 0,
                "height": 0
            }
    
    async def locate_element_with_ai(self, target_description: str, screenshot: bytes, 
                                   screen_width: int = 1920, screen_height: int = 1080) -> Optional[Tuple[int, int]]:
        """
        使用AI定位屏幕元素
        支持自然语言描述定位界面元素
        """
        if not self.ai_coordinate_available:
            logger.warning("AI坐标定位功能不可用")
            return None
        
        try:
            from langchain_openai import ChatOpenAI
            # 统一从系统配置读取视觉LLM参数
            from system.config import config
            cc = getattr(config, 'computer_control', None)
            model = getattr(cc, 'model', None) or config.api.model
            base_url = getattr(cc, 'model_url', None) or config.api.base_url
            api_key = getattr(cc, 'api_key', None) or config.api.api_key
            
            # 初始化LLM
            llm = ChatOpenAI(
                model=model,
                base_url=base_url,
                api_key=api_key,
                temperature=0
            )
            
            # 将截图转换为base64
            screenshot_b64 = base64.b64encode(screenshot).decode('utf-8')
            
            # 构建AI定位提示词
            prompt = f"""
            请分析屏幕截图并定位目标元素: "{target_description}"
            
            请按以下格式输出坐标:
            1. 如果可能,输出边界框 [x1, y1, x2, y2],坐标范围0-1000
            2. 如果边界框不合适,输出精确坐标 x,y
            3. 不要包含任何解释文字,只输出坐标
            
            屏幕尺寸: {screen_width}x{screen_height}
            """
            
            # 调用AI模型进行坐标定位
            response = llm.invoke([
                {
                    "role": "user", 
                    "content": [
                        {"type": "text", "text": prompt},
                        {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{screenshot_b64}"}}
                    ]
                }
            ])
            
            # 解析AI返回的坐标
            coordinates = self._parse_ai_coordinates(response.content, screen_width, screen_height)
            
            if coordinates:
                logger.info(f"AI定位成功: {target_description} -> {coordinates}")
                return coordinates
            else:
                logger.warning(f"AI定位失败: {target_description}")
                return None
                
        except Exception as e:
            logger.error(f"AI坐标定位失败: {e}")
            return None
    
    def _parse_ai_coordinates(self, response: str, screen_width: int, screen_height: int) -> Optional[Tuple[int, int]]:
        """
        解析AI返回的坐标
        支持边界框和精确坐标两种格式
        """
        try:
            # 清理响应文本
            response = response.strip()
            
            # 尝试解析边界框格式 [x1, y1, x2, y2]
            bbox_match = re.search(r'\[([0-9.,\s]+)\]', response)
            if bbox_match:
                coords_str = bbox_match.group(1)
                numbers = re.findall(r'-?\d+\.?\d*', coords_str)
                if len(numbers) >= 4:
                    x1, y1, x2, y2 = [float(n) for n in numbers[:4]]
                    # 检查是否在0-1000范围内(标准化坐标)
                    if max(x1, y1, x2, y2) <= 1000:
                        # 计算中心点并转换为实际像素坐标
                        center_x = (x1 + x2) / 2.0
                        center_y = (y1 + y2) / 2.0
                        pixel_x = int(round(center_x / 1000.0 * screen_width))
                        pixel_y = int(round(center_y / 1000.0 * screen_height))
                        return (pixel_x, pixel_y)
                    else:
                        # 直接使用像素坐标
                        center_x = (x1 + x2) / 2
                        center_y = (y1 + y2) / 2
                        return (int(center_x), int(center_y))
            
            # 尝试解析精确坐标格式 x,y
            coord_match = re.search(r'(\d+\.?\d*)\s*,\s*(\d+\.?\d*)', response)
            if coord_match:
                x, y = float(coord_match.group(1)), float(coord_match.group(2))
                # 检查是否在0-1000范围内(标准化坐标)
                if x <= 1000 and y <= 1000:
                    pixel_x = int(round(x / 1000.0 * screen_width))
                    pixel_y = int(round(y / 1000.0 * screen_height))
                    return (pixel_x, pixel_y)
                else:
                    # 直接使用像素坐标
                    return (int(x), int(y))
            
            # 尝试解析数字列表
            numbers = [int(float(x)) for x in re.findall(r'-?\d+\.?\d*', response)]
            if len(numbers) >= 2:
                x, y = numbers[0], numbers[1]
                if x <= 1000 and y <= 1000:
                    pixel_x = int(round(x / 1000.0 * screen_width))
                    pixel_y = int(round(y / 1000.0 * screen_height))
                    return (pixel_x, pixel_y)
                else:
                    return (x, y)
            
            return None
            
        except Exception as e:
            logger.error(f"坐标解析失败: {e}")
            return None
    
    async def locate_element(self, target: str, screenshot: bytes) -> Optional[Tuple[int, int]]:
        """
        智能元素定位,优先使用AI定位,回退到传统方法
        多层次定位策略
        """
        try:
            # 首先尝试AI定位
            if self.ai_coordinate_available:
                ai_result = await self.locate_element_with_ai(target, screenshot)
                if ai_result:
                    return ai_result
            
            # 回退到文本定位
            if self.ocr_available:
                text_result = await self.find_text_element(screenshot, target)
                if text_result:
                    return text_result
            
            # 回退到图像匹配(如果target是图像路径)
            if self.image_matching_available and target.endswith(('.png', '.jpg', '.jpeg')):
                image_result = await self.find_image_element(screenshot, target)
                if image_result:
                    return image_result
            
            logger.warning(f"无法定位元素: {target}")
            return None
            
        except Exception as e:
            logger.error(f"元素定位失败: {e}")
            return None
    
    def is_available(self) -> Dict[str, Any]:
        """检查视觉分析器可用性"""
        return {
            "ocr_available": self.ocr_available,
            "image_matching_available": self.image_matching_available,
            "ai_coordinate_available": self.ai_coordinate_available,
            "ready": self.ocr_available or self.image_matching_available or self.ai_coordinate_available
        }
    
# ==========代码里逻辑判断======
#这个代码里面的功能其实都是十分相似的,判断逻辑也十分相同,所以我们重点讲一下一些这个代码里面,处处可见的,重复出现的东西就行了。
'''
1. 错误处理逻辑(处处可见)
try:
    # 尝试做某件事
    do_something()
except Exception as e:  # 如果出错了
    logger.error(f"出错了: {e}")  # 记录错误
    return []  # 返回空结果,不让程序崩溃
    
为什么这样设计?
不能让一个功能出错导致整个程序崩溃
比如OCR功能坏了,图像匹配还能用

2. 功能可用性检查
def is_available(self) -> Dict[str, Any]:
    return {
        "ocr_available": self.ocr_available,
        "image_matching_available": self.image_matching_available,
        "ai_coordinate_available": self.ai_coordinate_available,
        "ready": self.ocr_available or self.image_matching_available or self.ai_coordinate_available
    }
这个函数告诉用户:我现在有哪些工具可用

3. AI坐标解析的智能判断
def _parse_ai_coordinates(self, response: str, screen_width: int, screen_height: int):
AI可能返回两种格式:
  [x1, y1, x2, y2] - 一个框的四个角
  x,y - 一个点的坐标
程序要能识别出是哪种格式并正确处理
'''
#这个文件的整体逻辑流程图
'''
开始使用视觉分析器
    ↓
初始化工具箱
    ├─ 检查OCR工具 ✓
    ├─ 检查图像匹配工具 ✓
    └─ 检查AI工具 ✓
    ↓
用户想找东西时:
    ↓
优先用AI定位(如果可用)
    ↓
如果AI找不到或不可用 → 用文字识别
    ↓
如果文字找不到 → 用图像匹配
    ↓
返回找到的位置 或 返回“找不到”
'''

文末附加内容
暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇