WordPress文章提取(1)
本文最后更新于71 天前,其中的信息可能已经过时,如有错误请发送邮件到184874483@qq.com
脚本名称主要功能当前状态备注
check_environment.py检查Python环境和依赖包完成基础环境验证
test_wp_api.py测试WordPress REST API连接完成验证API可用性
extract_content.py使用corpress包提取内容 需要验证核心提取功能
validate_extraction.py验证提取结果的质量需要验证质量保证
format_content.py清理和优化提取的内容 需要验证后处理
run_integration_test.py集成测试完整工作流需要验证端到端测试

脚本依赖关系

check_environment.py 
    ↓ (环境就绪)
test_wp_api.py
    ↓ (API可用)  
extract_content.py
    ↓ (内容提取成功)
validate_extraction.py
    ↓ (质量验证通过)
format_content.py

当前主要问题总结

  1. WordPress REST API 配置问题
    • 网站可能没有正确启用REST API
    • 访问 /wp-json/ 返回乱码而不是结构化的JSON
    • 这会影响所有基于API的提取脚本
  2. 脚本间兼容性问题
    • 提取脚本的输出格式可能与验证/格式化脚本的预期输入不匹配
    • 缺少统一的错误处理和数据传递机制
  3. 缺少渐进式测试方法
    • 没有从简单到复杂的测试策略
    • 难以定位具体哪个环节出现问题

重新设计的简化工作流

阶段1:基础验证

python
# 第一步:验证环境
python check_environment.py

# 第二步:手动验证API
# 在浏览器中访问: https://你的网站/wp-json/
# 确认返回的是JSON而不是乱码

阶段2:分步测试

python
# 创建简单的测试脚本 test_basic.py
import requests
import json

def test_basic():
    """Smoke-test the WordPress posts endpoint.

    Returns True when the endpoint answers HTTP 200 with parseable JSON,
    False on any other status or on a request/parse error.
    """
    url = "https://你的网站/wp-json/wp/v2/posts"
    try:
        response = requests.get(url)
        print(f"状态码: {response.status_code}")
        print(f"内容类型: {response.headers.get('content-type')}")

        if response.status_code != 200:
            print(f"响应内容: {response.text[:200]}")  # show the first 200 chars
            return False

        data = response.json()
        print(f"找到 {len(data)} 篇文章")
        return True
    except Exception as e:
        print(f"错误: {e}")
        return False

if __name__ == "__main__":
    test_basic()

阶段3:简化提取流程

如果API工作正常,我们可以简化提取流程:

# simple_extract.py - 简化版提取脚本
import requests
import os
import json
from datetime import datetime

def simple_extract(site_url, output_dir="./simple_output"):
    """Minimal WordPress extractor.

    Fetches the first page of posts from the site's REST API and writes each
    one to a plain-text file in *output_dir*, plus a metadata.json manifest.

    Args:
        site_url: Base URL of the WordPress site (no trailing slash).
        output_dir: Target directory, created if missing.

    Returns:
        True on success, False on any request/IO failure.
    """
    os.makedirs(output_dir, exist_ok=True)

    # NOTE(review): only the first results page is fetched; the WP API
    # paginates (default 10 per page), so large sites need pagination.
    posts_url = f"{site_url}/wp-json/wp/v2/posts"

    try:
        response = requests.get(posts_url)
        if response.status_code != 200:
            print(f"❌ 获取文章失败: {response.status_code}")
            return False

        posts = response.json()
        print(f"✅ 找到 {len(posts)} 篇文章")

        # Write each post as "post_<n>.txt".
        for i, post in enumerate(posts):
            title = post.get('title', {}).get('rendered', f'文章_{i}')
            content = post.get('content', {}).get('rendered', '')

            filename = f"post_{i+1}.txt"
            filepath = os.path.join(output_dir, filename)

            with open(filepath, 'w', encoding='utf-8') as f:
                f.write(f"标题: {title}\n\n")
                f.write(f"内容: {content}\n")

            # BUGFIX: previously printed the literal placeholder "(unknown)"
            # instead of the saved file's name.
            print(f"📄 保存: {filename}")

        # Record a small manifest alongside the exported posts.
        meta = {
            "extracted_at": datetime.now().isoformat(),
            "total_posts": len(posts),
            "site_url": site_url
        }

        with open(os.path.join(output_dir, "metadata.json"), 'w', encoding='utf-8') as f:
            json.dump(meta, f, ensure_ascii=False, indent=2)

        print(f"🎉 提取完成! 文件保存在: {output_dir}")
        return True

    except Exception as e:
        print(f"❌ 提取过程中出错: {e}")
        return False

if __name__ == "__main__":
    import sys
    # Require exactly one CLI argument: the target site's base URL.
    if len(sys.argv) != 2:
        print("使用方法: python simple_extract.py <网站地址>")
        sys.exit(1)
    
    simple_extract(sys.argv[1])

文件存档

环境检测脚本 check_environment.py

这个脚本检查运行环境是否满足要求

# check_environment.py
import sys
import subprocess
import pkg_resources

def check_python_version():
    """Verify the running interpreter is at least Python 3.6.

    Returns:
        True when the interpreter meets the minimum version, else False.
    """
    print("🔍 检查Python版本...")
    # BUGFIX: the old test `major >= 3 and minor >= 6` wrongly rejects
    # versions like 4.0; tuple comparison orders versions correctly.
    if sys.version_info >= (3, 6):
        print(f"✅ Python版本: {sys.version}")
        return True
    else:
        print(f"❌ Python版本过低: {sys.version}")
        print("   需要 Python 3.6 或更高版本")
        return False

def check_dependencies():
    """Check that the required third-party packages are resolvable.

    Prints one line per package and, when anything is missing, the exact
    `pip install` command to run.

    Returns:
        True when every required package is installed, else False.
    """
    print("\n🔍 检查依赖包...")
    required_packages = ['requests', 'corpress', 'beautifulsoup4']
    missing_packages = []

    for package in required_packages:
        try:
            pkg_resources.require(package)
            print(f"✅ {package}")
        # BUGFIX: a bare `except:` also swallowed KeyboardInterrupt and
        # SystemExit; catch only real resolution failures.
        except Exception:
            missing_packages.append(package)
            print(f"❌ {package}")

    if missing_packages:
        print(f"\n📦 需要安装缺失的包:")
        print(f"pip install {' '.join(missing_packages)}")
        return False
    else:
        print("✅ 所有依赖包已安装")
        return True

def install_dependencies():
    """Install the required packages with pip via the current interpreter.

    Returns:
        True when pip exits successfully, False when it returns non-zero.
    """
    print("\n📦 正在安装缺失的依赖包...")
    cmd = [sys.executable, "-m", "pip", "install", "requests", "corpress", "beautifulsoup4"]
    try:
        subprocess.check_call(cmd)
    except subprocess.CalledProcessError as e:
        print(f"❌ 安装失败: {e}")
        return False
    print("✅ 依赖包安装完成")
    return True

def main():
    """CLI entry point: run the environment checks in order.

    Exits with status 1 on an unmet requirement; returns True otherwise.
    """
    banner = "=" * 50
    print(banner)
    print("🛠️  WordPress内容提取 - 环境检测")
    print(banner)

    # The interpreter version is a hard requirement — bail out early.
    if not check_python_version():
        sys.exit(1)

    # Offer to auto-install anything that is missing.
    if not check_dependencies():
        answer = input("\n是否自动安装缺失的依赖包? (y/n): ").lower()
        if answer == 'y':
            if not install_dependencies():
                sys.exit(1)
        else:
            print("请手动安装缺失的依赖包后重新运行")
            sys.exit(1)

    print("\n🎉 环境检测通过!可以运行后续脚本")
    return True

if __name__ == "__main__":
    main()

API测试脚本 test_wp_api.py

这个脚本专门测试WordPress REST API的连接状态和功能

# test_wp_api.py
import requests
import json
import sys
from urllib.parse import urljoin

def test_basic_api(site_url):
    """Probe the REST API root (/wp-json/) of *site_url*.

    Returns True when it answers HTTP 200 within 10 seconds.
    """
    print(f"\n🔗 测试API连接: {site_url}")

    api_url = urljoin(site_url, "/wp-json/")
    try:
        response = requests.get(api_url, timeout=10)
    except requests.exceptions.RequestException as e:
        print(f"❌ 连接失败: {e}")
        return False

    if response.status_code == 200:
        print("✅ REST API 连接成功!")
        return True
    print(f"❌ API返回状态码: {response.status_code}")
    return False

def test_posts_api(site_url):
    """Exercise the /wp-json/wp/v2/posts endpoint.

    On success prints up to three post titles (HTML stripped) and returns
    True; returns False on a non-200 response or a request error.
    """
    import re  # FIX: was re-imported on every iteration of the title loop

    print(f"\n📝 测试文章API...")

    posts_url = urljoin(site_url, "/wp-json/wp/v2/posts")
    try:
        response = requests.get(posts_url, timeout=10)
        if response.status_code == 200:
            posts_data = response.json()
            print(f"✅ 文章API测试成功!")
            print(f"   找到 {len(posts_data)} 篇文章")

            if posts_data:
                # Show the first three titles with HTML tags removed.
                for i, post in enumerate(posts_data[:3]):
                    title = post.get('title', {}).get('rendered', '无标题')
                    title = re.sub(r'<[^>]+>', '', title)
                    print(f"   {i+1}. {title}")
            return True
        else:
            print(f"❌ 文章API返回状态码: {response.status_code}")
            return False
    except requests.exceptions.RequestException as e:
        print(f"❌ 文章API测试失败: {e}")
        return False

def test_api_details(site_url):
    """Fetch and print metadata from the API root (site name, routes, ...).

    Returns True when the root endpoint returns parseable JSON.
    """
    print(f"\n📊 获取API详细信息...")

    api_url = urljoin(site_url, "/wp-json/")
    try:
        data = requests.get(api_url, timeout=10).json()

        print("✅ API信息:")
        for label, key in (("名称", 'name'), ("描述", 'description'), ("主页", 'home')):
            print(f"   {label}: {data.get(key, '未知')}")

        routes = data.get('routes', {})
        print(f"   可用端点: {len(routes)} 个")

        # Surface the content-related routes (posts/pages/media), max five.
        keywords = ('posts', 'pages', 'media')
        interesting = [r for r in routes.keys() if any(k in r for k in keywords)]
        for route in interesting[:5]:
            print(f"     - {route}")

        return True
    except Exception as e:
        print(f"❌ 获取API信息失败: {e}")
        return False

def main():
    """CLI entry point: run the three API probes and print a summary."""
    if len(sys.argv) != 2:
        print("使用方法: python test_wp_api.py <网站地址>")
        print("示例: python test_wp_api.py https://example.com")
        sys.exit(1)

    site_url = sys.argv[1]
    bar = "=" * 50

    print(bar)
    print("🧪 WordPress REST API 测试工具")
    print(bar)

    # Run every probe (no short-circuiting) and count successes.
    checks = (test_basic_api, test_posts_api, test_api_details)
    tests_total = len(checks)
    tests_passed = sum(1 for check in checks if check(site_url))

    print("\n" + bar)
    print("📋 测试总结")
    print(bar)
    print(f"通过测试: {tests_passed}/{tests_total}")

    if tests_passed == tests_total:
        print("🎉 所有测试通过!可以开始提取内容")
        return True
    print("❌ 部分测试失败,请检查WordPress REST API配置")
    return False

if __name__ == "__main__":
    main()

内容提取脚本 extract_content.py

这个脚本使用corpress包来提取内容

# extract_content.py
import sys
import os
import json
from datetime import datetime
from corpress.core import corpress

def create_output_structure(base_path):
    """Create a timestamped export directory under *base_path*.

    Layout: <base>/wordpress_export_<YYYYmmdd_HHMMSS>/{raw_data,formatted}.

    Returns:
        Path of the newly created export directory.
    """
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_dir = os.path.join(base_path, f"wordpress_export_{stamp}")

    for sub in ("", "raw_data", "formatted"):
        os.makedirs(os.path.join(output_dir, sub), exist_ok=True)

    print(f"📁 创建输出目录: {output_dir}")
    return output_dir

def extract_with_corpress(site_url, output_dir):
    """Run corpress twice to export *site_url* as txt and csv.

    Returns:
        True when both exports complete without raising.
    """
    print(f"\n🚀 开始提取内容...")
    print(f"   来源: {site_url}")
    print(f"   目标: {output_dir}")

    try:
        # txt first, then csv (csv carries the metadata columns).
        for fmt, note in (('txt', "📄 提取文本格式内容..."),
                          ('csv', "📊 提取CSV格式内容(包含元数据)...")):
            print(note)
            corpress(site_url, save_path=output_dir, corpus_format=fmt)
    except Exception as e:
        print(f"❌ 提取失败: {e}")
        return False

    print("✅ 内容提取完成!")
    return True

def analyze_extracted_content(output_dir):
    """Summarize what was extracted into *output_dir*.

    Counts .txt/.csv files and previews the first three text files.

    Returns:
        True when at least one .txt file exists.
    """
    print(f"\n📊 分析提取的内容...")

    entries = os.listdir(output_dir)
    txt_files = [name for name in entries if name.endswith('.txt')]
    csv_files = [name for name in entries if name.endswith('.csv')]

    print(f"   文本文件: {len(txt_files)} 个")
    print(f"   CSV文件: {len(csv_files)} 个")

    if txt_files:
        print(f"\n   示例文本文件:")
        # Preview at most the first three files, 100 chars each.
        for name in txt_files[:3]:
            with open(os.path.join(output_dir, name), 'r', encoding='utf-8') as f:
                text = f.read()
            preview = text[:100] + "..." if len(text) > 100 else text
            print(f"     - {name}: {preview}")

    return len(txt_files) > 0

def main():
    """CLI entry point: extract a site and report where the files went."""
    if len(sys.argv) != 2:
        print("使用方法: python extract_content.py <网站地址>")
        print("示例: python extract_content.py https://example.com")
        sys.exit(1)

    site_url = sys.argv[1]
    base_output_path = "./wordpress_exports"
    bar = "=" * 50

    print(bar)
    print("📥 WordPress内容提取工具")
    print(bar)

    output_dir = create_output_structure(base_output_path)

    # Extraction first, then a sanity pass over the produced files.
    if not extract_with_corpress(site_url, output_dir):
        print("❌ 内容提取失败")
        return False
    if not analyze_extracted_content(output_dir):
        print("❌ 内容提取但未找到有效文件")
        return False

    print(f"\n🎉 内容提取成功完成!")
    print(f"   输出目录: {output_dir}")
    return True

if __name__ == "__main__":
    main()

内容格式化脚本 format_content.py

这个脚本对提取的内容进行进一步处理和格式化

# format_content.py
import os
import json
import csv
import re
from pathlib import Path

def clean_html_tags(text):
    """Strip anything that looks like an HTML tag from *text*.

    Falsy input (None, "") yields an empty string.
    """
    if not text:
        return ""
    return re.sub('<.*?>', '', text)

def format_for_ai(content):
    """Normalize extracted HTML content into compact plain text.

    Strips tags, collapses blank-line runs into one blank line, squeezes
    horizontal whitespace, and trims both ends.
    """
    text = clean_html_tags(content)

    # Collapse runs of blank lines, then runs of spaces/tabs.
    text = re.sub(r'\n\s*\n', '\n\n', text)
    text = re.sub(r'[ \t]+', ' ', text)

    return text.strip()

def process_text_files(input_dir, output_dir):
    """Clean every .txt file in *input_dir* into *output_dir*.

    Each file is rewritten as "cleaned_<name>"; failures are reported and
    skipped.

    Returns:
        Number of files successfully processed.
    """
    print("📄 处理文本文件...")

    txt_files = [name for name in os.listdir(input_dir) if name.endswith('.txt')]
    processed_count = 0

    for name in txt_files:
        src = os.path.join(input_dir, name)
        dst = os.path.join(output_dir, f"cleaned_{name}")
        try:
            with open(src, 'r', encoding='utf-8') as f:
                raw = f.read()
            with open(dst, 'w', encoding='utf-8') as f:
                f.write(format_for_ai(raw))
        except Exception as e:
            print(f"❌ 处理文件 {name} 时出错: {e}")
        else:
            processed_count += 1

    print(f"✅ 已处理 {processed_count}/{len(txt_files)} 个文本文件")
    return processed_count

def process_csv_files(input_dir, output_dir):
    """Summarize every .csv file found in *input_dir*.

    For each CSV a "stats_<name>.json" file is written to *output_dir*
    containing the row count, the column names, and the first record.

    Returns:
        Number of .csv files found (whether or not each processed cleanly).
    """
    print("📊 处理CSV文件...")

    csv_files = [name for name in os.listdir(input_dir) if name.endswith('.csv')]

    for name in csv_files:
        # FIX: removed the unused `output_path` ("summary_<name>") local that
        # was computed but never written to.
        src = os.path.join(input_dir, name)
        try:
            with open(src, 'r', encoding='utf-8') as f:
                rows = list(csv.DictReader(f))

            print(f"   {name}: {len(rows)} 条记录")

            first = rows[0] if rows else {}
            stats = {
                "total_records": len(rows),
                "columns": list(first.keys()),
                "sample_record": first,
            }

            stem = os.path.splitext(name)[0]
            stats_path = os.path.join(output_dir, f"stats_{stem}.json")
            with open(stats_path, 'w', encoding='utf-8') as f:
                json.dump(stats, f, ensure_ascii=False, indent=2)

        except Exception as e:
            print(f"❌ 处理CSV文件 {name} 时出错: {e}")

    return len(csv_files)

def create_content_summary(input_dir, output_dir):
    """Walk *input_dir* and record the size of every .txt/.csv file.

    Writes "extraction_summary.json" into *output_dir* and returns the
    summary dict ({"files": {rel_path: sizes}, "total_content_size": bytes}).
    """
    print("\n📋 创建内容摘要...")

    summary = {
        "extraction_date": str(Path(input_dir).name),
        "files": {},
        "total_content_size": 0
    }

    for root, _dirs, files in os.walk(input_dir):
        for name in files:
            if not name.endswith(('.txt', '.csv')):
                continue
            full = os.path.join(root, name)
            size = os.path.getsize(full)
            rel = os.path.relpath(full, input_dir)
            summary["files"][rel] = {
                "size_bytes": size,
                "size_kb": round(size / 1024, 2),
            }
            summary["total_content_size"] += size

    with open(os.path.join(output_dir, "extraction_summary.json"), 'w', encoding='utf-8') as f:
        json.dump(summary, f, ensure_ascii=False, indent=2)

    print(f"✅ 内容摘要已保存")
    print(f"   总内容大小: {round(summary['total_content_size'] / 1024, 2)} KB")
    print(f"   文件数量: {len(summary['files'])}")

    return summary

def main():
    """CLI entry point: clean and summarize one extraction directory.

    Usage: python format_content.py <extraction dir>
    """
    # BUGFIX: this module never imported `sys` at the top, so referencing
    # sys.argv raised NameError; import it locally to keep the fix contained.
    import sys

    if len(sys.argv) != 2:
        print("使用方法: python format_content.py <提取内容目录>")
        print("示例: python format_content.py ./wordpress_exports/wordpress_export_20240520_143022")
        sys.exit(1)

    input_dir = sys.argv[1]

    if not os.path.exists(input_dir):
        print(f"❌ 目录不存在: {input_dir}")
        sys.exit(1)

    print("=" * 50)
    print("✨ 内容格式化工具")
    print("=" * 50)

    # All cleaned/summary artifacts land in <input_dir>/formatted.
    formatted_dir = os.path.join(input_dir, "formatted")
    os.makedirs(formatted_dir, exist_ok=True)

    txt_count = process_text_files(input_dir, formatted_dir)
    csv_count = process_csv_files(input_dir, formatted_dir)

    create_content_summary(input_dir, formatted_dir)

    print(f"\n🎉 内容格式化完成!")
    print(f"   格式化文件保存在: {formatted_dir}")
    print(f"   处理文本文件: {txt_count} 个")
    print(f"   处理CSV文件: {csv_count} 个")

if __name__ == "__main__":
    main()

提取结果验证脚本 validate_extraction.py

这个脚本专门验证提取结果的质量和完整性。

# validate_extraction.py
import os
import json
import sys
from pathlib import Path

def validate_directory_structure(extract_dir):
    """Validate that *extract_dir* exists and holds extracted files.

    Returns:
        (is_valid, issues) — issues is a list of human-readable problems.
    """
    print("📁 验证目录结构...")

    # FIX: removed the dead `expected_dirs`/`expected_files` locals that were
    # declared but never checked.
    issues = []

    # The directory itself must exist before anything else is checked.
    if not os.path.exists(extract_dir):
        issues.append(f"❌ 提取目录不存在: {extract_dir}")
        return False, issues

    # At least one extracted artifact (.txt or .csv) is expected.
    entries = os.listdir(extract_dir)
    has_txt_files = any(name.endswith('.txt') for name in entries)
    has_csv_files = any(name.endswith('.csv') for name in entries)

    if not has_txt_files and not has_csv_files:
        issues.append("❌ 未找到任何提取的文本或CSV文件")

    # Per-file readability/encoding problems.
    issues.extend(check_file_integrity(extract_dir))

    if issues:
        return False, issues
    print("✅ 目录结构验证通过")
    return True, issues

def check_file_integrity(extract_dir):
    """Try to read every .txt/.csv file and flag suspicious ones.

    Flags empty files, very small files, undecodable files, and (for .txt)
    content-quality issues reported by check_text_content().

    Returns:
        List of issue strings (empty when everything looks fine).
    """
    print("🔍 检查文件完整性...")

    issues = []

    for name in os.listdir(extract_dir):
        if not name.endswith(('.txt', '.csv')):
            continue
        path = os.path.join(extract_dir, name)

        try:
            with open(path, 'r', encoding='utf-8') as f:
                content = f.read()
        except UnicodeDecodeError:
            issues.append(f"❌ 文件编码问题: {name}")
            continue
        except Exception as e:
            issues.append(f"❌ 读取文件失败 {name}: {e}")
            continue

        # NOTE(review): this is a character count, not bytes, despite the
        # "字节" wording in the message below — confirm intended units.
        size = len(content)
        if size == 0:
            issues.append(f"❌ 文件为空: {name}")
        elif size < 10:
            issues.append(f"⚠️  文件过小({size}字节): {name}")

        if name.endswith('.txt'):
            issues.extend(check_text_content(content, name))

    return issues

def check_text_content(content, filename):
    """Heuristic quality checks for one extracted text file.

    Args:
        content: Full text of the file.
        filename: Name used in the issue messages.

    Returns:
        List of warning strings (may be empty).
    """
    issues = []

    # Lots of raw bytes but little visible text usually means the HTML
    # stripping failed upstream.
    text_only = remove_html_tags(content).strip()

    if len(text_only) < 50 and len(content) > 100:
        # BUGFIX: the message used to print a literal "(unknown)" placeholder
        # instead of the file name.
        issues.append(f"⚠️  可能包含大量HTML标签: {filename}")

    # Strings that typically leak from failed API/JS responses.
    error_indicators = [
        "undefined",
        "error",
        "exception",
        "cannot read property",
        "wp-json",
        "rest_api"
    ]

    lowered = content.lower()
    for indicator in error_indicators:
        if indicator.lower() in lowered:
            # BUGFIX: same "(unknown)" placeholder replaced with the filename.
            issues.append(f"⚠️  可能包含错误信息({indicator}): {filename}")

    return issues

def remove_html_tags(text):
    """Return *text* with HTML-like tags deleted."""
    import re
    return re.sub('<.*?>', '', text)

def validate_content_structure(extract_dir):
    """Check that the extraction produced a sensible mix of files.

    Returns:
        List of issue strings; empty when .txt/.csv content is present.
    """
    print("📊 验证内容结构...")

    issues = []

    # Tally files by extension.
    file_types = {}
    total_files = 0
    for name in os.listdir(extract_dir):
        ext = Path(name).suffix.lower()
        file_types[ext] = file_types.get(ext, 0) + 1
        total_files += 1

    print(f"   文件统计: {file_types}")

    if total_files == 0:
        issues.append("❌ 没有找到任何提取的文件")

    if file_types.get('.txt', 0) == 0 and file_types.get('.csv', 0) == 0:
        issues.append("❌ 没有找到文本或CSV文件")

    return issues

def generate_validation_report(extract_dir, is_valid, issues):
    """Write validation_report.json into *extract_dir* and return the report.

    The report records the verdict, every issue found, and a one-line
    recommendation for the next step.
    """
    if is_valid:
        recommendation = "✅ 验证通过,可以运行格式化脚本"
    else:
        recommendation = "❌ 请先解决上述问题再运行格式化脚本"

    report = {
        "validation_timestamp": str(Path(extract_dir).name),
        "extract_directory": extract_dir,
        "is_valid": is_valid,
        "issues_found": len(issues),
        "issues": issues,
        "recommendation": recommendation,
    }

    with open(os.path.join(extract_dir, "validation_report.json"), 'w', encoding='utf-8') as f:
        json.dump(report, f, ensure_ascii=False, indent=2)

    return report

def main():
    """CLI entry point: validate one extraction directory and print a report."""
    if len(sys.argv) != 2:
        print("使用方法: python validate_extraction.py <提取内容目录>")
        print("示例: python validate_extraction.py ./wordpress_exports/wordpress_export_20240520_143022")
        sys.exit(1)

    extract_dir = sys.argv[1]
    bar = "=" * 50

    print(bar)
    print("🔍 提取结果验证工具")
    print(bar)

    # Structure and content checks feed one combined issue list.
    is_valid, structure_issues = validate_directory_structure(extract_dir)
    content_issues = validate_content_structure(extract_dir)

    all_issues = structure_issues + content_issues
    is_overall_valid = is_valid and not all_issues

    report = generate_validation_report(extract_dir, is_overall_valid, all_issues)

    print("\n" + bar)
    print("📋 验证结果")
    print(bar)

    if is_overall_valid:
        print("🎉 所有验证通过!")
    else:
        print(f"❌ 发现 {len(all_issues)} 个问题:")
        for issue in all_issues:
            print(f"   {issue}")

    print(f"\n💡 建议: {report['recommendation']}")
    print(f"📄 详细报告已保存: {os.path.join(extract_dir, 'validation_report.json')}")

    return is_overall_valid

if __name__ == "__main__":
    success = main()
    sys.exit(0 if success else 1)

集成测试脚本 run_integration_test.py

这个脚本模拟完整的工作流并验证每个步骤

# run_integration_test.py
import os
import sys
import subprocess
import tempfile
import shutil
from pathlib import Path

def run_command(command, description):
    """Run a shell command, echoing progress, and capture its output.

    Args:
        command: Shell command line (executed with shell=True).
        description: Human-readable label printed before execution.

    Returns:
        (ok, text) where text is stdout on success, stderr or the error
        message on failure.
    """
    print(f"\n🔧 {description}")
    print(f"   执行: {command}")

    try:
        # NOTE(review): shell=True means *command* is interpreted by the
        # shell — only pass trusted strings here.
        result = subprocess.run(command, shell=True, capture_output=True, text=True)
    except Exception as e:
        print(f"❌ 执行异常: {e}")
        return False, str(e)

    if result.returncode == 0:
        print("✅ 执行成功")
        return True, result.stdout

    print(f"❌ 执行失败 (退出码: {result.returncode})")
    print(f"   错误输出: {result.stderr}")
    return False, result.stderr

def test_environment():
    """Step 1: run check_environment.py and return its (ok, output) result."""
    bar = "=" * 50
    print(bar)
    print("1. 测试环境配置")
    print(bar)

    return run_command("python check_environment.py", "检查Python环境")

def test_api_connection(test_site):
    """Step 2: probe the WordPress API of *test_site* via test_wp_api.py."""
    bar = "=" * 50
    print("\n" + bar)
    print("2. 测试API连接")
    print(bar)

    return run_command(f"python test_wp_api.py {test_site}", "测试WordPress API连接")

def create_test_extraction(test_site, temp_dir):
    """Step 3: run extract_content.py and locate the produced export dir.

    Returns:
        (ok, export_dir_or_None).
    """
    bar = "=" * 50
    print("\n" + bar)
    print("3. 测试内容提取")
    print(bar)

    ok, _output = run_command(
        f"python extract_content.py {test_site}",
        "提取WordPress内容"
    )

    # Only look for the export directory when extraction reported success.
    return (ok, find_latest_export_dir()) if ok else (ok, None)

def find_latest_export_dir():
    """Return the newest ./wordpress_exports subdirectory, or None.

    "Newest" relies on the timestamped directory names sorting
    lexicographically in chronological order.
    """
    base_dir = "./wordpress_exports"
    if not os.path.exists(base_dir):
        return None

    subdirs = sorted(
        entry for entry in os.listdir(base_dir)
        if os.path.isdir(os.path.join(base_dir, entry))
    )
    if not subdirs:
        return None
    return os.path.join(base_dir, subdirs[-1])

def validate_extraction(extract_dir):
    """Step 4: run validate_extraction.py against *extract_dir*.

    Returns (False, None) when no extraction directory exists, otherwise
    the (ok, output) result of the validation script.
    """
    bar = "=" * 50
    print("\n" + bar)
    print("4. 验证提取结果")
    print(bar)

    if not extract_dir or not os.path.exists(extract_dir):
        print("❌ 没有找到提取目录")
        return False, None

    return run_command(
        f"python validate_extraction.py {extract_dir}",
        "验证提取内容质量"
    )

def test_formatting(extract_dir):
    """测试格式化过程"""
    print("\n" + "=" * 50)
    print("5. 测试内容格式化")
    print("=" * 50)
    
    if not extract_dir:
        return False, "没有有效的提取目录"
    
    return run_command(
        f"python format_content.py {extract_dir}", 
        "格式化提取的内容"
    )

def generate_test_report(test_results, temp_dir):
    """Aggregate the step results into a report and save it as JSON.

    Args:
        test_results: Mapping of step name -> (ok, output) tuples with the
            keys "environment", "api", "extraction", "validation",
            "formatting".
        temp_dir: Directory where integration_test_report.json is written.

    Returns:
        The report dict.
    """
    # BUGFIX: `json` was never imported by this script's header, so the
    # json.dump below raised NameError; import it locally.
    import json

    report = {
        "test_timestamp": str(Path(temp_dir).name),
        "overall_success": all(result[0] for result in test_results.values()),
        "detailed_results": test_results,
        "recommendations": []
    }

    # Map each failed step to an actionable hint.
    hints = [
        ("environment", "❌ 请先修复环境配置问题"),
        ("api", "❌ 请检查WordPress REST API配置"),
        ("extraction", "❌ 内容提取失败,请检查网站可访问性"),
        ("validation", "⚠️  提取内容有质量问题,请检查验证报告"),
        ("formatting", "⚠️  格式化过程有问题,请检查提取内容格式"),
    ]
    for key, message in hints:
        if not test_results[key][0]:
            report["recommendations"].append(message)

    if report["overall_success"]:
        report["recommendations"].append("🎉 所有测试通过!工作流可以正常运行")

    report_path = os.path.join(temp_dir, "integration_test_report.json")
    with open(report_path, 'w', encoding='utf-8') as f:
        json.dump(report, f, ensure_ascii=False, indent=2)

    return report

def main():
    """CLI entry point: run the five workflow steps end-to-end and report.

    Usage: python run_integration_test.py <test site URL>

    Returns:
        True when every step succeeded.

    NOTE(review): the run chdir()s into a fresh temp directory but the helper
    scripts are not copied there, so the `python xxx.py` subprocess calls
    only work if those scripts are resolvable from the temp dir — confirm.
    """
    if len(sys.argv) != 2:
        print("使用方法: python run_integration_test.py <测试网站地址>")
        print("示例: python run_integration_test.py https://example.com")
        sys.exit(1)

    test_site = sys.argv[1]

    # Work inside a throwaway directory that is removed automatically.
    with tempfile.TemporaryDirectory() as temp_dir:
        print("=" * 50)
        print("🧪 WordPress内容提取集成测试")
        print("=" * 50)
        print(f"测试网站: {test_site}")
        print(f"工作目录: {temp_dir}")

        original_dir = os.getcwd()

        try:
            os.chdir(temp_dir)

            # Every entry in test_results is an (ok, detail) tuple so the
            # report generator can uniformly index result[0].
            test_results = {}

            test_results["environment"] = test_environment()
            test_results["api"] = test_api_connection(test_site)
            # BUGFIX: previously only the boolean was stored for this step,
            # so the later `result[0]` indexing raised TypeError on a bool.
            extraction_ok, extract_dir = create_test_extraction(test_site, temp_dir)
            test_results["extraction"] = (extraction_ok, extract_dir)
            test_results["validation"] = validate_extraction(extract_dir)
            test_results["formatting"] = test_formatting(extract_dir)

            report = generate_test_report(test_results, temp_dir)

            print("\n" + "=" * 50)
            print("📋 集成测试完成")
            print("=" * 50)

            success_count = sum(1 for result in test_results.values() if result[0])
            total_count = len(test_results)

            print(f"测试通过: {success_count}/{total_count}")

            if report["overall_success"]:
                print("🎉 所有测试通过!您的工作流运行正常")
            else:
                print("❌ 部分测试失败,请检查以下问题:")
                for recommendation in report["recommendations"]:
                    print(f"   {recommendation}")

            return report["overall_success"]

        finally:
            # Always restore the caller's working directory.
            os.chdir(original_dir)

if __name__ == "__main__":
    success = main()
    sys.exit(0 if success else 1)

新的三步法提取流程

在 extract_content.py 中,当前做法是一次性提取全部文章,导致单次提取的文本量过大。可以换一种思路:先把发现的文章整理成列表返回,再通过补充参数指定要提取的文章;提取完成后,再对提取结果进行验证。

1. 发现阶段 (discovery_phase.py)
   ↓
2. 选择阶段 (selection_phase.py) 
   ↓
3. 提取阶段 (extraction_phase.py)

脚本1:文章发现脚本 discovery_phase.py

这个脚本只获取文章列表和基本信息,不下载完整内容。

# discovery_phase.py
import requests
import json
import os
import sys
from datetime import datetime
from urllib.parse import urljoin

def discover_posts(site_url, max_posts=50):
    """Page through the WP REST API collecting post *metadata* only.

    Requests only lightweight fields (id/title/dates/slug/link/excerpt) via
    `_fields`, so full post content is never downloaded.

    Args:
        site_url: Base URL of the WordPress site.
        max_posts: Upper bound on the number of posts returned.

    Returns:
        List of post-info dicts (each carrying a 'selected': False flag).
    """
    print(f"🔍 正在发现文章...")
    print(f"   网站: {site_url}")
    print(f"   最大文章数: {max_posts}")

    posts_list = []
    page = 1
    per_page = min(20, max_posts)

    while len(posts_list) < max_posts:
        api_url = f"{site_url}/wp-json/wp/v2/posts"
        params = {
            'page': page,
            'per_page': per_page,
            '_fields': 'id,title,date,modified,slug,link,excerpt'
        }

        try:
            response = requests.get(api_url, params=params, timeout=30)

            if response.status_code != 200:
                print(f"❌ API请求失败: {response.status_code}")
                break

            posts = response.json()

            if not posts:
                print("✅ 已获取所有可用文章")
                break

            for post in posts:
                post_info = {
                    'id': post.get('id'),
                    'title': clean_html(post.get('title', {}).get('rendered', '无标题')),
                    'date': post.get('date'),
                    'modified': post.get('modified'),
                    'slug': post.get('slug'),
                    'link': post.get('link'),
                    'excerpt': clean_html(post.get('excerpt', {}).get('rendered', ''))[:200] + '...',
                    'selected': False  # filled in by the selection phase
                }
                posts_list.append(post_info)

            print(f"   已发现 {len(posts_list)} 篇文章...")

            # A short page means the site has no more posts.
            if len(posts) < per_page:
                break

            page += 1

        except requests.exceptions.RequestException as e:
            print(f"❌ 请求错误: {e}")
            break
        except json.JSONDecodeError as e:
            print(f"❌ JSON解析错误: {e}")
            break

    # BUGFIX: a full page could overshoot the cap (e.g. max_posts=30 with
    # per_page=20 yielded 40 entries); trim to the requested maximum.
    return posts_list[:max_posts]

def clean_html(text):
    """Strip HTML-like tags from *text*; falsy input yields ""."""
    import re
    return re.sub('<.*?>', '', text) if text else ""

def save_discovery_results(posts_list, output_dir, site_url):
    """Persist discovery output: a full result file plus a selection template.

    Args:
        posts_list: Post-info dicts from discover_posts().
        output_dir: Directory for both JSON files (created if missing).
        site_url: Site the posts came from, recorded in both files.

    Returns:
        (discovery_file_path, selection_template_path).
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    discovery_file = os.path.join(output_dir, f"discovery_results_{timestamp}.json")

    os.makedirs(output_dir, exist_ok=True)

    # Full machine-readable discovery record.
    discovery_data = {
        'discovery_time': datetime.now().isoformat(),
        'total_posts': len(posts_list),
        'site_url': site_url,
        'posts': posts_list
    }
    with open(discovery_file, 'w', encoding='utf-8') as f:
        json.dump(discovery_data, f, ensure_ascii=False, indent=2)

    # Small template the user edits to pick post IDs for extraction.
    selection_template = {
        'site_url': site_url,
        'discovery_file': discovery_file,
        'selected_posts': []
    }
    selection_file = os.path.join(output_dir, f"selection_template_{timestamp}.json")
    with open(selection_file, 'w', encoding='utf-8') as f:
        json.dump(selection_template, f, ensure_ascii=False, indent=2)

    return discovery_file, selection_file

def display_posts_summary(posts_list):
    """Print a human-readable overview of up to ten discovered posts."""
    print(f"\n📋 发现结果摘要")
    print("=" * 50)
    print(f"总文章数: {len(posts_list)}")
    print("\n前10篇文章:")
    print("-" * 50)

    for index, post in enumerate(posts_list[:10]):
        print(f"{index+1:2d}. ID: {post['id']} - {post['title']}")
        print(f"     日期: {post['date']} | 链接: {post['link']}")
        print(f"     摘要: {post['excerpt']}")
        print()

    remaining = len(posts_list) - 10
    if remaining > 0:
        print(f"... 还有 {remaining} 篇文章")

def main():
    """CLI entry point: discover posts on a WordPress site and save results.

    Usage: python discovery_phase.py <site-url> [max-posts]
    """
    if len(sys.argv) < 2:
        print("使用方法: python discovery_phase.py <网站地址> [最大文章数]")
        print("示例: python discovery_phase.py https://example.com 100")
        sys.exit(1)

    target_site = sys.argv[1]
    # Optional second argument caps how many posts are fetched (default 50).
    limit = int(sys.argv[2]) if len(sys.argv) > 2 else 50

    banner = "=" * 50
    print(banner)
    print("🔍 WordPress文章发现工具")
    print(banner)

    found = discover_posts(target_site, limit)
    if not found:
        print("❌ 没有发现任何文章")
        sys.exit(1)

    display_posts_summary(found)

    # Persist results; the site URL travels with the data for later phases.
    results_dir = "./discovery_results"
    discovery_path, selection_path = save_discovery_results(found, results_dir, target_site)

    print(f"\n💾 发现结果已保存:")
    print(f"   完整发现数据: {discovery_path}")
    print(f"   选择模板文件: {selection_path}")

    print(f"\n📝 下一步:")
    print(f"   1. 编辑 {selection_path}")
    print(f"   2. 在 'selected_posts' 数组中添加要提取的文章ID")
    print(f"   3. 运行: python selection_phase.py {selection_path}")

if __name__ == "__main__":
    main()

脚本2:文章选择脚本 selection_phase.py

这个脚本让用户基于发现结果选择要提取的文章

# selection_phase.py
import json
import os
import sys
from datetime import datetime

def load_discovery_results(discovery_file):
    """Read a discovery-results JSON file.

    Returns the parsed dict, or None (after printing an error) when the
    file is missing, unreadable, or not valid JSON.
    """
    try:
        with open(discovery_file, 'r', encoding='utf-8') as fh:
            data = json.load(fh)
    except Exception as e:
        print(f"❌ 加载发现结果失败: {e}")
        return None
    return data

def interactive_selection(discovery_data):
    """Interactively ask the user which posts to extract.

    Offers select-all, 1-based range, explicit-ID, and list modes; loops
    until a selection is confirmed. Returns the list of chosen post IDs.
    """
    posts = discovery_data['posts']
    total = len(posts)

    print("=" * 50)
    print("📝 文章选择界面")
    print("=" * 50)
    print(f"总文章数: {total}")
    print("\n请选择要提取的文章:")
    print("   [A] 选择全部文章")
    print("   [R] 按范围选择")
    print("   [I] 按ID选择")
    print("   [L] 列出所有文章")
    print("   [Q] 完成选择")

    selected_ids = []

    while True:
        action = input("\n请选择操作 (A/R/I/L/Q): ").strip().upper()

        if action == 'A':
            # Take every discovered post.
            selected_ids = [entry['id'] for entry in posts]
            print(f"✅ 已选择全部 {len(selected_ids)} 篇文章")
            break

        if action == 'R':
            # 1-based inclusive range over the listing order.
            try:
                start = int(input("起始序号 (1-{}): ".format(total)))
                end = int(input("结束序号 (1-{}): ".format(total)))
            except ValueError:
                print("❌ 请输入有效数字")
                continue
            if 1 <= start <= end <= total:
                selected_ids = [posts[pos - 1]['id'] for pos in range(start, end + 1)]
                print(f"✅ 已选择文章 {start} 到 {end}")
                break
            print("❌ 范围无效")

        elif action == 'I':
            try:
                raw = input("请输入文章ID(多个ID用逗号分隔): ")
                selected_ids = [int(part.strip()) for part in raw.split(',')]
            except ValueError:
                print("❌ 请输入有效的数字ID")
            else:
                print(f"✅ 已选择 {len(selected_ids)} 篇文章")
                break

        elif action == 'L':
            print("\n📋 所有文章列表:")
            print("-" * 50)
            for pos, entry in enumerate(posts, 1):
                print(f"{pos:3d}. ID: {entry['id']} - {entry['title']}")

        elif action == 'Q':
            # Only allow finishing once something has been selected.
            if selected_ids:
                break
            print("❌ 请先选择文章")
        else:
            print("❌ 无效选择")

    return selected_ids

def update_selection_file(selection_file, selected_ids, discovery_data):
    """Write the chosen posts back into the selection file.

    Looks each selected ID up in *discovery_data* (unknown IDs are
    silently skipped), records the selection time and count, and saves
    the file. Returns the updated dict, or None on any failure.
    """
    try:
        with open(selection_file, 'r', encoding='utf-8') as fh:
            selection_data = json.load(fh)

        # Map id -> full post record so selections keep their metadata.
        by_id = {entry['id']: entry for entry in discovery_data['posts']}
        chosen = [by_id[pid] for pid in selected_ids if pid in by_id]

        selection_data['selected_posts'] = chosen
        selection_data['selection_time'] = datetime.now().isoformat()
        selection_data['total_selected'] = len(chosen)

        with open(selection_file, 'w', encoding='utf-8') as fh:
            json.dump(selection_data, fh, ensure_ascii=False, indent=2)

        return selection_data

    except Exception as e:
        print(f"❌ 更新选择文件失败: {e}")
        return None

def main():
    """CLI entry point: load a selection template and let the user pick posts.

    Usage: python selection_phase.py <selection-template.json>
    """
    if len(sys.argv) != 2:
        print("使用方法: python selection_phase.py <选择模板文件>")
        print("示例: python selection_phase.py ./discovery_results/selection_template_20240520_143022.json")
        sys.exit(1)

    selection_file = sys.argv[1]

    if not os.path.exists(selection_file):
        print(f"❌ 选择文件不存在: {selection_file}")
        sys.exit(1)

    divider = "=" * 50
    print(divider)
    print("📝 WordPress文章选择工具")
    print(divider)

    # The template references the discovery file produced in phase 1.
    try:
        with open(selection_file, 'r', encoding='utf-8') as fh:
            selection_data = json.load(fh)
    except Exception as e:
        print(f"❌ 加载选择文件失败: {e}")
        sys.exit(1)

    discovery_file = selection_data.get('discovery_file')
    if not discovery_file or not os.path.exists(discovery_file):
        print(f"❌ 发现文件不存在: {discovery_file}")
        sys.exit(1)

    discovery_data = load_discovery_results(discovery_file)
    if not discovery_data:
        sys.exit(1)

    chosen_ids = interactive_selection(discovery_data)
    if not chosen_ids:
        print("❌ 没有选择任何文章")
        sys.exit(1)

    result = update_selection_file(selection_file, chosen_ids, discovery_data)

    if result:
        print(f"\n✅ 选择完成!")
        print(f"   已选择 {len(chosen_ids)} 篇文章")
        print(f"   选择文件已更新: {selection_file}")
        print(f"\n🚀 下一步:")
        print(f"   运行: python extraction_phase.py {selection_file}")
    else:
        print("❌ 选择失败")

if __name__ == "__main__":
    main()

脚本3:内容提取脚本 extraction_phase.py

# extraction_phase.py
import requests
import json
import os
import sys
from datetime import datetime
from urllib.parse import urljoin
import time

def load_selection_data(selection_file):
    """Load the selection JSON produced by the selection phase.

    Returns the parsed dict, or None (after printing an error) when the
    file cannot be read or parsed.
    """
    try:
        with open(selection_file, 'r', encoding='utf-8') as fh:
            return json.load(fh)
    except Exception as e:
        print(f"❌ 加载选择数据失败: {e}")
        return None

def extract_post_content(site_url, post_id):
    """Fetch one post's full JSON from the WordPress REST API.

    Returns the decoded post dict on HTTP 200, otherwise None (after
    printing the failure). Network errors are caught, not raised.
    """
    endpoint = f"{site_url}/wp-json/wp/v2/posts/{post_id}"

    try:
        resp = requests.get(endpoint, timeout=30)
        if resp.status_code == 200:
            return resp.json()
        print(f"❌ 提取文章 {post_id} 失败: {resp.status_code}")
        return None
    except requests.exceptions.RequestException as e:
        print(f"❌ 请求文章 {post_id} 失败: {e}")
        return None

def save_extracted_content(post_data, output_dir):
    """Write one post to disk and return a small summary dict.

    Creates ``post_<id>_<slug>/`` under *output_dir* containing the raw
    API payload (raw_data.json), a plain-text rendition (content.txt)
    and extraction metadata (metadata.json).
    """
    post_id = post_data.get('id')
    post_slug = post_data.get('slug', f'post_{post_id}')

    # One directory per post.
    post_dir = os.path.join(output_dir, f"post_{post_id}_{post_slug}")
    os.makedirs(post_dir, exist_ok=True)

    # 1) Raw API payload, untouched.
    with open(os.path.join(post_dir, "raw_data.json"), 'w', encoding='utf-8') as fh:
        json.dump(post_data, fh, ensure_ascii=False, indent=2)

    # 2) Plain-text rendition with a small header block.
    title = clean_html(post_data.get('title', {}).get('rendered', '无标题'))
    content = clean_html(post_data.get('content', {}).get('rendered', ''))
    excerpt = clean_html(post_data.get('excerpt', {}).get('rendered', ''))

    separator = "=" * 50
    with open(os.path.join(post_dir, "content.txt"), 'w', encoding='utf-8') as fh:
        fh.write(f"标题: {title}\n")
        fh.write(f"日期: {post_data.get('date')}\n")
        fh.write(f"修改时间: {post_data.get('modified')}\n")
        fh.write(f"链接: {post_data.get('link')}\n")
        fh.write(f"摘要: {excerpt}\n")
        fh.write("\n" + separator + "\n")
        fh.write("正文内容:\n")
        fh.write(separator + "\n\n")
        fh.write(content)

    word_count = len(content.split())

    # 3) Extraction metadata for downstream tooling.
    meta = {
        'extraction_time': datetime.now().isoformat(),
        'post_id': post_id,
        'post_slug': post_slug,
        'title': title,
        'date': post_data.get('date'),
        'link': post_data.get('link'),
        'content_length': len(content),
        'word_count': word_count,
    }
    with open(os.path.join(post_dir, "metadata.json"), 'w', encoding='utf-8') as fh:
        json.dump(meta, fh, ensure_ascii=False, indent=2)

    return {
        'post_id': post_id,
        'title': title,
        'content_length': len(content),
        'word_count': word_count,
        'output_dir': post_dir,
    }

def clean_html(text):
    """Strip HTML tags and collapse surplus whitespace.

    Args:
        text: Raw HTML string (may be None or empty).

    Returns:
        Tag-free text with runs of blank lines squeezed to a single blank
        line, runs of spaces/tabs squeezed to one space, and ends trimmed;
        "" for falsy input.
    """
    import re
    if not text:
        return ""
    # DOTALL so tags whose attributes span multiple lines are removed too
    # (the original '<.*?>' pattern left such tags in the output).
    text = re.sub(re.compile(r'<.*?>', re.DOTALL), '', text)
    # Collapse consecutive blank lines and horizontal whitespace.
    text = re.sub(r'\n\s*\n', '\n\n', text)
    text = re.sub(r'[ \t]+', ' ', text)
    return text.strip()

def create_extraction_summary(extraction_results, selection_data, output_dir):
    """Write extraction_summary.json into *output_dir* and return its path.

    The summary records the site URL, how many posts were selected,
    how many succeeded/failed, and the per-post extraction results.
    """
    selected_total = selection_data.get('total_selected', 0)
    summary = {
        'extraction_time': datetime.now().isoformat(),
        'site_url': selection_data.get('site_url'),
        'total_selected': selected_total,
        'successfully_extracted': len(extraction_results),
        'failed_extractions': selected_total - len(extraction_results),
        'extraction_results': extraction_results,
        'output_directory': output_dir,
    }

    summary_path = os.path.join(output_dir, "extraction_summary.json")
    with open(summary_path, 'w', encoding='utf-8') as fh:
        json.dump(summary, fh, ensure_ascii=False, indent=2)

    return summary_path

def main():
    """CLI entry point: extract every selected post and write a summary.

    Usage: python extraction_phase.py <selection-file.json>
    """
    if len(sys.argv) != 2:
        print("使用方法: python extraction_phase.py <选择文件>")
        print("示例: python extraction_phase.py ./discovery_results/selection_template_20240520_143022.json")
        sys.exit(1)

    selection_file = sys.argv[1]

    divider = "=" * 50
    print(divider)
    print("📥 WordPress内容提取工具")
    print(divider)

    selection_data = load_selection_data(selection_file)
    if not selection_data:
        sys.exit(1)

    site_url = selection_data.get('site_url')
    selected_posts = selection_data.get('selected_posts', [])
    if not selected_posts:
        print("❌ 没有选择任何文章")
        sys.exit(1)

    total = len(selected_posts)
    print(f"🌐 网站: {site_url}")
    print(f"📝 准备提取 {total} 篇文章")

    # Timestamped run directory keeps repeated extractions separate.
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_dir = f"./extracted_content_{stamp}"
    os.makedirs(output_dir, exist_ok=True)

    extraction_results = []
    failed_extractions = []

    for index, post_info in enumerate(selected_posts, 1):
        post_id = post_info['id']
        print(f"\n[{index}/{total}] 提取文章: {post_info['title']}")

        post_data = extract_post_content(site_url, post_id)
        if post_data:
            outcome = save_extracted_content(post_data, output_dir)
            extraction_results.append(outcome)
            print(f"   ✅ 提取成功: {outcome['word_count']} 字")
        else:
            failed_extractions.append(post_id)
            print(f"   ❌ 提取失败")

        # Be polite to the server between requests.
        time.sleep(0.5)

    summary_file = create_extraction_summary(extraction_results, selection_data, output_dir)

    print(f"\n" + divider)
    print("📊 提取结果摘要")
    print(divider)
    print(f"✅ 成功提取: {len(extraction_results)} 篇")
    print(f"❌ 提取失败: {len(failed_extractions)} 篇")
    print(f"📁 输出目录: {output_dir}")
    print(f"📄 摘要文件: {summary_file}")

    if failed_extractions:
        print(f"\n失败的文章ID: {failed_extractions}")

    print(f"\n🎉 提取完成!")

if __name__ == "__main__":
    main()

新的使用流程

第一步:发现文章

python discovery_phase.py https://你的网站地址 100

第二步:选择文章

python selection_phase.py ./discovery_results/selection_template_时间戳.json

第三步:提取内容

python extraction_phase.py ./discovery_results/selection_template_时间戳.json

文末附加内容
暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇