本文最后更新于71 天前,其中的信息可能已经过时,如有错误请发送邮件到184874483@qq.com
| 脚本名称 | 主要功能 | 当前状态 | 备注 |
|---|---|---|---|
| check_environment.py | 检查Python环境和依赖包 | 完成 | 基础环境验证 |
| test_wp_api.py | 测试WordPress REST API连接 | 完成 | 验证API可用性 |
| extract_content.py | 使用corpress包提取内容 | 需要验证 | 核心提取功能 |
| validate_extraction.py | 验证提取结果的质量 | 需要验证 | 质量保证 |
| format_content.py | 清理和优化提取的内容 | 需要验证 | 后处理 |
| run_integration_test.py | 集成测试完整工作流 | 需要验证 | 端到端测试 |
脚本依赖关系
check_environment.py
↓ (环境就绪)
test_wp_api.py
↓ (API可用)
extract_content.py
↓ (内容提取成功)
validate_extraction.py
↓ (质量验证通过)
format_content.py
当前主要问题总结
- WordPress REST API 配置问题
- 网站可能没有正确启用REST API
- 访问
/wp-json/返回乱码而不是结构化的JSON - 这会影响所有基于API的提取脚本
- 脚本间兼容性问题
- 提取脚本的输出格式可能与验证/格式化脚本的预期输入不匹配
- 缺少统一的错误处理和数据传递机制
- 缺少渐进式测试方法
- 没有从简单到复杂的测试策略
- 难以定位具体哪个环节出现问题
重新设计的简化工作流
阶段1:基础验证
python
# 第一步:验证环境
python check_environment.py
# 第二步:手动验证API
# 在浏览器中访问: https://你的网站/wp-json/
# 确认返回的是JSON而不是乱码
阶段2:分步测试
python
# 创建简单的测试脚本 test_basic.py
import requests
import json
def test_basic():
url = "https://你的网站/wp-json/wp/v2/posts"
try:
response = requests.get(url)
print(f"状态码: {response.status_code}")
print(f"内容类型: {response.headers.get('content-type')}")
if response.status_code == 200:
data = response.json()
print(f"找到 {len(data)} 篇文章")
return True
else:
print(f"响应内容: {response.text[:200]}") # 显示前200字符
return False
except Exception as e:
print(f"错误: {e}")
return False
if __name__ == "__main__":
test_basic()
阶段3:简化提取流程
如果API工作正常,我们可以简化提取流程:
# simple_extract.py - 简化版提取脚本
import requests
import os
import json
from datetime import datetime
def simple_extract(site_url, output_dir="./simple_output"):
"""简化版内容提取"""
# 创建输出目录
os.makedirs(output_dir, exist_ok=True)
# 获取文章列表
posts_url = f"{site_url}/wp-json/wp/v2/posts"
try:
response = requests.get(posts_url)
if response.status_code != 200:
print(f"❌ 获取文章失败: {response.status_code}")
return False
posts = response.json()
print(f"✅ 找到 {len(posts)} 篇文章")
# 提取每篇文章
for i, post in enumerate(posts):
title = post.get('title', {}).get('rendered', f'文章_{i}')
content = post.get('content', {}).get('rendered', '')
# 简单的文件名
filename = f"post_{i+1}.txt"
filepath = os.path.join(output_dir, filename)
with open(filepath, 'w', encoding='utf-8') as f:
f.write(f"标题: {title}\n\n")
f.write(f"内容: {content}\n")
print(f"📄 保存: {filename}")
# 保存元数据
meta = {
"extracted_at": datetime.now().isoformat(),
"total_posts": len(posts),
"site_url": site_url
}
with open(os.path.join(output_dir, "metadata.json"), 'w', encoding='utf-8') as f:
json.dump(meta, f, ensure_ascii=False, indent=2)
print(f"🎉 提取完成! 文件保存在: {output_dir}")
return True
except Exception as e:
print(f"❌ 提取过程中出错: {e}")
return False
if __name__ == "__main__":
import sys
if len(sys.argv) != 2:
print("使用方法: python simple_extract.py <网站地址>")
sys.exit(1)
simple_extract(sys.argv[1])
文件存档
环境检测脚本 check_environment.py
这个脚本检查运行环境是否满足要求
# check_environment.py
import sys
import subprocess
import pkg_resources
def check_python_version():
"""检查Python版本"""
print("🔍 检查Python版本...")
version = sys.version_info
if version.major >= 3 and version.minor >= 6:
print(f"✅ Python版本: {sys.version}")
return True
else:
print(f"❌ Python版本过低: {sys.version}")
print(" 需要 Python 3.6 或更高版本")
return False
def check_dependencies():
"""检查必要的依赖包"""
print("\n🔍 检查依赖包...")
required_packages = ['requests', 'corpress', 'beautifulsoup4']
missing_packages = []
for package in required_packages:
try:
pkg_resources.require(package)
print(f"✅ {package}")
except:
missing_packages.append(package)
print(f"❌ {package}")
if missing_packages:
print(f"\n📦 需要安装缺失的包:")
print(f"pip install {' '.join(missing_packages)}")
return False
else:
print("✅ 所有依赖包已安装")
return True
def install_dependencies():
"""自动安装缺失的依赖包"""
print("\n📦 正在安装缺失的依赖包...")
try:
subprocess.check_call([sys.executable, "-m", "pip", "install", "requests", "corpress", "beautifulsoup4"])
print("✅ 依赖包安装完成")
return True
except subprocess.CalledProcessError as e:
print(f"❌ 安装失败: {e}")
return False
def main():
print("=" * 50)
print("🛠️ WordPress内容提取 - 环境检测")
print("=" * 50)
# 检查Python版本
if not check_python_version():
sys.exit(1)
# 检查依赖包
if not check_dependencies():
if input("\n是否自动安装缺失的依赖包? (y/n): ").lower() == 'y':
if not install_dependencies():
sys.exit(1)
else:
print("请手动安装缺失的依赖包后重新运行")
sys.exit(1)
print("\n🎉 环境检测通过!可以运行后续脚本")
return True
if __name__ == "__main__":
main()
API测试脚本 test_wp_api.py
这个脚本专门测试WordPress REST API的连接状态和功能
# test_wp_api.py
import requests
import json
import sys
from urllib.parse import urljoin
def test_basic_api(site_url):
"""测试基本的API连接"""
print(f"\n🔗 测试API连接: {site_url}")
# 测试主要API端点
api_url = urljoin(site_url, "/wp-json/")
try:
response = requests.get(api_url, timeout=10)
if response.status_code == 200:
print("✅ REST API 连接成功!")
return True
else:
print(f"❌ API返回状态码: {response.status_code}")
return False
except requests.exceptions.RequestException as e:
print(f"❌ 连接失败: {e}")
return False
def test_posts_api(site_url):
"""测试文章API端点"""
print(f"\n📝 测试文章API...")
posts_url = urljoin(site_url, "/wp-json/wp/v2/posts")
try:
response = requests.get(posts_url, timeout=10)
if response.status_code == 200:
posts_data = response.json()
print(f"✅ 文章API测试成功!")
print(f" 找到 {len(posts_data)} 篇文章")
if posts_data:
# 显示前3篇文章的标题
for i, post in enumerate(posts_data[:3]):
title = post.get('title', {}).get('rendered', '无标题')
# 清理HTML标签
import re
title = re.sub(r'<[^>]+>', '', title)
print(f" {i+1}. {title}")
return True
else:
print(f"❌ 文章API返回状态码: {response.status_code}")
return False
except requests.exceptions.RequestException as e:
print(f"❌ 文章API测试失败: {e}")
return False
def test_api_details(site_url):
"""测试API详细信息"""
print(f"\n📊 获取API详细信息...")
api_url = urljoin(site_url, "/wp-json/")
try:
response = requests.get(api_url, timeout=10)
data = response.json()
print("✅ API信息:")
print(f" 名称: {data.get('name', '未知')}")
print(f" 描述: {data.get('description', '未知')}")
print(f" 主页: {data.get('home', '未知')}")
# 检查可用的端点
routes = data.get('routes', {})
print(f" 可用端点: {len(routes)} 个")
# 显示重要的端点
important_routes = [route for route in routes.keys() if any(keyword in route for keyword in ['posts', 'pages', 'media'])]
for route in important_routes[:5]: # 显示前5个重要端点
print(f" - {route}")
return True
except Exception as e:
print(f"❌ 获取API信息失败: {e}")
return False
def main():
if len(sys.argv) != 2:
print("使用方法: python test_wp_api.py <网站地址>")
print("示例: python test_wp_api.py https://example.com")
sys.exit(1)
site_url = sys.argv[1]
print("=" * 50)
print("🧪 WordPress REST API 测试工具")
print("=" * 50)
# 运行测试
tests_passed = 0
tests_total = 3
if test_basic_api(site_url):
tests_passed += 1
if test_posts_api(site_url):
tests_passed += 1
if test_api_details(site_url):
tests_passed += 1
# 测试总结
print("\n" + "=" * 50)
print("📋 测试总结")
print("=" * 50)
print(f"通过测试: {tests_passed}/{tests_total}")
if tests_passed == tests_total:
print("🎉 所有测试通过!可以开始提取内容")
return True
else:
print("❌ 部分测试失败,请检查WordPress REST API配置")
return False
if __name__ == "__main__":
main()
内容提取脚本 extract_content.py
这个脚本使用corpress包来提取内容
# extract_content.py
import sys
import os
import json
from datetime import datetime
from corpress.core import corpress
def create_output_structure(base_path):
"""创建输出目录结构"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_dir = os.path.join(base_path, f"wordpress_export_{timestamp}")
os.makedirs(output_dir, exist_ok=True)
os.makedirs(os.path.join(output_dir, "raw_data"), exist_ok=True)
os.makedirs(os.path.join(output_dir, "formatted"), exist_ok=True)
print(f"📁 创建输出目录: {output_dir}")
return output_dir
def extract_with_corpress(site_url, output_dir):
"""使用corpress提取内容"""
print(f"\n🚀 开始提取内容...")
print(f" 来源: {site_url}")
print(f" 目标: {output_dir}")
try:
# 提取内容到txt格式
print("📄 提取文本格式内容...")
corpress(site_url, save_path=output_dir, corpus_format='txt')
# 提取内容到csv格式(包含元数据)
print("📊 提取CSV格式内容(包含元数据)...")
corpress(site_url, save_path=output_dir, corpus_format='csv')
print("✅ 内容提取完成!")
return True
except Exception as e:
print(f"❌ 提取失败: {e}")
return False
def analyze_extracted_content(output_dir):
"""分析提取的内容"""
print(f"\n📊 分析提取的内容...")
# 统计文件
txt_files = [f for f in os.listdir(output_dir) if f.endswith('.txt')]
csv_files = [f for f in os.listdir(output_dir) if f.endswith('.csv')]
print(f" 文本文件: {len(txt_files)} 个")
print(f" CSV文件: {len(csv_files)} 个")
# 显示部分提取的文件
if txt_files:
print(f"\n 示例文本文件:")
for file in txt_files[:3]: # 显示前3个文件
file_path = os.path.join(output_dir, file)
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
preview = content[:100] + "..." if len(content) > 100 else content
print(f" - {file}: {preview}")
return len(txt_files) > 0
def main():
if len(sys.argv) != 2:
print("使用方法: python extract_content.py <网站地址>")
print("示例: python extract_content.py https://example.com")
sys.exit(1)
site_url = sys.argv[1]
base_output_path = "./wordpress_exports"
print("=" * 50)
print("📥 WordPress内容提取工具")
print("=" * 50)
# 创建输出目录
output_dir = create_output_structure(base_output_path)
# 提取内容
if extract_with_corpress(site_url, output_dir):
# 分析结果
if analyze_extracted_content(output_dir):
print(f"\n🎉 内容提取成功完成!")
print(f" 输出目录: {output_dir}")
return True
else:
print("❌ 内容提取但未找到有效文件")
return False
else:
print("❌ 内容提取失败")
return False
if __name__ == "__main__":
main()
内容格式化脚本 format_content.py
这个脚本对提取的内容进行进一步处理和格式化
# format_content.py
import os
import json
import csv
import re
from pathlib import Path
def clean_html_tags(text):
"""清理HTML标签"""
if not text:
return ""
# 移除HTML标签
clean = re.compile('<.*?>')
return re.sub(clean, '', text)
def format_for_ai(content):
"""格式化内容,使其更适合AI处理"""
# 清理HTML
clean_content = clean_html_tags(content)
# 移除多余的空白字符
clean_content = re.sub(r'\n\s*\n', '\n\n', clean_content)
clean_content = re.sub(r'[ \t]+', ' ', clean_content)
# 确保每段以换行符结束
clean_content = clean_content.strip()
return clean_content
def process_text_files(input_dir, output_dir):
"""处理文本文件"""
print("📄 处理文本文件...")
txt_files = [f for f in os.listdir(input_dir) if f.endswith('.txt')]
processed_count = 0
for txt_file in txt_files:
input_path = os.path.join(input_dir, txt_file)
output_path = os.path.join(output_dir, f"cleaned_{txt_file}")
try:
with open(input_path, 'r', encoding='utf-8') as f:
content = f.read()
# 格式化内容
formatted_content = format_for_ai(content)
# 保存格式化后的内容
with open(output_path, 'w', encoding='utf-8') as f:
f.write(formatted_content)
processed_count += 1
except Exception as e:
print(f"❌ 处理文件 {txt_file} 时出错: {e}")
print(f"✅ 已处理 {processed_count}/{len(txt_files)} 个文本文件")
return processed_count
def process_csv_files(input_dir, output_dir):
"""处理CSV文件"""
print("📊 处理CSV文件...")
csv_files = [f for f in os.listdir(input_dir) if f.endswith('.csv')]
for csv_file in csv_files:
input_path = os.path.join(input_dir, csv_file)
output_path = os.path.join(output_dir, f"summary_{csv_file}")
try:
with open(input_path, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
rows = list(reader)
print(f" {csv_file}: {len(rows)} 条记录")
# 创建简单的统计信息
stats = {
"total_records": len(rows),
"columns": list(rows[0].keys()) if rows else [],
"sample_record": rows[0] if rows else {}
}
# 保存统计信息
stats_path = os.path.join(output_dir, f"stats_{os.path.splitext(csv_file)[0]}.json")
with open(stats_path, 'w', encoding='utf-8') as f:
json.dump(stats, f, ensure_ascii=False, indent=2)
except Exception as e:
print(f"❌ 处理CSV文件 {csv_file} 时出错: {e}")
return len(csv_files)
def create_content_summary(input_dir, output_dir):
"""创建内容摘要"""
print("\n📋 创建内容摘要...")
summary = {
"extraction_date": str(Path(input_dir).name),
"files": {},
"total_content_size": 0
}
# 统计所有文件
for root, dirs, files in os.walk(input_dir):
for file in files:
if file.endswith(('.txt', '.csv')):
file_path = os.path.join(root, file)
file_size = os.path.getsize(file_path)
rel_path = os.path.relpath(file_path, input_dir)
summary["files"][rel_path] = {
"size_bytes": file_size,
"size_kb": round(file_size / 1024, 2)
}
summary["total_content_size"] += file_size
# 保存摘要
summary_path = os.path.join(output_dir, "extraction_summary.json")
with open(summary_path, 'w', encoding='utf-8') as f:
json.dump(summary, f, ensure_ascii=False, indent=2)
print(f"✅ 内容摘要已保存")
print(f" 总内容大小: {round(summary['total_content_size'] / 1024, 2)} KB")
print(f" 文件数量: {len(summary['files'])}")
return summary
def main():
if len(sys.argv) != 2:
print("使用方法: python format_content.py <提取内容目录>")
print("示例: python format_content.py ./wordpress_exports/wordpress_export_20240520_143022")
sys.exit(1)
input_dir = sys.argv[1]
if not os.path.exists(input_dir):
print(f"❌ 目录不存在: {input_dir}")
sys.exit(1)
print("=" * 50)
print("✨ 内容格式化工具")
print("=" * 50)
# 创建格式化输出目录
formatted_dir = os.path.join(input_dir, "formatted")
os.makedirs(formatted_dir, exist_ok=True)
# 处理各种文件
txt_count = process_text_files(input_dir, formatted_dir)
csv_count = process_csv_files(input_dir, formatted_dir)
# 创建摘要
summary = create_content_summary(input_dir, formatted_dir)
print(f"\n🎉 内容格式化完成!")
print(f" 格式化文件保存在: {formatted_dir}")
print(f" 处理文本文件: {txt_count} 个")
print(f" 处理CSV文件: {csv_count} 个")
if __name__ == "__main__":
main()
提取结果验证脚本 validate_extraction.py
这个脚本专门验证提取结果的质量和完整性。
# validate_extraction.py
import os
import json
import sys
from pathlib import Path
def validate_directory_structure(extract_dir):
"""验证目录结构是否符合预期"""
print("📁 验证目录结构...")
expected_dirs = ['raw_data', 'formatted']
expected_files = ['extraction_summary.json']
issues = []
# 检查基本目录是否存在
if not os.path.exists(extract_dir):
issues.append(f"❌ 提取目录不存在: {extract_dir}")
return False, issues
# 检查是否有必要的文件
has_txt_files = any(f.endswith('.txt') for f in os.listdir(extract_dir))
has_csv_files = any(f.endswith('.csv') for f in os.listdir(extract_dir))
if not has_txt_files and not has_csv_files:
issues.append("❌ 未找到任何提取的文本或CSV文件")
# 检查文件编码和可读性
file_issues = check_file_integrity(extract_dir)
issues.extend(file_issues)
if not issues:
print("✅ 目录结构验证通过")
return True, issues
else:
return False, issues
def check_file_integrity(extract_dir):
"""检查文件完整性和可读性"""
print("🔍 检查文件完整性...")
issues = []
for file in os.listdir(extract_dir):
if file.endswith(('.txt', '.csv')):
file_path = os.path.join(extract_dir, file)
try:
# 尝试读取文件
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
# 检查文件大小
file_size = len(content)
if file_size == 0:
issues.append(f"❌ 文件为空: {file}")
elif file_size < 10: # 太小可能有问题
issues.append(f"⚠️ 文件过小({file_size}字节): {file}")
# 检查内容质量
if file.endswith('.txt'):
content_issues = check_text_content(content, file)
issues.extend(content_issues)
except UnicodeDecodeError:
issues.append(f"❌ 文件编码问题: {file}")
except Exception as e:
issues.append(f"❌ 读取文件失败 {file}: {e}")
return issues
def check_text_content(content, filename):
"""检查文本内容质量"""
issues = []
# 检查是否包含有用的内容(不是只有HTML标签或空白)
text_only = remove_html_tags(content).strip()
if len(text_only) < 50 and len(content) > 100:
issues.append(f"⚠️ 可能包含大量HTML标签: {filename}")
# 检查是否有明显的提取问题
error_indicators = [
"undefined",
"error",
"exception",
"cannot read property",
"wp-json",
"rest_api"
]
for indicator in error_indicators:
if indicator.lower() in content.lower():
issues.append(f"⚠️ 可能包含错误信息({indicator}): {filename}")
return issues
def remove_html_tags(text):
"""移除HTML标签"""
import re
clean = re.compile('<.*?>')
return re.sub(clean, '', text)
def validate_content_structure(extract_dir):
"""验证内容结构"""
print("📊 验证内容结构...")
issues = []
# 统计文件类型和数量
file_types = {}
total_files = 0
for file in os.listdir(extract_dir):
ext = Path(file).suffix.lower()
file_types[ext] = file_types.get(ext, 0) + 1
total_files += 1
print(f" 文件统计: {file_types}")
if total_files == 0:
issues.append("❌ 没有找到任何提取的文件")
# 检查是否有足够的内容
txt_count = file_types.get('.txt', 0)
csv_count = file_types.get('.csv', 0)
if txt_count == 0 and csv_count == 0:
issues.append("❌ 没有找到文本或CSV文件")
return issues
def generate_validation_report(extract_dir, is_valid, issues):
"""生成验证报告"""
report = {
"validation_timestamp": str(Path(extract_dir).name),
"extract_directory": extract_dir,
"is_valid": is_valid,
"issues_found": len(issues),
"issues": issues,
"recommendation": ""
}
if is_valid:
report["recommendation"] = "✅ 验证通过,可以运行格式化脚本"
else:
report["recommendation"] = "❌ 请先解决上述问题再运行格式化脚本"
# 保存报告
report_path = os.path.join(extract_dir, "validation_report.json")
with open(report_path, 'w', encoding='utf-8') as f:
json.dump(report, f, ensure_ascii=False, indent=2)
return report
def main():
if len(sys.argv) != 2:
print("使用方法: python validate_extraction.py <提取内容目录>")
print("示例: python validate_extraction.py ./wordpress_exports/wordpress_export_20240520_143022")
sys.exit(1)
extract_dir = sys.argv[1]
print("=" * 50)
print("🔍 提取结果验证工具")
print("=" * 50)
# 运行验证
is_valid, structure_issues = validate_directory_structure(extract_dir)
content_issues = validate_content_structure(extract_dir)
all_issues = structure_issues + content_issues
is_overall_valid = is_valid and len(all_issues) == 0
# 生成报告
report = generate_validation_report(extract_dir, is_overall_valid, all_issues)
# 显示结果
print("\n" + "=" * 50)
print("📋 验证结果")
print("=" * 50)
if is_overall_valid:
print("🎉 所有验证通过!")
else:
print(f"❌ 发现 {len(all_issues)} 个问题:")
for issue in all_issues:
print(f" {issue}")
print(f"\n💡 建议: {report['recommendation']}")
print(f"📄 详细报告已保存: {os.path.join(extract_dir, 'validation_report.json')}")
return is_overall_valid
if __name__ == "__main__":
success = main()
sys.exit(0 if success else 1)
集成测试脚本 run_integration_test.py
这个脚本模拟完整的工作流并验证每个步骤
# run_integration_test.py
import os
import sys
import subprocess
import tempfile
import shutil
from pathlib import Path
def run_command(command, description):
"""运行命令并检查结果"""
print(f"\n🔧 {description}")
print(f" 执行: {command}")
try:
result = subprocess.run(command, shell=True, capture_output=True, text=True)
if result.returncode == 0:
print("✅ 执行成功")
return True, result.stdout
else:
print(f"❌ 执行失败 (退出码: {result.returncode})")
print(f" 错误输出: {result.stderr}")
return False, result.stderr
except Exception as e:
print(f"❌ 执行异常: {e}")
return False, str(e)
def test_environment():
"""测试环境配置"""
print("=" * 50)
print("1. 测试环境配置")
print("=" * 50)
return run_command("python check_environment.py", "检查Python环境")
def test_api_connection(test_site):
"""测试API连接"""
print("\n" + "=" * 50)
print("2. 测试API连接")
print("=" * 50)
return run_command(f"python test_wp_api.py {test_site}", "测试WordPress API连接")
def create_test_extraction(test_site, temp_dir):
"""创建测试提取"""
print("\n" + "=" * 50)
print("3. 测试内容提取")
print("=" * 50)
# 创建一个小的测试提取
success, output = run_command(
f"python extract_content.py {test_site}",
"提取WordPress内容"
)
if success:
# 找到最新的提取目录
export_dir = find_latest_export_dir()
return success, export_dir
else:
return success, None
def find_latest_export_dir():
"""找到最新的导出目录"""
base_dir = "./wordpress_exports"
if not os.path.exists(base_dir):
return None
dirs = [d for d in os.listdir(base_dir) if os.path.isdir(os.path.join(base_dir, d))]
if not dirs:
return None
# 按时间排序找到最新的
latest_dir = sorted(dirs)[-1]
return os.path.join(base_dir, latest_dir)
def validate_extraction(extract_dir):
"""验证提取结果"""
print("\n" + "=" * 50)
print("4. 验证提取结果")
print("=" * 50)
if not extract_dir or not os.path.exists(extract_dir):
print("❌ 没有找到提取目录")
return False, None
return run_command(
f"python validate_extraction.py {extract_dir}",
"验证提取内容质量"
)
def test_formatting(extract_dir):
"""测试格式化过程"""
print("\n" + "=" * 50)
print("5. 测试内容格式化")
print("=" * 50)
if not extract_dir:
return False, "没有有效的提取目录"
return run_command(
f"python format_content.py {extract_dir}",
"格式化提取的内容"
)
def generate_test_report(test_results, temp_dir):
"""生成测试报告"""
report = {
"test_timestamp": str(Path(temp_dir).name),
"overall_success": all(result[0] for result in test_results.values()),
"detailed_results": test_results,
"recommendations": []
}
# 根据测试结果生成建议
if not test_results["environment"][0]:
report["recommendations"].append("❌ 请先修复环境配置问题")
if not test_results["api"][0]:
report["recommendations"].append("❌ 请检查WordPress REST API配置")
if not test_results["extraction"][0]:
report["recommendations"].append("❌ 内容提取失败,请检查网站可访问性")
if not test_results["validation"][0]:
report["recommendations"].append("⚠️ 提取内容有质量问题,请检查验证报告")
if not test_results["formatting"][0]:
report["recommendations"].append("⚠️ 格式化过程有问题,请检查提取内容格式")
if report["overall_success"]:
report["recommendations"].append("🎉 所有测试通过!工作流可以正常运行")
# 保存报告
report_path = os.path.join(temp_dir, "integration_test_report.json")
with open(report_path, 'w', encoding='utf-8') as f:
json.dump(report, f, ensure_ascii=False, indent=2)
return report
def main():
if len(sys.argv) != 2:
print("使用方法: python run_integration_test.py <测试网站地址>")
print("示例: python run_integration_test.py https://example.com")
sys.exit(1)
test_site = sys.argv[1]
# 创建临时目录用于测试
with tempfile.TemporaryDirectory() as temp_dir:
print("=" * 50)
print("🧪 WordPress内容提取集成测试")
print("=" * 50)
print(f"测试网站: {test_site}")
print(f"工作目录: {temp_dir}")
# 保存原始工作目录
original_dir = os.getcwd()
try:
# 切换到临时目录
os.chdir(temp_dir)
# 复制脚本到临时目录(在实际使用中,这些脚本应该在当前目录)
# 这里假设脚本已经在当前目录可用
# 运行各个测试步骤
test_results = {}
test_results["environment"] = test_environment()
test_results["api"] = test_api_connection(test_site)
test_results["extraction"], extract_dir = create_test_extraction(test_site, temp_dir)
test_results["validation"] = validate_extraction(extract_dir)
test_results["formatting"] = test_formatting(extract_dir)
# 生成测试报告
report = generate_test_report(test_results, temp_dir)
# 显示最终结果
print("\n" + "=" * 50)
print("📋 集成测试完成")
print("=" * 50)
success_count = sum(1 for result in test_results.values() if result[0])
total_count = len(test_results)
print(f"测试通过: {success_count}/{total_count}")
if report["overall_success"]:
print("🎉 所有测试通过!您的工作流运行正常")
else:
print("❌ 部分测试失败,请检查以下问题:")
for recommendation in report["recommendations"]:
print(f" {recommendation}")
return report["overall_success"]
finally:
# 恢复原始工作目录
os.chdir(original_dir)
if __name__ == "__main__":
success = main()
sys.exit(0 if success else 1)
新的三步法提取流程
在extract_content.py文件中,注意到一下子是提取出全部的文件,一下子提取文本量很大,能不能进行重新思路考虑?可以将发现的文章做成一个列表,进行返回,然后再进行补充参数,指定要提取的文章。先提取,后面再进行验证提取结果。
1. 发现阶段 (discovery_phase.py)
↓
2. 选择阶段 (selection_phase.py)
↓
3. 提取阶段 (extraction_phase.py)
脚本1:文章发现脚本 discovery_phase.py
这个脚本只获取文章列表和基本信息,不下载完整内容。
# discovery_phase.py
import requests
import json
import os
import sys
from datetime import datetime
from urllib.parse import urljoin
def discover_posts(site_url, max_posts=50):
"""
发现WordPress网站上的文章,只获取元数据
返回文章列表,不下载完整内容
"""
print(f"🔍 正在发现文章...")
print(f" 网站: {site_url}")
print(f" 最大文章数: {max_posts}")
posts_list = []
page = 1
per_page = min(20, max_posts)
while len(posts_list) < max_posts:
# 构建API URL
api_url = f"{site_url}/wp-json/wp/v2/posts"
params = {
'page': page,
'per_page': per_page,
'_fields': 'id,title,date,modified,slug,link,excerpt'
}
try:
response = requests.get(api_url, params=params, timeout=30)
if response.status_code != 200:
print(f"❌ API请求失败: {response.status_code}")
break
posts = response.json()
if not posts:
print("✅ 已获取所有可用文章")
break
for post in posts:
post_info = {
'id': post.get('id'),
'title': clean_html(post.get('title', {}).get('rendered', '无标题')),
'date': post.get('date'),
'modified': post.get('modified'),
'slug': post.get('slug'),
'link': post.get('link'),
'excerpt': clean_html(post.get('excerpt', {}).get('rendered', ''))[:200] + '...',
'selected': False # 用于后续选择
}
posts_list.append(post_info)
print(f" 已发现 {len(posts_list)} 篇文章...")
# 如果返回的文章数少于请求数,说明没有更多文章了
if len(posts) < per_page:
break
page += 1
except requests.exceptions.RequestException as e:
print(f"❌ 请求错误: {e}")
break
except json.JSONDecodeError as e:
print(f"❌ JSON解析错误: {e}")
break
return posts_list
def clean_html(text):
"""清理HTML标签"""
import re
if not text:
return ""
clean = re.compile('<.*?>')
return re.sub(clean, '', text)
def save_discovery_results(posts_list, output_dir, site_url): # 修复:添加 site_url 参数
"""保存发现结果"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
discovery_file = os.path.join(output_dir, f"discovery_results_{timestamp}.json")
# 创建输出目录
os.makedirs(output_dir, exist_ok=True)
# 保存完整发现结果
discovery_data = {
'discovery_time': datetime.now().isoformat(),
'total_posts': len(posts_list),
'site_url': site_url, # 使用传入的 site_url 参数
'posts': posts_list
}
with open(discovery_file, 'w', encoding='utf-8') as f:
json.dump(discovery_data, f, ensure_ascii=False, indent=2)
# 创建简化的选择文件(便于用户编辑)
selection_template = {
'site_url': site_url, # 使用传入的 site_url 参数
'discovery_file': discovery_file,
'selected_posts': [] # 用户将在这里添加要提取的文章ID
}
selection_file = os.path.join(output_dir, f"selection_template_{timestamp}.json")
with open(selection_file, 'w', encoding='utf-8') as f:
json.dump(selection_template, f, ensure_ascii=False, indent=2)
return discovery_file, selection_file
def display_posts_summary(posts_list):
"""显示文章摘要"""
print(f"\n📋 发现结果摘要")
print("=" * 50)
print(f"总文章数: {len(posts_list)}")
print("\n前10篇文章:")
print("-" * 50)
for i, post in enumerate(posts_list[:10]):
print(f"{i+1:2d}. ID: {post['id']} - {post['title']}")
print(f" 日期: {post['date']} | 链接: {post['link']}")
print(f" 摘要: {post['excerpt']}")
print()
if len(posts_list) > 10:
print(f"... 还有 {len(posts_list) - 10} 篇文章")
def main():
if len(sys.argv) < 2:
print("使用方法: python discovery_phase.py <网站地址> [最大文章数]")
print("示例: python discovery_phase.py https://example.com 100")
sys.exit(1)
site_url = sys.argv[1]
max_posts = int(sys.argv[2]) if len(sys.argv) > 2 else 50
print("=" * 50)
print("🔍 WordPress文章发现工具")
print("=" * 50)
# 发现文章
posts_list = discover_posts(site_url, max_posts)
if not posts_list:
print("❌ 没有发现任何文章")
sys.exit(1)
# 显示摘要
display_posts_summary(posts_list)
# 保存结果 - 修复:传递 site_url 参数
output_dir = "./discovery_results"
discovery_file, selection_file = save_discovery_results(posts_list, output_dir, site_url)
print(f"\n💾 发现结果已保存:")
print(f" 完整发现数据: {discovery_file}")
print(f" 选择模板文件: {selection_file}")
print(f"\n📝 下一步:")
print(f" 1. 编辑 {selection_file}")
print(f" 2. 在 'selected_posts' 数组中添加要提取的文章ID")
print(f" 3. 运行: python selection_phase.py {selection_file}")
if __name__ == "__main__":
main()
脚本2:文章选择脚本 selection_phase.py
这个脚本让用户基于发现结果选择要提取的文章
# selection_phase.py
import json
import os
import sys
from datetime import datetime
def load_discovery_results(discovery_file):
"""加载发现结果"""
try:
with open(discovery_file, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
print(f"❌ 加载发现结果失败: {e}")
return None
def interactive_selection(discovery_data):
"""交互式选择文章"""
posts = discovery_data['posts']
print("=" * 50)
print("📝 文章选择界面")
print("=" * 50)
print(f"总文章数: {len(posts)}")
print("\n请选择要提取的文章:")
print(" [A] 选择全部文章")
print(" [R] 按范围选择")
print(" [I] 按ID选择")
print(" [L] 列出所有文章")
print(" [Q] 完成选择")
selected_ids = []
while True:
choice = input("\n请选择操作 (A/R/I/L/Q): ").strip().upper()
if choice == 'A':
selected_ids = [post['id'] for post in posts]
print(f"✅ 已选择全部 {len(selected_ids)} 篇文章")
break
elif choice == 'R':
try:
start = int(input("起始序号 (1-{}): ".format(len(posts))))
end = int(input("结束序号 (1-{}): ".format(len(posts))))
if 1 <= start <= end <= len(posts):
selected_ids = [posts[i-1]['id'] for i in range(start, end+1)]
print(f"✅ 已选择文章 {start} 到 {end}")
break
else:
print("❌ 范围无效")
except ValueError:
print("❌ 请输入有效数字")
elif choice == 'I':
try:
ids_input = input("请输入文章ID(多个ID用逗号分隔): ")
selected_ids = [int(id_str.strip()) for id_str in ids_input.split(',')]
print(f"✅ 已选择 {len(selected_ids)} 篇文章")
break
except ValueError:
print("❌ 请输入有效的数字ID")
elif choice == 'L':
print("\n📋 所有文章列表:")
print("-" * 50)
for i, post in enumerate(posts, 1):
print(f"{i:3d}. ID: {post['id']} - {post['title']}")
elif choice == 'Q':
if selected_ids:
break
else:
print("❌ 请先选择文章")
else:
print("❌ 无效选择")
return selected_ids
def update_selection_file(selection_file, selected_ids, discovery_data):
"""更新选择文件"""
try:
with open(selection_file, 'r', encoding='utf-8') as f:
selection_data = json.load(f)
# 获取选中的文章详情
selected_posts = []
all_posts = {post['id']: post for post in discovery_data['posts']}
for post_id in selected_ids:
if post_id in all_posts:
selected_posts.append(all_posts[post_id])
selection_data['selected_posts'] = selected_posts
selection_data['selection_time'] = datetime.now().isoformat()
selection_data['total_selected'] = len(selected_posts)
# 保存更新后的选择文件
with open(selection_file, 'w', encoding='utf-8') as f:
json.dump(selection_data, f, ensure_ascii=False, indent=2)
return selection_data
except Exception as e:
print(f"❌ 更新选择文件失败: {e}")
return None
def main():
if len(sys.argv) != 2:
print("使用方法: python selection_phase.py <选择模板文件>")
print("示例: python selection_phase.py ./discovery_results/selection_template_20240520_143022.json")
sys.exit(1)
selection_file = sys.argv[1]
if not os.path.exists(selection_file):
print(f"❌ 选择文件不存在: {selection_file}")
sys.exit(1)
print("=" * 50)
print("📝 WordPress文章选择工具")
print("=" * 50)
# 加载选择文件
try:
with open(selection_file, 'r', encoding='utf-8') as f:
selection_data = json.load(f)
except Exception as e:
print(f"❌ 加载选择文件失败: {e}")
sys.exit(1)
discovery_file = selection_data.get('discovery_file')
if not discovery_file or not os.path.exists(discovery_file):
print(f"❌ 发现文件不存在: {discovery_file}")
sys.exit(1)
# 加载发现结果
discovery_data = load_discovery_results(discovery_file)
if not discovery_data:
sys.exit(1)
# 交互式选择
selected_ids = interactive_selection(discovery_data)
if not selected_ids:
print("❌ 没有选择任何文章")
sys.exit(1)
# 更新选择文件
updated_selection = update_selection_file(selection_file, selected_ids, discovery_data)
if updated_selection:
print(f"\n✅ 选择完成!")
print(f" 已选择 {len(selected_ids)} 篇文章")
print(f" 选择文件已更新: {selection_file}")
print(f"\n🚀 下一步:")
print(f" 运行: python extraction_phase.py {selection_file}")
else:
print("❌ 选择失败")
if __name__ == "__main__":
main()
脚本3:内容提取脚本 extraction_phase.py
# extraction_phase.py
import requests
import json
import os
import sys
from datetime import datetime
from urllib.parse import urljoin
import time
def load_selection_data(selection_file):
"""加载选择数据"""
try:
with open(selection_file, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
print(f"❌ 加载选择数据失败: {e}")
return None
def extract_post_content(site_url, post_id):
"""提取单篇文章的完整内容"""
api_url = f"{site_url}/wp-json/wp/v2/posts/{post_id}"
try:
response = requests.get(api_url, timeout=30)
if response.status_code == 200:
return response.json()
else:
print(f"❌ 提取文章 {post_id} 失败: {response.status_code}")
return None
except requests.exceptions.RequestException as e:
print(f"❌ 请求文章 {post_id} 失败: {e}")
return None
def save_extracted_content(post_data, output_dir):
"""保存提取的内容"""
post_id = post_data.get('id')
post_slug = post_data.get('slug', f'post_{post_id}')
# 创建文章目录
post_dir = os.path.join(output_dir, f"post_{post_id}_{post_slug}")
os.makedirs(post_dir, exist_ok=True)
# 保存原始JSON数据
raw_file = os.path.join(post_dir, "raw_data.json")
with open(raw_file, 'w', encoding='utf-8') as f:
json.dump(post_data, f, ensure_ascii=False, indent=2)
# 提取并保存纯文本内容
title = clean_html(post_data.get('title', {}).get('rendered', '无标题'))
content = clean_html(post_data.get('content', {}).get('rendered', ''))
excerpt = clean_html(post_data.get('excerpt', {}).get('rendered', ''))
text_file = os.path.join(post_dir, "content.txt")
with open(text_file, 'w', encoding='utf-8') as f:
f.write(f"标题: {title}\n")
f.write(f"日期: {post_data.get('date')}\n")
f.write(f"修改时间: {post_data.get('modified')}\n")
f.write(f"链接: {post_data.get('link')}\n")
f.write(f"摘要: {excerpt}\n")
f.write("\n" + "="*50 + "\n")
f.write("正文内容:\n")
f.write("="*50 + "\n\n")
f.write(content)
# 创建文章元数据
meta = {
'extraction_time': datetime.now().isoformat(),
'post_id': post_id,
'post_slug': post_slug,
'title': title,
'date': post_data.get('date'),
'link': post_data.get('link'),
'content_length': len(content),
'word_count': len(content.split())
}
meta_file = os.path.join(post_dir, "metadata.json")
with open(meta_file, 'w', encoding='utf-8') as f:
json.dump(meta, f, ensure_ascii=False, indent=2)
return {
'post_id': post_id,
'title': title,
'content_length': len(content),
'word_count': len(content.split()),
'output_dir': post_dir
}
def clean_html(text):
"""清理HTML标签"""
import re
if not text:
return ""
clean = re.compile('<.*?>')
text = re.sub(clean, '', text)
# 清理多余的空白字符
text = re.sub(r'\n\s*\n', '\n\n', text)
text = re.sub(r'[ \t]+', ' ', text)
return text.strip()
def create_extraction_summary(extraction_results, selection_data, output_dir):
"""创建提取摘要"""
summary = {
'extraction_time': datetime.now().isoformat(),
'site_url': selection_data.get('site_url'),
'total_selected': selection_data.get('total_selected', 0),
'successfully_extracted': len(extraction_results),
'failed_extractions': selection_data.get('total_selected', 0) - len(extraction_results),
'extraction_results': extraction_results,
'output_directory': output_dir
}
summary_file = os.path.join(output_dir, "extraction_summary.json")
with open(summary_file, 'w', encoding='utf-8') as f:
json.dump(summary, f, ensure_ascii=False, indent=2)
return summary_file
def main():
if len(sys.argv) != 2:
print("使用方法: python extraction_phase.py <选择文件>")
print("示例: python extraction_phase.py ./discovery_results/selection_template_20240520_143022.json")
sys.exit(1)
selection_file = sys.argv[1]
print("=" * 50)
print("📥 WordPress内容提取工具")
print("=" * 50)
# 加载选择数据
selection_data = load_selection_data(selection_file)
if not selection_data:
sys.exit(1)
site_url = selection_data.get('site_url')
selected_posts = selection_data.get('selected_posts', [])
if not selected_posts:
print("❌ 没有选择任何文章")
sys.exit(1)
print(f"🌐 网站: {site_url}")
print(f"📝 准备提取 {len(selected_posts)} 篇文章")
# 创建输出目录
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_dir = f"./extracted_content_{timestamp}"
os.makedirs(output_dir, exist_ok=True)
extraction_results = []
failed_extractions = []
# 逐篇提取文章
for i, post_info in enumerate(selected_posts, 1):
post_id = post_info['id']
post_title = post_info['title']
print(f"\n[{i}/{len(selected_posts)}] 提取文章: {post_title}")
# 提取内容
post_data = extract_post_content(site_url, post_id)
if post_data:
# 保存内容
result = save_extracted_content(post_data, output_dir)
extraction_results.append(result)
print(f" ✅ 提取成功: {result['word_count']} 字")
else:
failed_extractions.append(post_id)
print(f" ❌ 提取失败")
# 添加短暂延迟,避免对服务器造成压力
time.sleep(0.5)
# 创建提取摘要
summary_file = create_extraction_summary(extraction_results, selection_data, output_dir)
# 显示结果摘要
print(f"\n" + "=" * 50)
print("📊 提取结果摘要")
print("=" * 50)
print(f"✅ 成功提取: {len(extraction_results)} 篇")
print(f"❌ 提取失败: {len(failed_extractions)} 篇")
print(f"📁 输出目录: {output_dir}")
print(f"📄 摘要文件: {summary_file}")
if failed_extractions:
print(f"\n失败的文章ID: {failed_extractions}")
print(f"\n🎉 提取完成!")
if __name__ == "__main__":
main()
新的使用流程
第一步:发现文章
python discovery_phase.py https://你的网站地址 100
第二步:选择文章
python selection_phase.py ./discovery_results/selection_template_时间戳.json
第三步:提取内容
python extraction_phase.py ./discovery_results/selection_template_时间戳.json



