本文最后更新于63 天前,其中的信息可能已经过时,如有错误请发送邮件到184874483@qq.com
先拎主次
要分析这个项目的文件结构,我们可以从“核心入口”到“辅助配置”的顺序拆解,先抓关键文件,再看辅助组件:

第一步:先分析最外层的核心文件(优先级最高)
- main.py
- 作用:项目的主入口文件,是代码执行的起点,几乎包含了项目的核心逻辑或启动流程。
- 分析价值:看它能快速知道项目是做什么的、怎么启动的。
- build.py + main.spec
- 作用: build.py 是打包脚本, main.spec 是打包配置文件(通常用于 PyInstaller ),负责把Python项目打包成可执行程序。
- 分析价值:了解项目的分发、部署方式。
- tart.bat / start.sh
- 作用:启动脚本( bat 是Windows用, sh 是Linux/macOS用),一键启动项目的快捷方式,里面会写启动 main.py 的命令。
- 分析价值:快速知道项目的启动步骤。
第二步:分析配置/依赖文件(优先级中等)
这些是项目运行的“环境支撑”:
- config.json.example
- 作用:配置文件模板,项目运行需要的参数(比如端口、数据库地址)都在这里定义,实际用的时候会改成 config.json 。
- 分析价值:了解项目的可配置项,以及运行需要的外部依赖(比如要连哪个服务)。
- requirements.txt
- 作用:Python依赖清单,记录项目需要安装的第三方库(比如 requests 、 flask 等)。
- 分析价值:通过依赖能推测项目用到的技术栈(比如用了 flask 说明是Web服务)。
- setup.py / setup.bat / setup.sh
- 作用:环境初始化脚本,负责安装依赖、配置环境等(比如 setup.sh 里可能会写 pip install -r requirements.txt )。
第三步:分析文件夹(优先级按“功能重要性”排序)
文件夹是按功能分类的模块,先看核心功能文件夹:
- agentserver / apiserver
- 作用:从名称看是服务端模块( agent 服务、 api 接口服务),是项目的核心功能载体(比如提供接口、处理业务逻辑)。
- 分析顺序:先看这两个文件夹里的代码,对应 main.py 的核心逻辑。
- game
- 作用:如果项目和“游戏”相关,这是游戏功能的模块;如果是其他场景,可能是业务逻辑的核心模块。
- mqtt_tool
- 作用: MQTT 是物联网/消息通信协议,这个文件夹是MQTT相关的工具/客户端,负责消息收发。
- system / ui / voice
- 作用: system 是系统工具模块; ui 是前端界面模块; voice 是语音功能模块,属于“扩展功能”。
- logs
- 作用:日志文件夹,项目运行时的日志会存在这里,调试时用,暂时可以后看。
第四步:辅助文件
- README.*.md :虽然这里的md,作者已经写得尽可能的详实了,但是我看起来还是有点不太明白,所以说我还是需要进行再探究的。
- .gitattributes / .gitignore :Git版本管理的配置文件,不影响项目功能,最后看;
- uv.lock :Python虚拟环境( uv 工具)的依赖锁文件,记录依赖的具体版本,辅助看环境用。
具体分析
main.py
# pyinstaller适配
import os
import sys
if os.path.exists("_internal"):
os.chdir("_internal")
#目的:让程序在打包后能正常运行
###做了什么:
# os.exists("_internal"):检查当前文件夹下有没有叫"_internal"的文件夹
#os.chdir("_internal"):如果有这个文件夹,就进入它(改变当前工作目录)
#为什么这么做:PyInstaller打包时会把程序需要的文件放到"_internal"文件夹里
#
# 检测是否在打包环境中
# PyInstaller打包后的程序会设置sys.frozen属性
IS_PACKAGED = getattr(sys, 'frozen', False) and hasattr(sys, '_MEIPASS')
###目的:判断程序是被打包成exe,还是直接运行的.py文件
#函数说明:
'''
函数说明:
getattr(sys, 'frozen', False):获取sys.frozen属性,如果不存在就返回False
hasattr(sys, '_MEIPASS'):检查sys有没有'_MEIPASS'属性
逻辑判断:
如果两个条件都成立→IS_PACKAGED = True(程序被打包了)
任何一个不成立→IS_PACKAGED = False(直接运行Python脚本)
'''
# 标准库导入
import asyncio # 异步编程库(让程序可以同时做多件事)
import logging # 日志记录(记录程序运行信息)
import socket # 网络通信(连接网络)
import threading # 多线程(让程序同时运行多个任务)
import time # 时间相关功能(等待、计时等)
import warnings # 警告处理
import requests # HTTP请求(访问网页)
# 过滤弃用警告,提升启动体验
warnings.filterwarnings("ignore", category=DeprecationWarning, module="websockets")
warnings.filterwarnings("ignore", category=DeprecationWarning, module="uvicorn")
warnings.filterwarnings("ignore", category=DeprecationWarning, message=".*websockets.legacy.*")
warnings.filterwarnings("ignore", category=DeprecationWarning, message=".*WebSocketServerProtocol.*")
warnings.filterwarnings("ignore", category=DeprecationWarning, message=".*websockets.*")
warnings.filterwarnings("ignore", category=DeprecationWarning, message=".*uvicorn.*")
'''
目的:屏蔽一些烦人的警告信息,让程序启动时控制台更干净
过滤了哪些:
websockets库的弃用警告
uvicorn库的弃用警告
各种相关模块的弃用警告
'''
# 修复Windows socket兼容性问题
if not hasattr(socket, 'EAI_ADDRFAMILY'):
# Windows系统缺少这些错误码,添加兼容性常量
socket.EAI_ADDRFAMILY = -9
socket.EAI_AGAIN = -3
socket.EAI_BADFLAGS = -1
socket.EAI_FAIL = -4
socket.EAI_MEMORY = -10
socket.EAI_NODATA = -5
socket.EAI_NONAME = -2
socket.EAI_OVERFLOW = -12
socket.EAI_SERVICE = -8
socket.EAI_SOCKTYPE = -7
socket.EAI_SYSTEM = -11
'''
做了什么:
hasattr(socket, 'EAI_ADDRFAMILY'):检查socket模块有没有EAI_ADDRFAMILY属性
如果没有(Windows系统),就手动添加11个错误码常量
为什么:Linux/macOS有这些错误码,Windows没有,添加它们让程序在不同系统表现一致
'''
# 第三方库导入
# 优先使用仓库内的本地包,防止导入到系统已安装的旧版 nagaagent_core #
REPO_ROOT = os.path.dirname(os.path.abspath(__file__)) # 统一入口 #
LOCAL_PKG_DIR = os.path.join(REPO_ROOT, "nagaagent-core") # 统一入口 #
if LOCAL_PKG_DIR not in sys.path:
sys.path.insert(0, LOCAL_PKG_DIR) # 优先使用本地包 #
'''
设置本地包路径
目的:优先使用程序自己的库,而不是系统安装的库
做了什么:
os.path.abspath(__file__):获取当前文件的绝对路径
os.path.dirname(...):获取这个文件所在的文件夹路径
os.path.join(..., "nagaagent-core"):拼接出nagaagent-core文件夹的路径
sys.path.insert(0, ...):把这个路径插入到Python的搜索路径最前面(优先搜索)
'''
from nagaagent_core.vendors.PyQt5.QtGui import QIcon # 统一入口 #
from nagaagent_core.vendors.PyQt5.QtWidgets import QApplication # 统一入口 #
'''
导入PyQt5组件
导入的内容:
QIcon:图标类(用于显示程序图标)
QApplication:应用程序类(PyQt5程序的核心)
'''
# 本地模块导入
from system.system_checker import run_system_check, run_quick_check
from system.config import config, AI_NAME
# V14版本已移除早期拦截器,采用运行时猴子补丁
# conversation_core已删除,相关功能已迁移到apiserver
from summer_memory.memory_manager import memory_manager
from summer_memory.task_manager import start_task_manager, task_manager
from ui.pyqt_chat_window import ChatWindow
from ui.tray.console_tray import integrate_console_tray
'''
导入本地模块
'''
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("summer_memory")
logger.setLevel(logging.INFO)
'''
基本配置
做了什么:
设置全局日志级别为INFO(只记录重要信息)
创建名为"summer_memory"的日志记录器
设置这个记录器的级别也是INFO
'''
# 过滤HTTP相关日志
logging.getLogger("httpcore").setLevel(logging.WARNING)
logging.getLogger("httpx").setLevel(logging.WARNING)
# 优化Live2D相关日志输出,减少启动时的信息噪音
logging.getLogger("live2d").setLevel(logging.WARNING) # Live2D库日志
logging.getLogger("live2d.renderer").setLevel(logging.WARNING) # 渲染器日志
logging.getLogger("live2d.animator").setLevel(logging.WARNING) # 动画器日志
logging.getLogger("live2d.widget").setLevel(logging.WARNING) # 组件日志
logging.getLogger("live2d.config").setLevel(logging.WARNING) # 配置日志
logging.getLogger("live2d.config_dialog").setLevel(logging.WARNING) # 配置对话框日志
logging.getLogger("OpenGL").setLevel(logging.WARNING) # OpenGL日志
logging.getLogger("OpenGL.acceleratesupport").setLevel(logging.WARNING) # OpenGL加速日志
'''
目的:减少不必要的日志输出,让控制台更干净
逻辑:
把这些模块的日志级别设为WARNING
只有警告和错误级别的信息才会被输出
INFO和DEBUG级别的信息会被忽略
'''
# 服务管理器类
class ServiceManager:
"""服务管理器 - 统一管理所有后台服务"""
def __init__(self): #__init__方法(初始化方法),这个方法在创建ServiceManager对象时自动调用:
self.loop = asyncio.new_event_loop()
'''
目的:创建一个新的事件循环(event loop)
事件循环是什么:想象成一个大总管,负责协调和管理所有的异步任务(让程序可以同时做多件事情)
'''
#目的:初始化4个变量,都设为None(空)
self.bg_thread = None #bg_thread:后台线程
self.api_thread = None #api_thread:API服务器线程
self.agent_thread = None # gent_thread:Agent服务器线程
self.tts_thread = None # gent_thread:Agent服务器线程
self._services_ready = False # 服务就绪状态
'''
设置一个标记,用来记录服务是否准备好了
下划线开头:Python约定,表示这是"内部使用的变量",其他代码不要直接修改它
'''
'''
段代码做了什么?
创建了一个叫ServiceManager的类
这个类的作用是:统一管理所有后台服务
'''
def start_background_services(self):
"""启动后台服务 - 异步非阻塞"""
# 启动后台任务管理器
self.bg_thread = threading.Thread(target=self._run_event_loop, daemon=True)
#创建线程:创建一个新的线程
'''
参数说明:
target=self._run_event_loop:这个线程要执行_run_event_loop方法
daemon=True:设为守护线程(当主程序退出时,这个线程也会自动退出)
'''
self.bg_thread.start() #启动线程:开始运行这个线程
logger.info(f"后台服务线程已启动: {self.bg_thread.name}") # 记录日志:记录后台服务线程已经启动了,并显示线程的名字
# 这个方法的作用:启动后台服务,但不阻塞主程序(主程序可以继续做其他事情)
# 移除阻塞等待,改为异步检查
# time.sleep(1) # 删除阻塞等待
def _run_event_loop(self):
"""运行事件循环(就是刚才说的那个(event loop))"""
# 启动后台服务,但不阻塞主程序(主程序可以继续做其他事情)
asyncio.set_event_loop(self.loop) #创建一个新的线程,设置事件循环:告诉asyncio库,使用我们刚才创建的事件循环
'''参数说明:
target=self._run_event_loop:这个线程要执行_run_event_loop方法
daemon=True:设为守护线程(当主程序退出时,这个线程也会自动退出)
'''
self.loop.run_until_complete(self._init_background_services())
#运行事件循环:启动事件循环,并且等_init_background_services方法完成
#run_until_complete:一直运行,直到括号里的任务完成
logger.info("后台服务事件循环已启动") # 记录日志:记录事件循环已经启动了
async def _init_background_services(self):
"""初始化后台服务 - 优化启动流程"""
logger.info("正在启动后台服务...") # 记录日志:告诉用户正在启动后台服务
try: # 异常处理:
# 任务管理器由memory_manager自动启动,无需手动启动
# await start_task_manager()
# 标记服务就绪
self._services_ready = True # 设置标记:把服务就绪标记设为True(真),表示服务已经准备好了
logger.info(f"任务管理器状态: running={task_manager.is_running}")
# 记录日志:显示任务管理器的运行状态
# f字符串:{task_manager.is_running}会替换为task_manager.is_running的实际值
# 保持事件循环活跃
while True: # 无限循环:保持事件循环一直运行
await asyncio.sleep(3600) # 每小时检查一次
#防止事件循环退出,让它一直保持在后台运行
except Exception as e: # except Exception as e:如果发生任何异常,就捕获它,并把异常信息保存到变量e中
logger.error(f"后台服务异常: {e}") # 记录错误日志
'''这个方法的特点:
前面有async关键字,表示这是一个异步方法
可以在里面使用await等待其他异步任务
'''
def check_port_available(self, host, port):
"""检查端口是否可用"""
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
'''
创建socket:
socket.AF_INET:使用IPv4地址
socket.SOCK_STREAM:使用TCP协议
with ... as s:自动管理socket的创建和关闭
'''
s.bind((host, port))
'''尝试绑定端口:把socket绑定到指定的主机和端口
如果成功:说明端口可用,没有被占用
如果失败:说明端口已经被其他程序占用了
'''
return True
except OSError: #端口被占用:捕获OSError异常,返回False
return False
def start_all_servers(self): #这是最重要的方法之一
"""并行启动所有服务:API(可选)、MCP、Agent、TTS - 同时启动,而不是一个接一个)优化版本"""
print("🚀 正在并行启动所有服务...")
print("=" * 50)
threads = [] # 用来存储所有要启动的线程
service_status = {} # 服务状态跟踪
try: #异常处理开始:try表示开始尝试执行
self._init_proxy_settings()
# 预检查所有端口,减少重复检查
from system.config import get_server_port #从system.config导入get_server_port函数
#创建字典:检查4个服务的端口是否可用
port_checks = {
'api': config.api_server.enabled and config.api_server.auto_start and
self.check_port_available(config.api_server.host, config.api_server.port),
#对于API服务器:需要同时满足3个条件(启用、自动启动、端口可用)
'mcp': self.check_port_available("0.0.0.0", get_server_port("mcp_server")),
'agent': self.check_port_available("0.0.0.0", get_server_port("agent_server")),
'tts': self.check_port_available("0.0.0.0", config.tts.port)
#对于其他服务器:只检查端口是否可用
}
# API服务器(可选)
if port_checks['api']:
api_thread = threading.Thread(target=self._start_api_server, daemon=True)
threads.append(("API", api_thread))
service_status['API'] = "准备启动"
elif config.api_server.enabled and config.api_server.auto_start:
print(f"⚠️ API服务器: 端口 {config.api_server.port} 已被占用,跳过启动")
service_status['API'] = "端口占用"
'''逻辑流程:
如果API端口可用 → 创建线程,准备启动
否则如果API启用了且要自动启动 → 打印警告,标记端口被占用
'''
# MCP服务器
if port_checks['mcp']:
mcp_thread = threading.Thread(target=self._start_mcp_server, daemon=True)
threads.append(("MCP", mcp_thread))
service_status['MCP'] = "准备启动"
else:
print(f"⚠️ MCP服务器: 端口 {get_server_port('mcp_server')} 已被占用,跳过启动")
service_status['MCP'] = "端口占用"
# Agent服务器
if port_checks['agent']:
agent_thread = threading.Thread(target=self._start_agent_server, daemon=True)
threads.append(("Agent", agent_thread))
service_status['Agent'] = "准备启动"
else:
print(f"⚠️ Agent服务器: 端口 {get_server_port('agent_server')} 已被占用,跳过启动")
service_status['Agent'] = "端口占用"
# TTS服务器
if port_checks['tts']:
tts_thread = threading.Thread(target=self._start_tts_server, daemon=True)
threads.append(("TTS", tts_thread))
service_status['TTS'] = "准备启动"
else:
print(f"⚠️ TTS服务器: 端口 {config.tts.port} 已被占用,跳过启动")
service_status['TTS'] = "端口占用"
# 显示服务启动计划
print("\n📋 服务启动计划:")
for service, status in service_status.items():
if status == "准备启动":
print(f" 🔄 {service}服务器: 正在启动...")
else:
print(f" ⚠️ {service}服务器: {status}")
print("\n🚀 开始启动服务...")
print("-" * 30)
# 批量启动所有线程
for name, thread in threads:
thread.start()
print(f"✅ {name}服务器: 启动线程已创建")
print("-" * 30)
print(f"🎉 服务启动完成: {len(threads)} 个服务正在后台运行")
print("=" * 50)
'''
显示计划:先告诉用户哪些服务要启动
批量启动:循环启动所有线程
显示结果:告诉用户启动了多少个服务
'''
except Exception as e:
print(f"❌ 并行启动服务异常: {e}")
def _init_proxy_settings(self):
"""初始化代理设置:若不启用代理,则清空系统代理环境变量"""
# 检测 applied_proxy 状态
if not config.api.applied_proxy: # 当 applied_proxy 为 False 时
print("检测到不启用代理,正在清空系统代理环境变量...")
# 清空 HTTP/HTTPS 代理环境变量(跨平台兼容)
proxy_vars = ["HTTP_PROXY", "HTTPS_PROXY", "http_proxy", "https_proxy"]
for var in proxy_vars:
if var in os.environ: #os.environ:操作系统的环境变量
#if var in os.environ:检查变量是否存在
del os.environ[var] # 删除环境变量
# del os.environ[var]:删除这个环境变量
print(f"已清除代理环境变量: {var}")
# 额外:确保 requests Session 没有全局代理配置
global_session = requests.Session()
#requests.Session():创建一个新的会话
if global_session.proxies:
global_session.proxies.clear() # 清除会话中的所有代理设置
print("已清空 requests Session 全局代理配置")
# 各个服务器的启动方法
# 这些方法的结构都很相似,我们看一个例子:
'''
都使用try...except进行错误处理
都使用uvicorn.run()启动服务器
参数都类似:
host:监听的主机地址
port:监听的端口
log_level="error":只记录错误日志
access_log=False:不记录访问日志
reload=False:不自动重载
'''
def _start_api_server(self):
"""内部API服务器启动方法"""
try:
from nagaagent_core.api import uvicorn
uvicorn.run(
"apiserver.api_server:app",
host=config.api_server.host,
port=config.api_server.port,
log_level="error",
access_log=False,
reload=False,
ws_ping_interval=None, # 禁用WebSocket ping
ws_ping_timeout=None # 禁用WebSocket ping超时
)
except ImportError as e:
print(f" ❌ API服务器依赖缺失: {e}")
except Exception as e:
print(f" ❌ API服务器启动失败: {e}")
def _start_mcp_server(self):
"""内部MCP服务器启动方法"""
try:
import uvicorn
from mcpserver.mcp_server import app
from system.config import get_server_port
uvicorn.run(
app,
host="0.0.0.0",
port=get_server_port("mcp_server"),
log_level="error",
access_log=False,
reload=False,
ws_ping_interval=None, # 禁用WebSocket ping
ws_ping_timeout=None # 禁用WebSocket ping超时
)
except Exception as e:
print(f" ❌ MCP服务器启动失败: {e}")
def _start_agent_server(self):
"""内部Agent服务器启动方法"""
try:
import uvicorn
from agentserver.agent_server import app
from system.config import get_server_port
uvicorn.run(
app,
host="0.0.0.0",
port=get_server_port("agent_server"),
log_level="error",
access_log=False,
reload=False,
ws_ping_interval=None, # 禁用WebSocket ping
ws_ping_timeout=None # 禁用WebSocket ping超时
)
except Exception as e:
print(f" ❌ Agent服务器启动失败: {e}")
def _start_tts_server(self):
"""内部TTS服务器启动方法"""
try:
from voice.output.start_voice_service import start_http_server
start_http_server()
except Exception as e:
print(f" ❌ TTS服务器启动失败: {e}")
# NagaPortal自动登录相关方法
def _start_naga_portal_auto_login(self):
"""启动NagaPortal自动登录(异步)"""
try:
# 检查是否配置了NagaPortal,如果用户名或密码没有配置 → 直接返回(不执行后面的代码)
if not config.naga_portal.username or not config.naga_portal.password:
return # 静默跳过,不输出日志
# 在新线程中异步执行登录
def run_auto_login():
try:
import sys
import os
# 添加项目根目录到Python路径
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, project_root)
from mcpserver.agent_naga_portal.portal_login_manager import auto_login_naga_portal
# 创建新的事件循环
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# result = loop.run_until_complete(auto_login_naga_portal())
'''事件循环的创建和使用:
创建新的事件循环:每个线程需要自己的事件循环
设置事件循环:告诉asyncio使用这个新的事件循环
运行异步函数:等待自动登录完成
'''
try:
# 执行自动登录
result = loop.run_until_complete(auto_login_naga_portal())
if result['success']:
# 登录成功,显示状态
print("✅ NagaPortal自动登录成功")
self._show_naga_portal_status()
else:
# 登录失败,显示错误
error_msg = result.get('message', '未知错误')
print(f"❌ NagaPortal自动登录失败: {error_msg}")
self._show_naga_portal_status()
finally:
loop.close()
except Exception as e:
# 登录异常,显示错误
print(f"❌ NagaPortal自动登录异常: {e}")
self._show_naga_portal_status()
'''嵌套函数:
run_auto_login:在方法内部定义的函数,只在当前方法中使用
为什么这样设计:让登录逻辑在一个独立的线程中运行,不阻塞主程序
'''
# 启动后台线程
import threading
login_thread = threading.Thread(target=run_auto_login, daemon=True)
login_thread.start()
except Exception as e:
# 启动异常,显示错误
print(f"❌ NagaPortal自动登录启动失败: {e}")
self._show_naga_portal_status()
def _show_naga_portal_status(self):
"""显示NagaPortal状态(登录完成后调用)"""
try:
from mcpserver.agent_naga_portal.portal_login_manager import get_portal_login_manager
login_manager = get_portal_login_manager()
status = login_manager.get_status()
cookies = login_manager.get_cookies()
#从登录管理器获取状态和cookie信息
print(f"🌐 NagaPortal状态:")
print(f" 地址: {config.naga_portal.portal_url}")
print(f" 用户: {config.naga_portal.username[:3]}***{config.naga_portal.username[-3:] if len(config.naga_portal.username) > 6 else '***'}")
#安全显示用户名:username[:3]:显示前3个字符,username[-3:]:显示最后3个字符,***:中间用星号代替,目的:保护用户隐私,不完全显示用户名
if cookies:
print(f"🍪 Cookie信息 ({len(cookies)}个):")
for name, value in cookies.items():
print(f" {name}: {value}")
else:
print(f"🍪 Cookie: 未获取到")
user_id = status.get('user_id')
if user_id:
print(f"👤 用户ID: {user_id}")
else:
print(f"👤 用户ID: 未获取到")
# 显示登录状态
if status.get('is_logged_in'):
print(f"✅ 登录状态: 已登录")
else:
print(f"❌ 登录状态: 未登录")
if status.get('login_error'):
print(f" 错误: {status.get('login_error')}")
except Exception as e:
print(f"🍪 NagaPortal状态获取失败: {e}")
def _start_mqtt_status_check(self):
"""启动物联网通讯连接并显示状态(异步)"""
try:
# 检查是否配置了物联网通讯
if not config.mqtt.enabled:
return # 静默跳过,不输出日志
# 在新线程中异步执行物联网通讯连接
def run_mqtt_connection():
try:
import sys
import os
import time
# 添加项目根目录到Python路径
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, project_root)
try:
from mqtt_tool.device_switch import device_manager
# 尝试连接物联网设备
if hasattr(device_manager, 'connect'):
success = device_manager.connect()
if success:
print("🔗 物联网通讯状态: 已连接")
else:
print("⚠️ 物联网通讯状态: 连接失败(将在使用时重试)")
else:
print("❌ 物联网通讯功能不可用")
except Exception as e:
print(f"⚠️ 物联网通讯连接失败: {e}")
except Exception as e:
print(f"❌ 物联网通讯连接异常: {e}")
# 启动后台线程
import threading
mqtt_thread = threading.Thread(target=run_mqtt_connection, daemon=True)
mqtt_thread.start()
except Exception as e:
print(f"❌ 物联网通讯连接启动失败: {e}")
def _init_voice_system(self):
"""初始化语音处理系统"""
try:
if config.system.voice_enabled:
logger.info("语音功能已启用(语音输入+输出),由UI层管理")
else:
logger.info("语音功能已禁用")
except Exception as e:
logger.warning(f"语音系统初始化失败: {e}")
def _init_memory_system(self):
"""初始化记忆系统"""
try:
if memory_manager and memory_manager.enabled:
logger.info("夏园记忆系统已初始化")
else:
logger.info("夏园记忆系统已禁用")
except Exception as e:
logger.warning(f"记忆系统初始化失败: {e}")
def _init_mcp_services(self):
"""初始化MCP服务系统"""
try:
# MCP服务现在由mcpserver独立管理,这里只需要记录日志
logger.info("MCP服务系统由mcpserver独立管理")
except Exception as e:
logger.error(f"MCP服务系统初始化失败: {e}")
def show_naga_portal_status(self):
"""显示NagaPortal配置状态(手动调用)"""
try:
if config.naga_portal.username and config.naga_portal.password:
print(f"🌐 NagaPortal: 已配置账户信息")
print(f" 地址: {config.naga_portal.portal_url}")
print(f" 用户: {config.naga_portal.username[:3]}***{config.naga_portal.username[-3:] if len(config.naga_portal.username) > 6 else '***'}")
# 获取并显示Cookie信息
try:
from mcpserver.agent_naga_portal.portal_login_manager import get_portal_login_manager
login_manager = get_portal_login_manager()
status = login_manager.get_status()
cookies = login_manager.get_cookies()
if cookies:
print(f"🍪 Cookie信息 ({len(cookies)}个):")
for name, value in cookies.items():
# 显示完整的cookie名称和值
print(f" {name}: {value}")
else:
print(f"🍪 Cookie: 未获取到")
user_id = status.get('user_id')
if user_id:
print(f"👤 用户ID: {user_id}")
else:
print(f"👤 用户ID: 未获取到")
# 显示登录状态
if status.get('is_logged_in'):
print(f"✅ 登录状态: 已登录")
else:
print(f"❌ 登录状态: 未登录")
if status.get('login_error'):
print(f" 错误: {status.get('login_error')}")
except Exception as e:
print(f"🍪 状态获取失败: {e}")
else:
print(f"🌐 NagaPortal: 未配置账户信息")
print(f" 如需使用NagaPortal功能,请在config.json中配置naga_portal.username和password")
except Exception as e:
print(f"🌐 NagaPortal: 配置检查失败 - {e}")
# 工具函数
#这部分定义了三个简单但有用的小工具:
'''都很简单,只有一行代码
都是给用户提供便利的工具,不涉及复杂的逻辑
'''
def show_help():
print('系统命令: 清屏, 查看索引, 帮助, 退出')
def show_index():
print('主题分片索引已集成,无需单独索引查看')
def clear(): # 清除屏幕上的内容
os.system('cls' if os.name == 'nt' else 'clear')
'''工作原理:
os.name == 'nt':检查当前操作系统是不是Windows('nt'表示Windows)
'cls' if ... else 'clear':
如果是Windows → 使用cls命令
如果是Linux/macOS → 使用clear命令
os.system():执行系统命令
'''
# 延迟初始化 - 避免启动时阻塞,这是一个非常重要的函数
def _lazy_init_services():
#下划线开头:表示这是内部函数,其他代码不应该直接调用
#lazy:表示"懒惰的",意思是"等到需要的时候再做"
"""延迟初始化服务 - 在需要时才初始化"""
global service_manager, n
'''global关键字:
global service_manager:告诉Python,我们要修改全局变量service_manager
global n:告诉Python,我们要修改全局变量n
为什么需要global:在函数内部修改全局变量时需要使用
'''
if not hasattr(_lazy_init_services, '_initialized'):
'''检查是否已初始化
hasattr(_lazy_init_services, '_initialized'):检查函数本身有没有_initialized属性
这是一种技巧:利用函数本身的属性来记录状态
逻辑:如果没有初始化过(_initialized属性不存在),就执行初始化
'''
# 初始化服务管理器
service_manager = ServiceManager() #创建ServiceManager对象
service_manager.start_background_services()
# service_manager.start_background_services():调用对象的start_background_services方法
# conversation_core已删除,相关功能已迁移到apiserver
n = None #n = None:把全局变量n设为None(空)
# 初始化各个系统(conversation_core已删除,直接初始化服务)
#调用了ServiceManager对象的三个初始化方法:
service_manager._init_mcp_services() #初始化MCP服务
service_manager._init_voice_system() #初始化语音系统
service_manager._init_memory_system() # 初始化记忆系统
# service_manager._load_persistent_context() # 删除重复加载,UI渲染时会自动加载
# 初始化进度文件
#with open('./ui/styles/progress.txt', 'w') as f:
#f.write('0')
#何意味?注释了 by Null
#原本的作用:创建一个进度文件,写入初始值0
# 显示系统状态,这段代码打印了一些重要的系统状态信息:
print("=" * 30) #分隔线:打印30个等号,用作视觉分隔
print(f"GRAG状态: {'启用' if memory_manager.enabled else '禁用'}")
'''
条件表达式
'启用' if memory_manager.enabled else '禁用'
如果memory_manager.enabled为True → 显示"启用"
否则 → 显示"禁用"
否则 → 显示"禁用"
'''
if memory_manager.enabled: #条件判断:只有记忆系统启用时才执行
stats = memory_manager.get_memory_stats() #获取统计信息:memory_manager.get_memory_stats()
from summer_memory.quintuple_graph import graph, GRAG_ENABLED
print(f"Neo4j连接: {'成功' if graph and GRAG_ENABLED else '失败'}")
#从summer_memory.quintuple_graph导入graph和GRAG_ENABLED
#检查Neo4j图数据库是否连接成功
print("=" * 30)
print(f'{AI_NAME}系统已启动') #显示AI名称:使用之前导入的AI_NAME变量
print("=" * 30)
# 启动服务(并行异步)
service_manager.start_all_servers()
'''启动所有服务
调用service_manager.start_all_servers()方法
注释说明:并行异步启动
'''
# 启动NagaPortal自动登录
service_manager._start_naga_portal_auto_login() #调用_start_naga_portal_auto_login()方法
print("⏳ NagaPortal正在后台自动登录...") # 打印提示信息:⏳ NagaPortal正在后台自动登录...
# 启动物联网通讯连接
service_manager._start_mqtt_status_check() # 调用_start_mqtt_status_check()方法
print("⏳ 物联网通讯正在后台初始化连接...")
show_help() #调用之前定义的show_help()函数
_lazy_init_services._initialized = True
'''设置初始化标记
给函数本身添加_initialized属性,并设为True
作用:下次调用这个函数时,hasattr(_lazy_init_services, '_initialized')会返回True
结果:不会重复初始化,只初始化一次
'''
# NagaAgent适配器 - 优化重复初始化
# 类定义,作为适配器,封装了对naga对象的访问,确保在使用前服务已经初始化
class NagaAgentAdapter:
def __init__(s): #初始化方法,参数:s(实际上是self,只是用了不同的名字)
# 使用全局实例,避免重复初始化
_lazy_init_services() #调用_lazy_init_services(),确保服务已初始化
s.naga = n # 使用全局实例,把全局变量n赋值给s.naga
#respond_stream方法,这是一个异步生成器方法
#async def:定义异步方法
async def respond_stream(s, txt): #s:对象自身(相当于self),txt:输入的文本
#异步迭代:async for用于迭代异步生成器
async for resp in s.naga.process(txt): #s.naga.process(txt):调用naga对象的process方法处理文本,这个方法返回一个异步生成器,每次产生一个响应
yield AI_NAME, resp, None, True, False #yield:产生一个值,但不结束函数(生成器特性)
'''返回的元组:包含5个值:
AI_NAME:AI的名字,resp:处理后的响应,None:空值
True:布尔值真.False:布尔值假
'''
# 主程序入口
if __name__ == "__main__":
'''
__name__ 是一个特殊的Python变量
当一个Python文件被直接运行时,__name__ 的值是 "__main__"
当一个Python文件被导入为模块时,__name__ 的值是文件名(不带.py)
'''
import argparse # 导入argparse模块
'''
Python标准库中的模块
用来解析命令行参数
可以让程序接受用户从命令行输入的参数
'''
# 解析命令行参数
parser = argparse.ArgumentParser(description="NagaAgent - 智能对话助手") # 创建参数解析器
#ArgumentParser:参数解析器的类,description:程序描述,显示在帮助信息中
#创建的parser对象:可以理解为一个"参数接收器",负责定义程序能接受哪些参数
parser.add_argument("--check-env", action="store_true", help="运行系统环境检测") # 添加命令行参数,--check-env:运行完整的系统环境检测
#参数说明:"--check-env":参数的名称(前面有两个减号),action="store_true":如果用户输入了这个参数,就把值设为True,help="...":帮助信息,说明这个参数的作用
parser.add_argument("--quick-check", action="store_true", help="运行快速环境检测") # --quick-check:运行快速环境检测
parser.add_argument("--force-check", action="store_true", help="强制运行环境检测(忽略缓存)") # --force-check:强制运行检测,忽略缓存
args = parser.parse_args() #解析命令行参数
'''这一行代码的作用:读取用户在命令行输入的参数
根据前面定义的规则解析这些参数
把结果保存到args对象中
args对象:包含了所有解析后的参数值,例如,如果用户输入了--check-env,那么args.check_env就是True
'''
'''
用户输入了--check-env或--quick-check吗?
↓
是 → 需要运行环境检测
↓
用户输入了--quick-check吗?
↓
是 → 运行快速检测(run_quick_check)
否 → 运行完整检测(run_system_check)
↓
检测成功吗?
↓
是 → 退出程序,返回0(表示成功)
否 → 退出程序,返回1(表示失败)
'''
# 处理检测命令
if args.check_env or args.quick_check: #条件判断:如果用户输入了--check-env或--quick-check,or逻辑:两个条件满足一个就行
if args.quick_check: #如果用户输入了--quick-check → 运行快速检测
success = run_quick_check()
else: # 否则 → 运行完整检测
success = run_system_check(force_check=args.force_check) # force_check=args.force_check:把--force-check参数传给检测函数
sys.exit(0 if success else 1)
'''条件表达式:0 if success else 1
如果success是True → 返回0
如果success是False → 返回1
sys.exit():退出程序
退出码的意义:0:成功,非0(如1):失败
'''
# 系统环境检测
print("🚀 正在启动NagaAgent...")
print("=" * 50)
# 如果是打包环境,跳过所有环境检测
if IS_PACKAGED:
print("📦 检测到打包环境,跳过系统环境检测...") #如果是打包环境 → 跳过检测
else:
# 执行系统检测(只在第一次启动时检测)
#非打包环境的检测:
if not run_system_check(): #运行系统检测:run_system_check(),逻辑反转:not表示"如果不是",意思:如果检测失败(返回False)
print("\n❌ 系统环境检测失败,程序无法启动")
print("请根据上述建议修复问题后重新启动")
i=input("是否无视检测结果继续启动?是则按y,否则按其他任意键退出...")
if i != "y" and i != "Y":
sys.exit(1)
print("\n🎉 系统环境检测通过,正在启动应用...")
print("=" * 50)
#设置异步事件循环
#asyncio.get_event_loop():获取当前的事件循环,is_running():检查事件循环是否在运行
if not asyncio.get_event_loop().is_running(): # 如果事件循环不在运行
asyncio.set_event_loop(asyncio.new_event_loop())
#asyncio.new_event_loop():创建一个新的事件循环
#asyncio.set_event_loop(...):把这个新的事件循环设为当前循环
# 快速启动UI,后台服务延迟初始化
#QApplication对象:PyQt5程序的"心脏",管理整个GUI应用程序,sys.argv:传递命令行参数给Qt
app = QApplication(sys.argv)
icon_path = os.path.join(os.path.dirname(__file__), "ui", "img/window_icon.png") #最终路径:当前目录/ui/img/window_icon.png
#os.path.dirname(__file__):获取当前文件所在的目录
#os.path.join(...):拼接路径
app.setWindowIcon(QIcon(icon_path))
#QIcon(icon_path):从路径创建图标对象
#app.setWindowIcon(...):设置应用程序的图标
# 集成控制台托盘功能
console_tray = integrate_console_tray()
#调用integrate_console_tray()函数,创建系统托盘图标,返回托盘对象,保存到console_tray变量
# 创建并显示主窗口,立即显示UI,提升用户体验
win = ChatWindow() #创建窗口对象:ChatWindow(),调用之前导入的ChatWindow类
win.setWindowTitle("NagaAgent") # 设置窗口标题:setWindowTitle("NagaAgent")
win.show() # 显示窗口:show()方法让窗口可见
# 在UI显示后异步初始化后台服务,异步初始化后台服务
def init_services_async(): #在函数内部调用_lazy_init_services()(我们之前分析的那个函数)
"""异步初始化后台服务"""\
#用try...except捕获异常,防止初始化失败导致程序崩溃
try:
_lazy_init_services()
except Exception as e:
print(f"⚠️ 后台服务初始化异常: {e}")
# 使用定时器延迟初始化,避免阻塞UI,导入QTimer:Qt的定时器类
from nagaagent_core.vendors.PyQt5.QtCore import QTimer
QTimer.singleShot(100, init_services_async) # 100ms后初始化
#singleShot:单次定时器,init_services_async:要执行的函数
#这是最重要的一行代码:它启动了整个GUI程序
sys.exit(app.exec_())
#启动事件循环:开始处理用户的所有操作(点击、键盘输入等)
#这个方法会一直运行,直到程序退出
#整个启动流程总结
'''
开始
↓
检查是否直接运行(__name__ == "__main__")
↓
解析命令行参数
↓
用户输入了检测参数吗?
├─ 是 → 运行检测 → 退出程序
↓
否(正常启动)
↓
显示启动信息
↓
是打包环境吗?
├─ 是 → 跳过环境检测
↓
否 → 运行系统环境检测
↓
检测通过吗?
├─ 否 → 询问是否继续 → 用户选择继续吗?
│ ├─ 是 → 继续启动
│ ↓
│ 否 → 退出程序
↓
是 → 设置异步事件循环
↓
创建Qt应用程序对象
↓
设置应用程序图标
↓
创建系统托盘图标
↓
创建并显示主窗口(用户立即看到界面)
↓
设置定时器,100毫秒后初始化后台服务
↓
启动Qt事件循环(程序开始运行,等待用户操作)
↓
用户操作程序...
↓
用户关闭程序 → 退出事件循环 → 退出Python程序
'''
build.py
import os # 文件和文件夹操作(就像Windows的资源管理器)
import shutil # 复制、移动文件
import subprocess # 运行外部命令(就像在cmd里输入命令)
import sys # 系统相关功能,比如退出程序
import platform # 判断是什么操作系统(Windows、Mac、Linux)
import glob # 查找文件名(可以用*这种通配符)
import zipfile # 压缩和解压zip文件
import tarfile # 压缩和解压tar文件
def user_confirmation():
"""提示清空配置,让用户确认是否继续"""
print("=========================================")
print(" 提示:请确保config.json内无隐私数据或已删除。")
print("=========================================")
response = input("请确认是否继续 (输入 'Y' 继续,或按 Enter/n 取消): ").strip().lower()
if response != 'y':
print("\n操作已取消。")
sys.exit(0)
#什么是uv? uv是一个快速的Python包管理器,就像pip但更快。
def check_uv():
"""检查 uv 是否存在,不存在就提示安装,然后退出"""
print("\n[1/7] 检查 uv 是否已安装...")
try:
# 运行 uv --version 来检查是否存在
# capture_output=True 和 check=True 确保在失败时能捕获输出或抛出异常
subprocess.run(['uv', '--version'],
capture_output=True,
text=True,
check=True)
print(" -> uv 检查通过。")
except FileNotFoundError:
print("\n[错误] 找不到 'uv' 命令。")
print("请先安装 uv (推荐使用 pipx install uv) 或确保它在系统 PATH 中。")
sys.exit(1)
except subprocess.CalledProcessError as e:
print(f"\n[错误] 执行 'uv --version' 失败,返回非零状态码: {e.returncode}")
print("请检查 uv 安装是否正确。")
sys.exit(1)
def manage_config_files():
config_path = os.path.join('config.json') # 当前文件夹的config.json
example_path = os.path.join('config.json.example') # 示例配置文件
print("\n[2/7] 检查配置文件...")
if os.path.exists(config_path): # 如果已经有config.json
print(f" -> 已检测到 {config_path},跳过创建。")
else: # 如果没有config.json
if os.path.exists(example_path): # 如果有示例文件
try:
shutil.copy(example_path, config_path) # 复制示例文件
print(f" -> 未检测到 {config_path},已从 {example_path} 复制。")
except Exception as e:
print(f"[错误] 复制配置文件失败: {e}")
sys.exit(1)
else:
print(f"[错误] 未找到 {example_path},无法创建 {config_path}。")
sys.exit(1)
#简单理解:就像做蛋糕,如果没有配方(config.json),就从示例配方(config.json.example)抄一份。
#主要构建过程,这是最复杂的部分,分4个小步骤
def run_build_commands():
"""执行 uv sync 和 pyinstaller 命令"""
print("\n[3/7] 安装所需依赖...")
try:
subprocess.run(['uv', 'sync', '--group', 'build'], check=True) #安装依赖
print(" -> uv sync 完成。")
except subprocess.CalledProcessError:
print("[错误] 'uv sync' 失败。请检查 'pyproject.toml' 和网络连接。")
sys.exit(1)
# 给Linux/Mac文件加权限(只有非Windows需要)
if platform.system() != "Windows": # 如果不是Windows系统
print("\n[4/7] 授予执行权限...") # 找到所有.so文件
try:
# 使用 glob 递归查找 .venv 下的 .so* 文件,并为每个文件添加可执行权限
so_files = glob.glob('.venv/**/*.so*', recursive=True)
if not so_files:
print(" -> 未发现 .so* 文件,跳过授予执行权限。")
else:
for f in so_files:
try:
st = os.stat(f).st_mode
os.chmod(f, st | 0o111) # 加上可执行权限
except Exception as e:
print(f" -> 无法修改权限 {f}: {e}")
print(" -> 执行权限授予完成。")
except Exception:
print("[错误] 执行权限授予失败。")
sys.exit(1)
else:
print("\n[4/7] Windows无需授予执行权限,已跳过...")
print("\n[5/7] 运行构建命令")
try:
# pyinstaller 命令,用PyInstaller打包
subprocess.run(['uv', 'run', 'pyinstaller', 'main.spec', '--clean', '--noconfirm'], check=True)
print(" -> PyInstaller 构建完成。")
except subprocess.CalledProcessError:
print("[错误] 'pyinstaller' 构建失败。请检查 PyInstaller 配置。")
sys.exit(1)
def create_instructions_and_venv_placeholder():
"""创建说明文件和 .venv 占位符(说明文件)"""
## 路径定义
dist_main_dir = os.path.join('dist', 'main') # dist/main文件夹
instructions_path = os.path.join(dist_main_dir, '使用说明.txt') # 说明文件路径
#venv_path = os.path.join(dist_main_dir, '.venv')
#placeholder_path = os.path.join(venv_path, '说明.txt')
# 使用说明内容
instructions_content = """\
config.json(配置文件)位置:_internal/config.json,也可以使用GUI进行配置填写
错误请群里反应或前往https://github.com/69gg/NagaAgent/issues,不要前往官方仓库
双击运行可执行文件即可启动程序
环境检测错误直接输入y跳过即可,不影响使用
"""
# .venv/说明.txt 内容
placeholder_content = "占位"
print(f"\n[6/7] 创建 {instructions_path}...")
# 确保目标目录存在,创建文件夹(如果不存在)
os.makedirs(dist_main_dir, exist_ok=True)
# 写入使用说明
try:
with open(instructions_path, 'w', encoding='utf-8') as f:
f.write(instructions_content)
print(f" -> {os.path.basename(instructions_path)} 写入完成。")
except Exception as e:
print(f"[警告] 写入使用说明失败: {e}")
## 写入 .venv 占位文件
#try:
# with open(placeholder_path, 'w', encoding='utf-8') as f:
# f.write(placeholder_content)
# print(" -> .venv/说明.txt 写入完成。")
#except Exception as e:
# print(f"[警告] 写入 .venv 占位文件失败: {e}")
#重命名和打包,这个函数是整个构建流程的最后一步
def rename_and_zip_output():
"""重命名 dist/main 目录并快速归档(不压缩)"""
# 确定系统名称
sys_map = {
'Windows': 'Windows',
'Linux': 'Linux',
'Darwin': 'Darwin' # Darwin is the core of macOS
}
current_system = sys_map.get(platform.system(), 'Unknown')
#platform.system() 返回:'Windows'、'Linux' 或 'Darwin'(macOS)
# 确定系统架构
arch = platform.machine() # 直接使用原始机器架构名称
#platform.machine() 返回:'x86_64'(64位)、'AMD64'、'arm64'(苹果M芯片)等
source_dir = os.path.join('dist', 'main') # 源文件夹:dist/main
target_base_name = f'NagaAgent_{current_system}_{arch}' # 新名称
target_dir_path = os.path.join('dist', target_base_name) # 新路径
zip_file_path = f'{target_dir_path}.zip' # 压缩包路径
#这样就能生成像 NagaAgent_Windows_x86_64 这样的文件名
if not os.path.isdir(source_dir): # 如果dist/main不存在
print(f"\n[错误] 找不到构建目录 {source_dir}。请检查 PyInstaller 是否成功运行。")
sys.exit(1)
'''找到打包好的程序文件夹(dist/main)
准备给它起个新名字,包含系统信息
检查这个文件夹是否存在(不存在说明前面步骤失败了)'''
#重命名文件夹
print(f"\n[7/7] 打包为压缩包...")
try:
# 如果目标目录已存在,先删除,确保重命名成功
if os.path.exists(target_dir_path):
shutil.rmtree(target_dir_path) # 删除旧的
print(f" -> 已删除旧目录 {target_dir_path}。")
# 重命名
os.rename(source_dir, target_dir_path)
print(f" -> 重命名 {os.path.basename(source_dir)} 为 {os.path.basename(target_dir_path)} 完成。")
except Exception as e:
print(f"[错误] 重命名失败: {e}")
sys.exit(1)
#Windows系统的打包方式(最复杂)
print(f"\n -> 开始打包 {target_dir_path} ...")
try:
# Windows: 尝试使用 7z(支持多线程),否则回退到使用多线程读取并写入 zip(压缩在写入时仍为单线程)
if platform.system() == "Windows":
zip_file_path = f'{target_dir_path}.zip'
sevenz = shutil.which('7z') or shutil.which('7za') or shutil.which('7zr')
if sevenz: # 如果有7z工具
print(" -> 检测到 7z,使用 7z 多线程压缩为 zip ...")
try:
subprocess.run([sevenz, 'a', '-tzip', zip_file_path, target_dir_path, '-mx=9', '-mmt'], check=True)
print(f" -> 多线程压缩完成。文件位置: {zip_file_path}")
except subprocess.CalledProcessError as e:
print(f"[错误] 使用 7z 打包失败: {e}")
sys.exit(1)
else: # 如果没有7z,用Python自己的方法
print(" -> 未检测到 7z,使用 Python 多线程读取并写入 zip(兼容模式)...")
import concurrent.futures # 多线程模块
# 找到所有要压缩的文件
files = []
for root, _, fnames in os.walk(target_dir_path):
for fname in fnames:
full = os.path.join(root, fname)
arcname = os.path.relpath(full, os.path.join(target_dir_path, '..'))
# 压缩包内路径
files.append((full, arcname))
try: # 使用多线程读取文件
max_workers = os.cpu_count() or 4 # 用CPU核心数做线程数
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as ex:
# 并行读取文件到内存
future_map = {ex.submit(open, p, 'rb'): (p, a) for p, a in files}
# 为了保证顺序无关紧要,我们按完成顺序写入 zip
with zipfile.ZipFile(zip_file_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
read_futures = []
for p, a in files:
read_futures.append(ex.submit(lambda p=p: open(p, 'rb').read()))
# 按原 files 顺序写入以使 archive 稳定
for (p, a), fut in zip(files, read_futures):
data = fut.result() # 获取读取的文件内容
zipf.writestr(a, data) # 写入zip
print(f" -> 归档完成。文件位置: {zip_file_path}")
except Exception as e:
print(f"[错误] 归档失败: {e}")
sys.exit(1)
'''7z方式:如果有7z就用它,因为它快且支持多线程
Python方式:用多线程同时读取多个文件,但写入zip时还是单线程
为什么用多线程读取? 因为读取文件时CPU在等硬盘,可以同时读多个 '''
# Linux / Darwin: 优先使用 xz(更高压缩率),回退使用 pigz + tar,最后回退到 tarfile
else:
tar_xz_path = f'{target_dir_path}.tar.xz' # .tar.xz 文件路径
tar_gz_path = f'{target_dir_path}.tar.gz' # .tar.gz 文件路径
tar_cmd = shutil.which('tar') # 找tar命令
xz_cmd = shutil.which('xz') # 找xz命令
pigz_cmd = shutil.which('pigz') # 找pigz命令(多线程gzip)
# 首先尝试使用 xz(最高压缩率)
if tar_cmd and xz_cmd:
print(" -> 检测到 tar 和 xz,使用 xz 最高压缩率压缩为 tar.xz ...")
try:
nproc = str(os.cpu_count() or 4)
cwd = os.path.dirname(target_dir_path) or '.'
base = os.path.basename(target_dir_path)
with open(tar_xz_path, 'wb') as out_f:
p1 = subprocess.Popen([tar_cmd, '-C', cwd, '-cf', '-', base], stdout=subprocess.PIPE)
# xz -9: 最高压缩级别, -T: 多线程
p2 = subprocess.Popen([xz_cmd, '-9', '-T', nproc], stdin=p1.stdout, stdout=out_f)
p1.stdout.close()
ret2 = p2.wait()
ret1 = p1.wait()
if ret1 != 0 or ret2 != 0:
raise subprocess.CalledProcessError(ret2 or ret1, 'tar|xz')
print(f" -> 多线程 xz 压缩完成。文件位置: {tar_xz_path}")
except Exception as e:
print(f"[错误] xz/tar 打包失败: {e}")
sys.exit(1)
# 回退使用 pigz
elif tar_cmd and pigz_cmd:
print(" -> 检测到 tar 和 pigz,使用 pigz 多线程压缩为 tar.gz ...")
try:
nproc = str(os.cpu_count() or 4)
cwd = os.path.dirname(target_dir_path) or '.'
base = os.path.basename(target_dir_path)
with open(tar_gz_path, 'wb') as out_f:
p1 = subprocess.Popen([tar_cmd, '-C', cwd, '-cf', '-', base], stdout=subprocess.PIPE)
p2 = subprocess.Popen([pigz_cmd, '-9', '-p', nproc], stdin=p1.stdout, stdout=out_f)
p1.stdout.close()
ret2 = p2.wait()
ret1 = p1.wait()
if ret1 != 0 or ret2 != 0:
raise subprocess.CalledProcessError(ret2 or ret1, 'tar|pigz')
print(f" -> 多线程归档完成。文件位置: {tar_gz_path}")
except Exception as e:
print(f"[错误] pigz/tar 打包失败: {e}")
sys.exit(1)
else:
# 最后回退到 Python tarfile,优先尝试 xz 格式
try:
print(" -> 未检测到外部压缩工具,使用 Python tarfile xz 格式压缩...")
with tarfile.open(tar_xz_path, 'w:xz', preset=9) as tar:
tar.add(target_dir_path, arcname=os.path.basename(target_dir_path))
print(f" -> tar.xz 归档完成。文件位置: {tar_xz_path}")
except Exception:
# tarfile 可能不支持 xz,回退到 gzip
try:
print(" -> Python tarfile 不支持 xz,回退到 gzip 压缩...")
with tarfile.open(tar_gz_path, 'w:gz', compresslevel=9) as tar:
tar.add(target_dir_path, arcname=os.path.basename(target_dir_path))
print(f" -> tar.gz 归档完成。文件位置: {tar_gz_path}")
except Exception as e:
print(f"[错误] tarfile 打包失败: {e}")
sys.exit(1)
except Exception as e:
print(f"[错误] 打包过程出现异常: {e}")
sys.exit(1)
def main():
user_confirmation() # 1. 确认提示
check_uv() # 2. 检查uv工具
manage_config_files() # 3. 处理配置文件
run_build_commands() # 4. 构建命令(安装依赖、打包)
create_instructions_and_venv_placeholder() # 5. 创建说明文件
rename_and_zip_output() # 6. 重命名和压缩
print("\n=========================================")
print(" 构建和打包流程已全部完成!")
print("=========================================")
if __name__ == "__main__":
main()
'''
开始
↓
用户确认 (user_confirmation) ← 如果用户不输入'y'就退出
↓
检查uv工具 (check_uv) ← 如果没有uv就退出
↓
处理配置文件 (manage_config_files) ← 复制示例配置文件
↓
构建命令 (run_build_commands)
├─ 安装依赖 (uv sync)
├─ 给Linux/Mac文件加权限 (如果不是Windows)
└─ 打包成exe (pyinstaller)
↓
创建说明文件 (create_instructions_and_venv_placeholder)
↓
重命名和压缩 (rename_and_zip_output)
├─ Windows: 压缩为.zip (优先用7z)
├─ Linux: 压缩为.tar.xz (优先用tar+xz)
└─ macOS: 压缩为.tar.xz (优先用tar+xz)
↓
显示完成信息
↓
结束
'''
setup.py
import os # 操作系统的工具(比如打开文件)
import platform #查看系统信息的工具(比如知道是Windows还是Mac)
import subprocess # 运行其他程序或命令的工具
import shutil # 文件操作工具(复制、删除文件)
import sys # Python系统工具(退出程序等)
from pathlib import Path #处理文件路径的工具
import re # 正则表达式工具(用来查找特定格式的文字)
'''这个函数的工作就像找钥匙:
目标:在电脑上找到可以使用的Python程序
寻找顺序:先找python3,再找python,再找py,最后找python3.11
检查方法:用python --version命令看每个程序是否能运行
结果:如果找到就返回程序名和版本号,找不到就返回空'''
def find_python_command(preferred_versions=None):
"""
在 PATH 中查找可用的 python 可执行文件,并返回第一个匹配的命令及其版本输出。
preferred_versions: 按优先级排列的命令列表(例如 ["python3.11", "python3", "python"])
返回 (命令, 版本输出字符串) 或 (None, None)
"""
if preferred_versions is None:
# <<< 变化开始: 调整了检测顺序,优先检测更通用的命令
preferred_versions = ["python3", "python", "py", "python3.11"]
# <<< 变化结束
for cmd in preferred_versions:
try:
# 通过 `--version` 获取版本信息(部分 Python 将输出到 stderr)
proc = subprocess.run([cmd, "--version"], capture_output=True, text=True, check=True)
out = (proc.stdout or proc.stderr).strip()
if out:
return cmd, out
except (FileNotFoundError, subprocess.CalledProcessError):
# 命令不存在或执行失败,继续尝试下一个
continue
return None, None
def parse_version(version_output: str):
"""
从版本输出字符串中解析出版本号(匹配 X.Y 或 X.Y.Z)
返回匹配的版本字符串或 None
"""
m = re.search(r"(\d+\.\d+\.\d+|\d+\.\d+)", version_output)
return m.group(1) if m else None
def is_python_compatible() -> tuple[bool, str, str]:
"""
检查 Python 版本。优先寻找 3.11,但也返回找到的其他版本信息。
返回 (是否兼容3.11, 使用的 python 命令, 版本输出字符串)
"""
# <<< 变化开始: 函数现在返回三个值,包括找到的任何python版本信息
cmd, out = find_python_command()
if not cmd:
return False, "", ""
ver = parse_version(out) or ""
try:
parts = [int(x) for x in ver.split(".")]
except Exception:
return False, cmd, out
# 仅允许 3.11.x
if len(parts) >= 2 and parts[0] == 3 and parts[1] == 11:
return True, cmd, out
return False, cmd, out
# <<< 变化结束
def is_uv_available() -> bool:
"""
检查是否安装并在 PATH 中可用的 `uv` 工具
"""
return shutil.which("uv") is not None
if __name__ == "__main__":
print("开始进行初始化")
use_uv = is_uv_available()
python_cmd = ""
if use_uv:
print(" ✅ 检测到 uv,将用以同步依赖,跳过python版本检测")
else:
print(" ℹ️ 未检测到 uv,正在检查 Python 环境...")
is_compatible, found_cmd, version_output = is_python_compatible()
if is_compatible:
python_cmd = found_cmd
print(f" ✅ 检测到兼容的 Python 3.11: {python_cmd} ({version_output})")
print(" ℹ️ 将使用 venv 和 pip 进行安装")
elif found_cmd:
print(f" ⚠️ 未检测到 Python 3.11,但找到了: {found_cmd} ({version_output})")
prompt = " 👉 是否要尝试使用此 Python 自动安装 'uv' 以继续? (y/n): "
try:
answer = input(prompt).lower().strip()
except (EOFError, KeyboardInterrupt):
# 用户按 Ctrl+C 或 Ctrl+D 中断
print("\n ❌ 操作已取消。")
sys.exit(1)
if answer in ['y', 'yes']:
try:
print(f" ⏳ 正在使用 {found_cmd} 安装 uv...")
subprocess.run([found_cmd, "-m", "pip", "install", "uv"], check=True)
print(" ✅ uv 安装成功!")
use_uv = True # 标记切换到 uv 路径
except subprocess.CalledProcessError as e:
print(f" ❌ uv 安装失败: {e}")
print(" ❌ 请手动安装 Python 3.11 或访问 https://docs.astral.sh/uv/getting-started/installation/ 手动安装 uv")
sys.exit(1)
else:
print(" ❌ 操作已取消。请安装 Python 3.11 或 uv 后重试。")
sys.exit(1)
else:
print(" ❌ 未在 PATH 中找到任何可用的 Python 环境。")
print(" ❌ 请安装 Python 3.11 或访问 https://docs.astral.sh/uv/getting-started/installation/ 安装 uv。")
sys.exit(1)
repo_root = Path(__file__).parent.resolve()
if use_uv:
# 使用 uv 来同步依赖并安装 playwright 的 chromium
print(" ⚙️ 正在使用 uv 同步依赖...")
try:
subprocess.run(["uv", "sync"], check=True, cwd=repo_root)
print(" ⚙️ 正在使用 uv 安装 Playwright browsers...")
# uv run 会在uv管理的环境中执行命令
subprocess.run(["uv", "run", "playwright", "install", "chromium"], check=True, cwd=repo_root)
except subprocess.CalledProcessError as e:
print(f" ❌ uv 操作失败: {e}")
sys.exit(1)
else:
# 使用传统的 venv/pip 流程
venv_dir = repo_root / ".venv"
print(f" ⚙️ 正在创建虚拟环境到: {venv_dir}")
try:
subprocess.run([python_cmd, "-m", "venv", str(venv_dir)], check=True)
except subprocess.CalledProcessError as e:
print(f" ❌ 创建虚拟环境失败: {e}")
sys.exit(1)
# 虚拟环境中 Python 可执行文件的路径(Windows 和类 Unix 不同)
if platform.system() == "Windows":
venv_python = venv_dir / "Scripts" / "python.exe"
else:
venv_python = venv_dir / "bin" / "python"
if not venv_python.exists():
print(" ❌ 虚拟环境 Python 未找到,安装中断")
sys.exit(1)
print(" ⚙️ 正在安装依赖...")
try:
subprocess.run([str(venv_python), "-m", "pip", "install", "--upgrade", "pip"], check=True)
req_file = repo_root / "requirements.txt"
if req_file.exists():
subprocess.run([str(venv_python), "-m", "pip", "install", "-r", str(req_file)], check=True)
else:
print(" ⚠️ requirements.txt 未找到,跳过 pip 安装")
print(" ⚙️ 正在安装 Playwright browsers...")
subprocess.run([str(venv_python), "-m", "playwright", "install", "chromium"], check=True)
except subprocess.CalledProcessError as e:
print(f" ❌ 依赖安装失败: {e}")
sys.exit(1)
# 处理配置文件 config.json:如果不存在则从 config.json.example 复制一份
cfg = repo_root / "config.json"
example = repo_root / "config.json.example"
if not cfg.exists():
if example.exists():
try:
shutil.copyfile(str(example), str(cfg))
print(" ✅ 已创建 config.json")
except Exception as e:
print(f" ❌ 复制 config.json.example 失败: {e}")
sys.exit(1)
else:
print(" ❌ config.json.example 不存在,无法创建 config.json")
sys.exit(1)
else:
print(" ✅ config.json 已存在")
# 使用系统默认编辑器打开 config.json,便于用户编辑
print(" 📥 使用系统默认编辑器打开 config.json,请根据需要进行修改")
try:
if platform.system() == "Windows":
os.startfile(str(cfg))
elif platform.system() == "Darwin":
subprocess.run(["open", str(cfg)], check=True)
else:
subprocess.run(["xdg-open", str(cfg)], check=True)
except Exception as e:
print(f" ⚠️ 无法自动打开 config.json: {e}")
print("\n🎉 初始化完成,可以启动程序了(例如运行 start.bat / start.sh)")
其他文件夹你打开了之后发现他们就是基本上就是只包含了一个文件配置的就是告诉你要配置哪些文件,哪些文件是需要进行安装的,所以呢,这些东西不是涉及整个核心功能的实现。


