第五十九课:结构体与组合数据 – 数据建模的艺术
本文最后更新于13 天前,其中的信息可能已经过时,如有错误请发送邮件到184874483@qq.com

第五十九课:结构体与组合数据 – 数据建模的艺术

前言:从离散数据到结构化思维

在软件开发中,数据如同建筑中的砖石,如何组织和管理数据直接决定了程序的质量和可维护性。从简单的变量到复杂的结构体,从面向对象到函数式编程,数据组织方式一直在不断演进。

结构体(Structs)组合数据(Composite Data) 是数据建模的核心概念,它们允许我们将相关的数据项组合在一起,形成有意义的整体。从C语言的结构体到Rust的struct,从Python的数据类到TypeScript的接口,这些技术让我们的代码更加清晰、类型更加安全、数据结构更加合理。

今天,我们将深入探索结构体和组合数据的世界,学习如何在不同的编程范式中有效地组织和操作复杂数据。

第一部分:结构体的基础概念

1.1 结构体的本质:从内存布局到语义组合

结构体的本质是将多个相关的数据项组合成一个逻辑单元,这个单元在内存中是连续存储的,在语义上是完整的。

# ============================================================================
# 结构体的本质:从内存布局到语义组合
# ============================================================================

print("=== 结构体的本质:从内存布局到语义组合 ===")

def demonstrate_struct_essence():
    """演示结构体的本质"""

    print("结构体解决的五个核心问题:")
    print("1. 数据组织问题:将相关数据组合在一起")
    print("2. 内存布局问题:优化内存访问模式")
    print("3. 类型安全:明确数据的类型和结构")
    print("4. 抽象层次:提高代码的抽象层次")
    print("5. 数据完整性:维护相关数据的完整性")

    # 结构体在不同语言中的发展
    print("\n结构体在不同语言中的发展历程:")

    timeline = [
        ("1972", "C语言", "struct关键字", "内存连续布局的复合类型"),
        ("1979", "C++", "struct扩展为类", "支持成员函数、继承等"),
        ("1995", "Java", "只有类,没有struct", "一切皆对象"),
        ("1996", "Python 1.0", "字典和类", "动态结构,运行时定义"),
        ("2000", "C#", "struct值类型", "轻量级数据容器"),
        ("2010", "Go", "struct值类型", "组合优于继承"),
        ("2015", "Rust", "struct + trait", "内存安全的所有权系统"),
        ("2017", "Python 3.7", "dataclass", "自动生成方法的类装饰器"),
    ]

    for year, language, feature, description in timeline:
        print(f"  {year}: {language:10} | {feature:25} | {description}")

    # 不同语言的结构体实现对比
    print("\n不同语言的结构体实现:")

    implementations = {
        "C语言": {
            "特点": "内存布局紧凑,手动内存管理",
            "示例": """
struct Point {
    int x;
    int y;
};

struct Rectangle {
    struct Point top_left;
    struct Point bottom_right;
    int area;
};

// 使用
struct Point p = {10, 20};
struct Rectangle rect = {{0, 0}, {10, 10}, 100};
printf("Point: (%d, %d)\\n", p.x, p.y);
""",
            "内存布局": """
+-----------------+
| Point (8字节)   |
|   x: 4字节      |
|   y: 4字节      |
+-----------------+
| Rectangle (20字节)|
|   top_left      |
|     x: 4字节    |
|     y: 4字节    |
|   bottom_right  |
|     x: 4字节    |
|     y: 4字节    |
|   area: 4字节   |
+-----------------+
"""
        },
        "C++": {
            "特点": "可包含成员函数,支持访问控制",
            "示例": """
struct Point {
    int x;
    int y;

    // 构造函数
    Point(int x, int y) : x(x), y(y) {}

    // 成员函数
    void move(int dx, int dy) {
        x += dx;
        y += dy;
    }

    // 运算符重载
    Point operator+(const Point& other) const {
        return Point(x + other.x, y + other.y);
    }
};

// 使用
Point p1(10, 20);
Point p2(30, 40);
Point p3 = p1 + p2;
p3.move(5, 5);
""",
            "内存布局": "与C类似,但可能有虚函数表开销"
        },
        "Go": {
            "特点": "值类型,支持方法,组合优于继承",
            "示例": """
type Point struct {
    X int
    Y int
}

// 方法
func (p Point) Distance() float64 {
    return math.Sqrt(float64(p.X*p.X + p.Y*p.Y))
}

// 嵌入结构体(组合)
type Circle struct {
    Point  // 嵌入Point
    Radius int
}

// 使用
p := Point{X: 10, Y: 20}
distance := p.Distance()

c := Circle{
    Point:  Point{X: 5, Y: 5},
    Radius: 10,
}
// 可以直接访问嵌入字段
centerX := c.X
""",
            "内存布局": """
+-----------------+
| Point (16字节)  |
|   X: 8字节      |  // Go中int是平台相关的
|   Y: 8字节      |
+-----------------+
| Circle (24字节) |
|   Point (16字节)|
|   Radius: 8字节 |
+-----------------+
"""
        },
        "Rust": {
            "特点": "所有权系统,零成本抽象",
            "示例": """
// 定义结构体
struct Point {
    x: i32,
    y: i32,
}

// 实现方法
impl Point {
    // 关联函数(类似静态方法)
    fn new(x: i32, y: i32) -> Self {
        Point { x, y }
    }

    // 实例方法
    fn distance(&self) -> f64 {
        ((self.x.pow(2) + self.y.pow(2)) as f64).sqrt()
    }

    // 可变方法
    fn move_by(&mut self, dx: i32, dy: i32) {
        self.x += dx;
        self.y += dy;
    }
}

// 使用
let mut p = Point::new(10, 20);
println!("Distance: {}", p.distance());
p.move_by(5, 5);

// 元组结构体
struct Color(u8, u8, u8);
let red = Color(255, 0, 0);
""",
            "内存布局": """
+-----------------+
| Point (8字节)   |
|   x: 4字节      |
|   y: 4字节      |
+-----------------+
// 内存布局与C完全相同
"""
        },
        "Python": {
            "特点": "动态类型,多种实现方式",
            "示例": """
# 1. 使用普通类
class Point:
    def __init__(self, x, y):
        self.x = x
        self.y = y

    def distance(self):
        return (self.x**2 + self.y**2)**0.5

# 2. 使用namedtuple
from collections import namedtuple
Point2 = namedtuple('Point2', ['x', 'y'])

# 3. 使用dataclass(Python 3.7+)
from dataclasses import dataclass
@dataclass
class Point3:
    x: float
    y: float

    def distance(self):
        return (self.x**2 + self.y**2)**0.5

# 使用
p1 = Point(10, 20)
p2 = Point2(10, 20)  # 不可变
p3 = Point3(10.0, 20.0)
""",
            "内存布局": "动态,每个对象都是字典的包装"
        },
        "TypeScript": {
            "特点": "类型标注,编译时检查",
            "示例": """
// 接口定义结构
interface Point {
    x: number;
    y: number;
}

// 类型别名
type Rectangle = {
    topLeft: Point;
    bottomRight: Point;
    area: number;
};

// 类实现
class PointClass implements Point {
    constructor(public x: number, public y: number) {}

    distance(): number {
        return Math.sqrt(this.x**2 + this.y**2);
    }
}

// 使用
const p: Point = { x: 10, y: 20 };
const rect: Rectangle = {
    topLeft: { x: 0, y: 0 },
    bottomRight: { x: 10, y: 10 },
    area: 100
};

const pc = new PointClass(10, 20);
console.log(pc.distance());
""",
            "内存布局": "JavaScript对象,运行时动态"
        }
    }

    for lang, info in implementations.items():
        print(f"\n{lang}:")
        print(f"  特点: {info['特点']}")
        print(f"  示例: {info['示例'].strip()}")
        if '内存布局' in info:
            print(f"  内存布局: {info['内存布局'].strip()}")

    # 结构体的四个核心价值
    print("\n结构体的四个核心价值:")

    values = [
        ("数据完整性", "相关数据项组织在一起", "保证数据的完整性和一致性"),
        ("代码可读性", "有意义的类型名称和字段名", "提高代码的可读性和可维护性"),
        ("内存效率", "连续内存布局", "提高缓存利用率和访问速度"),
        ("类型安全", "编译时类型检查", "减少运行时错误"),
    ]

    for value, mechanism, benefit in values:
        print(f"  • {value:15} | {mechanism:25} | -> {benefit}")

# 运行演示
demonstrate_struct_essence()

1.2 Python中的结构体实现方式

Python作为动态类型语言,提供了多种方式来实现结构体的概念。

# ============================================================================
# Python中的结构体实现方式
# ============================================================================

print("\n=== Python中的结构体实现方式 ===")

def demonstrate_python_structs():
    """演示Python中实现结构体的多种方式"""

    import sys
    from collections import namedtuple
    from dataclasses import dataclass, field, asdict, astuple
    from typing import List, Optional, ClassVar
    import json

    # 1. 使用普通类
    print("1. 使用普通类:")

    class PointClassic:
        """传统的类实现"""
        def __init__(self, x: float, y: float):
            self.x = x
            self.y = y

        def distance(self) -> float:
            """计算到原点的距离"""
            return (self.x**2 + self.y**2)**0.5

        def __repr__(self):
            return f"PointClassic(x={self.x}, y={self.y})"

    print("传统类的优缺点:")
    print("  优点: 完全控制,可添加任意方法")
    print("  缺点: 需要手动实现__repr__, __eq__等方法")
    print("  大小: 每个实例都有一个__dict__,内存开销大")

    p1 = PointClassic(3.0, 4.0)
    p2 = PointClassic(3.0, 4.0)
    print(f"  示例: {p1}")
    print(f"  距离: {p1.distance()}")
    print(f"  相等比较: {p1 == p2}")  # False,因为没有实现__eq__
    print(f"  内存大小: {sys.getsizeof(p1)} 字节 + __dict__开销")

    # 2. 使用__slots__优化内存
    print("\n2. 使用__slots__优化内存:")

    class PointSlots:
        """使用__slots__减少内存占用"""
        __slots__ = ('x', 'y')  # 固定属性列表

        def __init__(self, x: float, y: float):
            self.x = x
            self.y = y

        def distance(self) -> float:
            return (self.x**2 + self.y**2)**0.5

        def __repr__(self):
            return f"PointSlots(x={self.x}, y={self.y})"

    print("__slots__的优缺点:")
    print("  优点: 内存效率高,访问速度快")
    print("  缺点: 不能动态添加属性,继承时需要小心")

    p_slots = PointSlots(3.0, 4.0)
    print(f"  示例: {p_slots}")
    print(f"  内存大小: {sys.getsizeof(p_slots)} 字节")
    print(f"  内存节省: 约 {sys.getsizeof(p1) - sys.getsizeof(p_slots)} 字节")

    # 3. 使用namedtuple
    print("\n3. 使用namedtuple:")

    # 创建命名元组
    PointNamedTuple = namedtuple('PointNamedTuple', ['x', 'y'])

    # 也可以添加方法
    class PointNamedTupleWithMethods(PointNamedTuple):
        """带方法的命名元组"""
        def distance(self):
            return (self.x**2 + self.y**2)**0.5

        def move(self, dx: float, dy: float):
            return PointNamedTupleWithMethods(self.x + dx, self.y + dy)

    print("namedtuple的优缺点:")
    print("  优点: 不可变,线程安全,自带__repr__, __eq__等")
    print("  缺点: 不可变,不能添加新字段")

    p_nt = PointNamedTuple(3.0, 4.0)
    p_nt2 = PointNamedTupleWithMethods(3.0, 4.0)
    print(f"  示例: {p_nt}")
    print(f"  字段访问: x={p_nt.x}, y={p_nt.y}")
    print(f"  索引访问: [0]={p_nt[0]}, [1]={p_nt[1]}")
    print(f"  解包: x, y = p_nt -> {p_nt}")
    print(f"  带方法的版本: {p_nt2.distance()}")
    print(f"  不可变性测试: 不能修改 p_nt.x = 5.0")
    print(f"  内存大小: {sys.getsizeof(p_nt)} 字节")

    # 4. 使用dataclass(Python 3.7+)
    print("\n4. 使用dataclass(推荐方式):")

    @dataclass
    class PointDataClass:
        """使用dataclass的数据类"""
        x: float
        y: float

        def distance(self) -> float:
            return (self.x**2 + self.y**2)**0.5

    @dataclass(order=True)  # 生成比较方法
    class PointWithOrder:
        """可排序的dataclass"""
        x: float
        y: float

        def __post_init__(self):
            """初始化后处理"""
            if self.x < 0 or self.y < 0:
                print(f"警告: 坐标({self.x}, {self.y})包含负数")

    @dataclass(frozen=True)  # 不可变
    class ImmutablePoint:
        """不可变的点"""
        x: float
        y: float

    print("dataclass的优缺点:")
    print("  优点: 自动生成__init__, __repr__, __eq__等,类型提示")
    print("  缺点: Python 3.7+,某些高级特性需要手动处理")

    p_dc = PointDataClass(3.0, 4.0)
    p_dc2 = PointDataClass(3.0, 4.0)
    print(f"  示例: {p_dc}")
    print(f"  自动相等比较: {p_dc == p_dc2}")
    print(f"  距离计算: {p_dc.distance()}")
    print(f"  内存大小: {sys.getsizeof(p_dc)} 字节")

    # 排序示例
    points = [
        PointWithOrder(3.0, 4.0),
        PointWithOrder(1.0, 2.0),
        PointWithOrder(5.0, 0.0),
    ]
    print(f"  排序前: {points}")
    print(f"  排序后: {sorted(points)}")

    # 不可变示例
    p_immutable = ImmutablePoint(3.0, 4.0)
    print(f"  不可变点: {p_immutable}")
    try:
        p_immutable.x = 5.0  # 应该失败
    except Exception as e:
        print(f"  修改不可变点失败: {type(e).__name__}")

    # 5. 使用Pydantic(数据验证)
    print("\n5. 使用Pydantic(数据验证):")

    try:
        from pydantic import BaseModel, Field, validator

        class PointPydantic(BaseModel):
            """使用Pydantic的模型"""
            x: float = Field(..., description="X坐标", ge=0)  # 必须大于等于0
            y: float = Field(..., description="Y坐标", ge=0)

            @validator('x', 'y')
            def validate_coordinates(cls, v):
                if v > 1000:
                    raise ValueError('坐标值太大')
                return v

            def distance(self) -> float:
                return (self.x**2 + self.y**2)**0.5

            class Config:
                """配置"""
                frozen = True  # 不可变

        print("Pydantic的优缺点:")
        print("  优点: 强大的数据验证,JSON序列化,文档生成")
        print("  缺点: 需要额外依赖,有一定的性能开销")

        # 有效数据
        p_pydantic = PointPydantic(x=3.0, y=4.0)
        print(f"  有效示例: {p_pydantic}")
        print(f"  JSON序列化: {p_pydantic.json()}")
        print(f"  字典表示: {p_pydantic.dict()}")
        print(f"  模式: {p_pydantic.schema_json(indent=2)}")

        # 无效数据
        try:
            p_invalid = PointPydantic(x=-1.0, y=4.0)
        except Exception as e:
            print(f"  无效数据验证: {e}")

    except ImportError:
        print("  Pydantic未安装,跳过示例")
        print("  安装: pip install pydantic")

    # 6. 使用attrs库
    print("\n6. 使用attrs库:")

    try:
        import attr

        @attr.s(auto_attribs=True)
        class PointAttrs:
            """使用attrs的类"""
            x: float
            y: float

            def distance(self) -> float:
                return (self.x**2 + self.y**2)**0.5

        print("attrs的优缺点:")
        print("  优点: 功能强大,性能好,支持Python 3.6+")
        print("  缺点: 需要额外依赖")

        p_attrs = PointAttrs(3.0, 4.0)
        p_attrs2 = PointAttrs(3.0, 4.0)
        print(f"  示例: {p_attrs}")
        print(f"  自动相等比较: {p_attrs == p_attrs2}")
        print(f"  哈希支持: {hash(p_attrs)}")
        print(f"  属性: {attr.fields(PointAttrs)}")

    except ImportError:
        print("  attrs未安装,跳过示例")
        print("  安装: pip install attrs")

    # 7. 性能对比
    print("\n7. 各种实现方式的性能对比:")

    def performance_test():
        """性能测试"""
        import time

        # 创建100万个点
        n = 100000

        # 传统类
        start = time.time()
        points_classic = [PointClassic(i, i) for i in range(n)]
        classic_time = time.time() - start

        # __slots__
        start = time.time()
        points_slots = [PointSlots(i, i) for i in range(n)]
        slots_time = time.time() - start

        # namedtuple
        start = time.time()
        points_nt = [PointNamedTuple(i, i) for i in range(n)]
        nt_time = time.time() - start

        # dataclass
        start = time.time()
        points_dc = [PointDataClass(i, i) for i in range(n)]
        dc_time = time.time() - start

        print(f"  创建 {n:,} 个对象的时间:")
        print(f"    传统类: {classic_time:.4f} 秒")
        print(f"    __slots__: {slots_time:.4f} 秒 (节省 {classic_time-slots_time:.4f} 秒)")
        print(f"    namedtuple: {nt_time:.4f} 秒")
        print(f"    dataclass: {dc_time:.4f} 秒")

        # 内存使用
        if n <= 10000:  # 避免内存过大
            classic_mem = sum(sys.getsizeof(p) for p in points_classic[:1000])
            slots_mem = sum(sys.getsizeof(p) for p in points_slots[:1000])
            nt_mem = sum(sys.getsizeof(p) for p in points_nt[:1000])
            dc_mem = sum(sys.getsizeof(p) for p in points_dc[:1000])

            print(f"\n  1000个对象的内存使用:")
            print(f"    传统类: {classic_mem:,} 字节")
            print(f"    __slots__: {slots_mem:,} 字节 (节省 {classic_mem-slots_mem:,} 字节)")
            print(f"    namedtuple: {nt_mem:,} 字节")
            print(f"    dataclass: {dc_mem:,} 字节")

    # 运行性能测试
    performance_test()

    # 8. 实际应用示例
    print("\n8. 实际应用示例:几何图形系统")

    @dataclass
    class Point:
        """点"""
        x: float
        y: float

        def distance_to(self, other: 'Point') -> float:
            """计算到另一个点的距离"""
            return ((self.x - other.x)**2 + (self.y - other.y)**2)**0.5

        def translate(self, dx: float, dy: float) -> 'Point':
            """平移点"""
            return Point(self.x + dx, self.y + dy)

    @dataclass
    class Line:
        """线段"""
        start: Point
        end: Point

        def length(self) -> float:
            """线段长度"""
            return self.start.distance_to(self.end)

        def midpoint(self) -> Point:
            """中点"""
            return Point(
                (self.start.x + self.end.x) / 2,
                (self.start.y + self.end.y) / 2
            )

    @dataclass
    class Circle:
        """圆"""
        center: Point
        radius: float

        def area(self) -> float:
            """面积"""
            import math
            return math.pi * self.radius ** 2

        def circumference(self) -> float:
            """周长"""
            import math
            return 2 * math.pi * self.radius

        def contains(self, point: Point) -> bool:
            """判断点是否在圆内"""
            return self.center.distance_to(point) <= self.radius

    @dataclass
    class Rectangle:
        """矩形"""
        top_left: Point
        width: float
        height: float

        def area(self) -> float:
            """面积"""
            return self.width * self.height

        def perimeter(self) -> float:
            """周长"""
            return 2 * (self.width + self.height)

        def bottom_right(self) -> Point:
            """右下角点"""
            return Point(
                self.top_left.x + self.width,
                self.top_left.y + self.height
            )

        def contains(self, point: Point) -> bool:
            """判断点是否在矩形内"""
            br = self.bottom_right()
            return (self.top_left.x <= point.x <= br.x and 
                    self.top_left.y <= point.y <= br.y)

    print("几何图形系统示例:")

    # 创建点
    p1 = Point(0, 0)
    p2 = Point(3, 4)
    print(f"点: {p1}, {p2}")
    print(f"两点距离: {p1.distance_to(p2):.2f}")

    # 创建线段
    line = Line(p1, p2)
    print(f"\n线段: {line}")
    print(f"线段长度: {line.length():.2f}")
    print(f"线段中点: {line.midpoint()}")

    # 创建圆
    circle = Circle(Point(0, 0), 5)
    print(f"\n圆: {circle}")
    print(f"圆面积: {circle.area():.2f}")
    print(f"圆周长: {circle.circumference():.2f}")
    print(f"点{Point(3, 4)}在圆内: {circle.contains(Point(3, 4))}")
    print(f"点{Point(6, 0)}在圆内: {circle.contains(Point(6, 0))}")

    # 创建矩形
    rect = Rectangle(Point(0, 0), 10, 5)
    print(f"\n矩形: {rect}")
    print(f"矩形面积: {rect.area():.2f}")
    print(f"矩形周长: {rect.perimeter():.2f}")
    print(f"矩形右下角: {rect.bottom_right()}")
    print(f"点{Point(5, 2)}在矩形内: {rect.contains(Point(5, 2))}")
    print(f"点{Point(11, 2)}在矩形内: {rect.contains(Point(11, 2))}")

# 运行演示
demonstrate_python_structs()

第二部分:组合数据的高级模式

2.1 数据的嵌套与组合

现实世界中的数据往往是层次化的,我们需要通过嵌套和组合来构建复杂的数据结构。

# ============================================================================
# 数据的嵌套与组合
# ============================================================================

print("\n=== 数据的嵌套与组合 ===")

def demonstrate_data_composition():
    """演示数据的嵌套与组合"""

    from dataclasses import dataclass, field, asdict
    from typing import List, Dict, Optional, Set, Tuple
    import json
    from datetime import datetime

    # 1. 基本嵌套:地址系统
    print("1. 基本嵌套:地址系统")

    @dataclass
    class Address:
        """地址"""
        street: str
        city: str
        state: str
        zip_code: str
        country: str = "USA"  # 默认值

        def formatted(self) -> str:
            """格式化地址"""
            return f"{self.street}\n{self.city}, {self.state} {self.zip_code}\n{self.country}"

    @dataclass
    class Person:
        """人员"""
        name: str
        age: int
        address: Address  # 嵌套Address对象
        email: str
        phone: str = ""  # 可选字段

        def contact_info(self) -> str:
            """联系信息"""
            info = f"姓名: {self.name}\n"
            info += f"邮箱: {self.email}\n"
            if self.phone:
                info += f"电话: {self.phone}\n"
            info += f"地址:\n{self.address.formatted()}"
            return info

    print("基本嵌套示例:")
    address = Address(
        street="123 Main St",
        city="New York",
        state="NY",
        zip_code="10001"
    )

    person = Person(
        name="Alice Smith",
        age=30,
        address=address,
        email="alice@example.com",
        phone="+1-555-1234"
    )

    print(person.contact_info())
    print(f"\nJSON表示:\n{json.dumps(asdict(person), indent=2, default=str)}")

    # 2. 列表嵌套:订单系统
    print("\n2. 列表嵌套:订单系统")

    @dataclass
    class Product:
        """产品"""
        id: int
        name: str
        price: float
        description: str = ""
        category: str = "general"

        def display_price(self) -> str:
            """显示价格"""
            return f"${self.price:.2f}"

    @dataclass
    class OrderItem:
        """订单项"""
        product: Product
        quantity: int

        def total_price(self) -> float:
            """单项总价"""
            return self.product.price * self.quantity

        def __str__(self) -> str:
            return f"{self.product.name} x{self.quantity}: ${self.total_price():.2f}"

    @dataclass
    class Order:
        """订单"""
        order_id: str
        customer: Person
        items: List[OrderItem]  # OrderItem列表
        order_date: datetime = field(default_factory=datetime.now)
        status: str = "pending"

        def total_amount(self) -> float:
            """订单总金额"""
            return sum(item.total_price() for item in self.items)

        def item_count(self) -> int:
            """商品总数"""
            return sum(item.quantity for item in self.items)

        def add_item(self, product: Product, quantity: int = 1):
            """添加商品"""
            self.items.append(OrderItem(product, quantity))

        def __str__(self) -> str:
            lines = [
                f"订单号: {self.order_id}",
                f"客户: {self.customer.name}",
                f"日期: {self.order_date.strftime('%Y-%m-%d %H:%M')}",
                f"状态: {self.status}",
                "商品清单:",
            ]

            for i, item in enumerate(self.items, 1):
                lines.append(f"  {i}. {item}")

            lines.append(f"总计: ${self.total_amount():.2f} ({self.item_count()}件商品)")
            return "\n".join(lines)

    print("列表嵌套示例:")

    # 创建产品
    products = [
        Product(1, "Laptop", 999.99, "高性能笔记本", "electronics"),
        Product(2, "Mouse", 29.99, "无线鼠标", "electronics"),
        Product(3, "Notebook", 9.99, "纸质笔记本", "office"),
        Product(4, "Coffee Mug", 15.99, "咖啡杯", "kitchen"),
    ]

    # 创建订单
    order = Order(
        order_id="ORD-2024-001",
        customer=person,
        items=[
            OrderItem(products[0], 1),
            OrderItem(products[1], 2),
            OrderItem(products[3], 4),
        ]
    )

    print(order)

    # 添加更多商品
    order.add_item(products[2], 5)
    print(f"\n添加商品后:")
    print(f"  商品总数: {order.item_count()}件")
    print(f"  订单总额: ${order.total_amount():.2f}")

    # 3. 字典嵌套:库存系统
    print("\n3. 字典嵌套:库存系统")

    @dataclass
    class InventoryItem:
        """库存项"""
        product: Product
        quantity: int
        reorder_point: int = 10  # 补货点
        location: str = "Warehouse A"

        def needs_reorder(self) -> bool:
            """是否需要补货"""
            return self.quantity <= self.reorder_point

        def value(self) -> float:
            """库存价值"""
            return self.product.price * self.quantity

    @dataclass
    class Warehouse:
        """仓库"""
        name: str
        location: str
        inventory: Dict[int, InventoryItem] = field(default_factory=dict)  # 产品ID到库存项的映射

        def add_product(self, product: Product, quantity: int, location: str = None):
            """添加产品到库存"""
            if product.id in self.inventory:
                # 更新现有库存
                self.inventory[product.id].quantity += quantity
            else:
                # 创建新库存项
                item_location = location or self.location
                self.inventory[product.id] = InventoryItem(
                    product=product,
                    quantity=quantity,
                    location=item_location
                )

        def remove_product(self, product_id: int, quantity: int) -> bool:
            """从库存移除产品"""
            if product_id not in self.inventory:
                return False

            item = self.inventory[product_id]
            if item.quantity < quantity:
                return False

            item.quantity -= quantity

            # 如果数量为0,移除该项
            if item.quantity == 0:
                del self.inventory[product_id]

            return True

        def total_value(self) -> float:
            """仓库总价值"""
            return sum(item.value() for item in self.inventory.values())

        def products_needing_reorder(self) -> List[InventoryItem]:
            """需要补货的产品"""
            return [item for item in self.inventory.values() if item.needs_reorder()]

        def __str__(self) -> str:
            lines = [
                f"仓库: {self.name}",
                f"位置: {self.location}",
                f"库存项数: {len(self.inventory)}",
                f"总价值: ${self.total_value():.2f}",
            ]

            if self.inventory:
                lines.append("\n库存详情:")
                for item in self.inventory.values():
                    status = "⚠️ 需要补货" if item.needs_reorder() else "✓ 库存充足"
                    lines.append(f"  {item.product.name}: {item.quantity}件 (位置: {item.location}) - {status}")

            return "\n".join(lines)

    print("字典嵌套示例:")

    # 创建仓库
    warehouse = Warehouse("主仓库", "New York")

    # 添加库存
    for product in products:
        warehouse.add_product(product, 50 if product.price > 50 else 200)

    # 模拟销售
    warehouse.remove_product(1, 45)  # 卖出45台笔记本
    warehouse.remove_product(2, 195)  # 卖出195个鼠标

    print(warehouse)

    # 需要补货的产品
    reorder_items = warehouse.products_needing_reorder()
    print(f"\n需要补货的产品 ({len(reorder_items)}种):")
    for item in reorder_items:
        print(f"  {item.product.name}: 剩余{item.quantity}件,补货点{item.reorder_point}")

    # 4. 复杂嵌套:学校管理系统
    print("\n4. 复杂嵌套:学校管理系统")

    @dataclass
    class Grade:
        """成绩"""
        subject: str
        score: float
        max_score: float = 100.0
        weight: float = 1.0

        def percentage(self) -> float:
            """百分比"""
            return (self.score / self.max_score) * 100

        def weighted_score(self) -> float:
            """加权分数"""
            return self.score * self.weight

    @dataclass
    class Student:
        """学生"""
        id: int
        name: str
        age: int
        grades: Dict[str, List[Grade]] = field(default_factory=dict)  # 科目到成绩列表的映射

        def add_grade(self, grade: Grade):
            """添加成绩"""
            if grade.subject not in self.grades:
                self.grades[grade.subject] = []
            self.grades[grade.subject].append(grade)

        def average_grade(self, subject: str = None) -> Optional[float]:
            """平均成绩"""
            if subject:
                # 特定科目的平均成绩
                if subject not in self.grades or not self.grades[subject]:
                    return None
                grades = self.grades[subject]
                total_weighted = sum(g.weighted_score() for g in grades)
                total_weight = sum(g.weight for g in grades)
                return total_weighted / total_weight if total_weight > 0 else 0.0
            else:
                # 所有科目的平均成绩
                all_grades = [grade for subj_grades in self.grades.values() for grade in subj_grades]
                if not all_grades:
                    return None
                total_weighted = sum(g.weighted_score() for g in all_grades)
                total_weight = sum(g.weight for g in all_grades)
                return total_weighted / total_weight if total_weight > 0 else 0.0

        def gpa(self, scale: float = 4.0) -> Optional[float]:
            """GPA(基于百分制)"""
            avg = self.average_grade()
            if avg is None:
                return None
            # 简单转换:90-100 -> 4.0, 80-89 -> 3.0, 等等
            if avg >= 90:
                return scale
            elif avg >= 80:
                return scale * 0.75
            elif avg >= 70:
                return scale * 0.5
            elif avg >= 60:
                return scale * 0.25
            else:
                return 0.0

        def __str__(self) -> str:
            lines = [
                f"学生: {self.name} (ID: {self.id}, 年龄: {self.age})",
            ]

            if self.grades:
                lines.append("成绩单:")
                for subject, grades in self.grades.items():
                    avg = self.average_grade(subject)
                    lines.append(f"  {subject}: {avg:.1f}%")
                    for grade in grades:
                        lines.append(f"    - {grade.score:.1f}/{grade.max_score} (权重: {grade.weight})")

                overall_avg = self.average_grade()
                gpa = self.gpa()
                if overall_avg is not None:
                    lines.append(f"平均成绩: {overall_avg:.1f}%")
                if gpa is not None:
                    lines.append(f"GPA: {gpa:.2f}/{4.0}")
            else:
                lines.append("暂无成绩")

            return "\n".join(lines)

    @dataclass
    class Course:
        """课程"""
        code: str
        name: str
        instructor: str
        students: List[Student] = field(default_factory=list)
        credits: int = 3

        def enroll(self, student: Student):
            """注册学生"""
            if student not in self.students:
                self.students.append(student)

        def average_score(self) -> Optional[float]:
            """课程平均分"""
            if not self.students:
                return None

            scores = []
            for student in self.students:
                avg = student.average_grade(self.name)
                if avg is not None:
                    scores.append(avg)

            return sum(scores) / len(scores) if scores else None

        def __str__(self) -> str:
            lines = [
                f"课程: {self.code} - {self.name}",
                f"教师: {self.instructor}",
                f"学分: {self.credits}",
                f"学生数: {len(self.students)}",
            ]

            avg_score = self.average_score()
            if avg_score is not None:
                lines.append(f"课程平均分: {avg_score:.1f}%")

            if self.students:
                lines.append("\n学生列表:")
                for student in self.students:
                    student_avg = student.average_grade(self.name)
                    status = f"{student_avg:.1f}%" if student_avg is not None else "无成绩"
                    lines.append(f"  {student.name}: {status}")

            return "\n".join(lines)

    print("复杂嵌套示例:学校管理系统")

    # 创建学生
    alice = Student(1, "Alice Johnson", 18)
    bob = Student(2, "Bob Smith", 19)
    charlie = Student(3, "Charlie Brown", 18)

    # 添加成绩
    alice.add_grade(Grade("Math", 95, weight=1.5))
    alice.add_grade(Grade("Math", 88))
    alice.add_grade(Grade("Science", 92))
    alice.add_grade(Grade("History", 85))

    bob.add_grade(Grade("Math", 78))
    bob.add_grade(Grade("Science", 91))
    bob.add_grade(Grade("English", 88))

    charlie.add_grade(Grade("Math", 65))
    charlie.add_grade(Grade("Science", 72))
    charlie.add_grade(Grade("Art", 95, weight=0.8))

    print("学生信息:")
    for student in [alice, bob, charlie]:
        print(f"\n{student}")

    # 创建课程
    math_course = Course("MATH101", "Calculus I", "Dr. Johnson")
    science_course = Course("SCI101", "Physics", "Dr. Wilson")

    # 注册学生
    math_course.enroll(alice)
    math_course.enroll(bob)
    math_course.enroll(charlie)

    science_course.enroll(alice)
    science_course.enroll(bob)
    science_course.enroll(charlie)

    print("\n课程信息:")
    print(f"\n{math_course}")
    print(f"\n{science_course}")

    # 5. 树形结构:组织结构图
    print("\n5. 树形结构:组织结构图")

    @dataclass
    class Employee:
        """员工"""
        id: int
        name: str
        position: str
        salary: float
        subordinates: List['Employee'] = field(default_factory=list)  # 递归引用

        def add_subordinate(self, employee: 'Employee'):
            """添加下属"""
            self.subordinates.append(employee)

        def total_subordinates(self) -> int:
            """总下属数(包括间接下属)"""
            total = len(self.subordinates)
            for subordinate in self.subordinates:
                total += subordinate.total_subordinates()
            return total

        def department_salary(self) -> float:
            """部门总薪资"""
            total = self.salary
            for subordinate in self.subordinates:
                total += subordinate.department_salary()
            return total

        def find_employee(self, employee_id: int) -> Optional['Employee']:
            """查找员工(递归搜索)"""
            if self.id == employee_id:
                return self

            for subordinate in self.subordinates:
                found = subordinate.find_employee(employee_id)
                if found:
                    return found

            return None

        def print_hierarchy(self, level: int = 0):
            """打印组织层级"""
            indent = "  " * level
            print(f"{indent}{self.name} ({self.position}) - ${self.salary:,.0f}")

            for subordinate in self.subordinates:
                subordinate.print_hierarchy(level + 1)

    # 创建组织层级
    ceo = Employee(1, "Alice CEO", "CEO", 500000)
    cto = Employee(2, "Bob CTO", "CTO", 300000)
    cfo = Employee(3, "Charlie CFO", "CFO", 280000)

    dev_manager = Employee(4, "Diana Dev", "Development Manager", 180000)
    qa_manager = Employee(5, "Eve QA", "QA Manager", 150000)

    senior_dev = Employee(6, "Frank Dev", "Senior Developer", 120000)
    junior_dev = Employee(7, "Grace Dev", "Junior Developer", 80000)
    qa_engineer = Employee(8, "Harry QA", "QA Engineer", 90000)

    # 构建层级关系
    ceo.add_subordinate(cto)
    ceo.add_subordinate(cfo)

    cto.add_subordinate(dev_manager)
    cto.add_subordinate(qa_manager)

    dev_manager.add_subordinate(senior_dev)
    dev_manager.add_subordinate(junior_dev)

    qa_manager.add_subordinate(qa_engineer)

    print("组织结构图:")
    ceo.print_hierarchy()

    print(f"\n组织统计:")
    print(f"CEO总下属数: {ceo.total_subordinates()}")
    print(f"部门总薪资: ${ceo.department_salary():,.0f}")
    print(f"平均薪资: ${ceo.department_salary() / (ceo.total_subordinates() + 1):,.0f}")

    # 查找员工
    found = ceo.find_employee(6)
    if found:
        print(f"\n查找到员工 ID 6: {found.name}")

# 运行演示
demonstrate_data_composition()

2.2 数据组合的设计模式

通过组合数据结构,我们可以实现各种有用的设计模式。

# ============================================================================
# 数据组合的设计模式
# ============================================================================

print("\n=== 数据组合的设计模式 ===")

def demonstrate_data_patterns():
    """演示数据组合的设计模式"""

    from dataclasses import dataclass, field, asdict
    from typing import List, Dict, Optional, Union, Any
    from abc import ABC, abstractmethod
    import json

    # 1. 组合模式(Composite Pattern)
    print("1. 组合模式(Composite Pattern):")

    @dataclass
    class FileSystemComponent(ABC):
        """文件系统组件抽象基类"""
        name: str

        @abstractmethod
        def size(self) -> int:
            """获取大小"""
            pass

        @abstractmethod
        def display(self, indent: int = 0) -> str:
            """显示结构"""
            pass

    @dataclass
    class File(FileSystemComponent):
        """文件"""
        content_size: int

        def size(self) -> int:
            return self.content_size

        def display(self, indent: int = 0) -> str:
            return "  " * indent + f"📄 {self.name} ({self.size()} bytes)"

    @dataclass
    class Directory(FileSystemComponent):
        """目录"""
        children: List[FileSystemComponent] = field(default_factory=list)

        def add(self, component: FileSystemComponent):
            """添加子组件"""
            self.children.append(component)

        def remove(self, component: FileSystemComponent):
            """移除子组件"""
            self.children.remove(component)

        def size(self) -> int:
            """目录大小(所有子组件的总和)"""
            return sum(child.size() for child in self.children)

        def display(self, indent: int = 0) -> str:
            lines = ["  " * indent + f"📁 {self.name} ({self.size()} bytes)"]
            for child in self.children:
                lines.append(child.display(indent + 1))
            return "\n".join(lines)

    print("组合模式示例:文件系统")

    # 创建文件系统结构
    root = Directory("root")

    home = Directory("home")
    user = Directory("user")
    documents = Directory("documents")
    downloads = Directory("downloads")

    # 创建文件
    file1 = File("notes.txt", 1500)
    file2 = File("report.pdf", 2500000)
    file3 = File("photo.jpg", 3500000)
    file4 = File("song.mp3", 8000000)

    # 构建层次结构
    root.add(home)
    home.add(user)
    user.add(documents)
    user.add(downloads)

    documents.add(file1)
    documents.add(file2)
    downloads.add(file3)
    downloads.add(file4)

    print(root.display())

    # 2. 访问者模式(Visitor Pattern)
    print("\n2. 访问者模式(Visitor Pattern):")

    class ShapeVisitor(ABC):
        """形状访问者接口"""
        @abstractmethod
        def visit_circle(self, circle: 'Circle'):
            pass

        @abstractmethod
        def visit_rectangle(self, rectangle: 'Rectangle'):
            pass

        @abstractmethod
        def visit_composite(self, composite: 'CompositeShape'):
            pass

    @dataclass
    class Shape(ABC):
        """形状基类"""
        @abstractmethod
        def accept(self, visitor: ShapeVisitor):
            """接受访问者"""
            pass

    @dataclass
    class Circle(Shape):
        """圆形"""
        radius: float

        def accept(self, visitor: ShapeVisitor):
            visitor.visit_circle(self)

        def area(self) -> float:
            import math
            return math.pi * self.radius ** 2

    @dataclass
    class Rectangle(Shape):
        """矩形"""
        width: float
        height: float

        def accept(self, visitor: ShapeVisitor):
            visitor.visit_rectangle(self)

        def area(self) -> float:
            return self.width * self.height

    @dataclass
    class CompositeShape(Shape):
        """复合形状"""
        shapes: List[Shape] = field(default_factory=list)

        def add(self, shape: Shape):
            self.shapes.append(shape)

        def accept(self, visitor: ShapeVisitor):
            visitor.visit_composite(self)
            for shape in self.shapes:
                shape.accept(visitor)

        def area(self) -> float:
            return sum(shape.area() for shape in self.shapes)

    # 具体访问者:面积计算器
    class AreaCalculator(ShapeVisitor):
        """面积计算器"""
        def __init__(self):
            self.total_area = 0.0

        def visit_circle(self, circle: Circle):
            self.total_area += circle.area()

        def visit_rectangle(self, rectangle: Rectangle):
            self.total_area += rectangle.area()

        def visit_composite(self, composite: CompositeShape):
            # 复合形状的面积会在遍历子形状时计算
            pass

    # 具体访问者:JSON导出器
    class JsonExporter(ShapeVisitor):
        """JSON导出器"""
        def __init__(self):
            self.data = []

        def visit_circle(self, circle: Circle):
            self.data.append({
                "type": "circle",
                "radius": circle.radius,
                "area": circle.area()
            })

        def visit_rectangle(self, rectangle: Rectangle):
            self.data.append({
                "type": "rectangle",
                "width": rectangle.width,
                "height": rectangle.height,
                "area": rectangle.area()
            })

        def visit_composite(self, composite: CompositeShape):
            # 复合形状本身不添加数据,只包含子形状
            pass

        def export(self) -> str:
            return json.dumps(self.data, indent=2)

    print("访问者模式示例:形状处理")

    # 创建形状
    circle = Circle(5.0)
    rectangle1 = Rectangle(4.0, 6.0)
    rectangle2 = Rectangle(3.0, 7.0)

    composite = CompositeShape()
    composite.add(circle)
    composite.add(rectangle1)
    composite.add(rectangle2)

    # 使用面积计算器
    area_calculator = AreaCalculator()
    composite.accept(area_calculator)
    print(f"总面积: {area_calculator.total_area:.2f}")

    # 使用JSON导出器
    json_exporter = JsonExporter()
    composite.accept(json_exporter)
    print(f"JSON导出:\n{json_exporter.export()}")

    # 3. 建造者模式(Builder Pattern)
    print("\n3. 建造者模式(Builder Pattern):")

    @dataclass
    class Computer:
        """计算机"""
        cpu: str
        memory_gb: int
        storage_gb: int
        gpu: Optional[str] = None
        operating_system: Optional[str] = None
        peripherals: List[str] = field(default_factory=list)

        def __str__(self):
            parts = [
                f"CPU: {self.cpu}",
                f"内存: {self.memory_gb}GB",
                f"存储: {self.storage_gb}GB",
            ]

            if self.gpu:
                parts.append(f"显卡: {self.gpu}")

            if self.operating_system:
                parts.append(f"操作系统: {self.operating_system}")

            if self.peripherals:
                parts.append(f"外设: {', '.join(self.peripherals)}")

            return "\n".join(parts)

    class ComputerBuilder:
        """计算机建造者"""
        def __init__(self):
            self.cpu = "Intel Core i5"
            self.memory_gb = 8
            self.storage_gb = 256
            self.gpu = None
            self.operating_system = None
            self.peripherals = []

        def set_cpu(self, cpu: str) -> 'ComputerBuilder':
            self.cpu = cpu
            return self

        def set_memory(self, memory_gb: int) -> 'ComputerBuilder':
            self.memory_gb = memory_gb
            return self

        def set_storage(self, storage_gb: int) -> 'ComputerBuilder':
            self.storage_gb = storage_gb
            return self

        def set_gpu(self, gpu: str) -> 'ComputerBuilder':
            self.gpu = gpu
            return self

        def set_operating_system(self, os: str) -> 'ComputerBuilder':
            self.operating_system = os
            return self

        def add_peripheral(self, peripheral: str) -> 'ComputerBuilder':
            self.peripherals.append(peripheral)
            return self

        def build(self) -> Computer:
            return Computer(
                cpu=self.cpu,
                memory_gb=self.memory_gb,
                storage_gb=self.storage_gb,
                gpu=self.gpu,
                operating_system=self.operating_system,
                peripherals=self.peripherals
            )

    # 导演类(可选)
    class ComputerDirector:
        """计算机导演"""
        @staticmethod
        def build_gaming_pc(builder: ComputerBuilder) -> Computer:
            return (builder
                   .set_cpu("Intel Core i9")
                   .set_memory(32)
                   .set_storage(2000)
                   .set_gpu("NVIDIA RTX 4090")
                   .set_operating_system("Windows 11")
                   .add_peripheral("Gaming Mouse")
                   .add_peripheral("Mechanical Keyboard")
                   .add_peripheral("Gaming Monitor")
                   .build())

        @staticmethod
        def build_office_pc(builder: ComputerBuilder) -> Computer:
            return (builder
                   .set_cpu("Intel Core i5")
                   .set_memory(16)
                   .set_storage(512)
                   .set_operating_system("Windows 10")
                   .add_peripheral("Basic Mouse")
                   .add_peripheral("Standard Keyboard")
                   .build())

        @staticmethod
        def build_developer_pc(builder: ComputerBuilder) -> Computer:
            return (builder
                   .set_cpu("AMD Ryzen 9")
                   .set_memory(64)
                   .set_storage(4000)
                   .set_gpu("NVIDIA RTX 3080")
                   .set_operating_system("Ubuntu 22.04")
                   .add_peripheral("Ergonomic Mouse")
                   .add_peripheral("Ergonomic Keyboard")
                   .add_peripheral("Dual Monitors")
                   .build())

    print("建造者模式示例:构建计算机")

    builder = ComputerBuilder()
    director = ComputerDirector()

    print("游戏电脑配置:")
    gaming_pc = director.build_gaming_pc(builder)
    print(gaming_pc)

    print("\n办公电脑配置:")
    office_pc = director.build_office_pc(ComputerBuilder())  # 新的建造者
    print(office_pc)

    print("\n开发电脑配置:")
    dev_pc = director.build_developer_pc(ComputerBuilder())  # 新的建造者
    print(dev_pc)

    # 自定义构建
    print("\n自定义配置:")
    custom_pc = (ComputerBuilder()
                .set_cpu("Apple M2")
                .set_memory(24)
                .set_storage(1000)
                .set_operating_system("macOS")
                .add_peripheral("Magic Mouse")
                .add_peripheral("Magic Keyboard")
                .build())
    print(custom_pc)

    # 4. 原型模式(Prototype Pattern)
    print("\n4. 原型模式(Prototype Pattern):")

    import copy

    @dataclass
    class DocumentTemplate:
        """文档模板(原型)"""
        title: str
        author: str
        content: str
        styles: Dict[str, Any]
        metadata: Dict[str, Any]

        def clone(self) -> 'DocumentTemplate':
            """深拷贝克隆"""
            return copy.deepcopy(self)

        def customize(self, **kwargs) -> 'DocumentTemplate':
            """定制化克隆"""
            clone = self.clone()
            for key, value in kwargs.items():
                if hasattr(clone, key):
                    setattr(clone, key, value)
                else:
                    clone.metadata[key] = value
            return clone

        def __str__(self):
            return (f"文档: {self.title}\n"
                   f"作者: {self.author}\n"
                   f"内容: {self.content[:50]}...\n"
                   f"样式: {self.styles}\n"
                   f"元数据: {self.metadata}")

    print("原型模式示例:文档模板")

    # 创建原型
    report_template = DocumentTemplate(
        title="月度报告",
        author="公司模板",
        content="# 月度报告\n\n## 概述\n\n...",
        styles={
            "font": "Arial",
            "size": 12,
            "color": "#000000"
        },
        metadata={
            "department": "所有部门",
            "confidential": False,
            "version": "1.0"
        }
    )

    print("原始模板:")
    print(report_template)

    # 克隆并定制
    print("\n克隆并定制销售部门报告:")
    sales_report = report_template.customize(
        title="销售部月度报告",
        author="销售总监",
        metadata={"department": "销售部", "confidential": True}
    )
    print(sales_report)

    print("\n克隆并定制技术部门报告:")
    tech_report = report_template.customize(
        title="技术部月度报告",
        author="技术总监",
        content="# 技术部月度报告\n\n## 项目进展\n\n...",
        metadata={"department": "技术部", "version": "2.0"}
    )
    print(tech_report)

    # 验证是独立对象
    print(f"\n验证独立性:")
    print(f"原始模板标题: {report_template.title}")
    print(f"销售报告标题: {sales_report.title}")
    print(f"技术报告标题: {tech_report.title}")
    print(f"sales_report is report_template? {sales_report is report_template}")

    # 5. 享元模式(Flyweight Pattern)
    print("\n5. 享元模式(Flyweight Pattern):")

    @dataclass
    class TreeType:
        """树类型(享元)"""
        name: str
        color: str
        texture: str

        def render(self, x: int, y: int, age: int):
            """渲染树"""
            return f"在({x}, {y})渲染{self.name}树,颜色{self.color},纹理{self.texture},树龄{age}年"

    class TreeTypeFactory:
        """树类型工厂(管理享元)"""
        _tree_types: Dict[str, TreeType] = {}

        @classmethod
        def get_tree_type(cls, name: str, color: str, texture: str) -> TreeType:
            """获取树类型,如果不存在则创建"""
            key = f"{name}_{color}_{texture}"

            if key not in cls._tree_types:
                print(f"创建新的树类型: {name}")
                cls._tree_types[key] = TreeType(name, color, texture)
            else:
                print(f"复用现有树类型: {name}")

            return cls._tree_types[key]

    @dataclass
    class Tree:
        """树(外部状态)"""
        x: int
        y: int
        age: int
        tree_type: TreeType  # 享元引用

        def render(self):
            """渲染树"""
            return self.tree_type.render(self.x, self.y, self.age)

    class Forest:
        """森林"""
        def __init__(self):
            self.trees: List[Tree] = []

        def plant_tree(self, x: int, y: int, age: int, name: str, color: str, texture: str):
            """种植树"""
            tree_type = TreeTypeFactory.get_tree_type(name, color, texture)
            tree = Tree(x, y, age, tree_type)
            self.trees.append(tree)

        def render(self):
            """渲染整个森林"""
            return [tree.render() for tree in self.trees]

        def count_tree_types(self) -> int:
            """统计树类型数量"""
            types = set(tree.tree_type for tree in self.trees)
            return len(types)

    print("享元模式示例:森林渲染")

    # 创建森林
    forest = Forest()

    # 种植树(许多树共享少数类型)
    print("\n种植树:")
    forest.plant_tree(10, 20, 5, "Oak", "Green", "Rough")
    forest.plant_tree(30, 40, 10, "Oak", "Green", "Rough")  # 复用
    forest.plant_tree(50, 60, 3, "Pine", "Dark Green", "Smooth")
    forest.plant_tree(70, 80, 7, "Oak", "Green", "Rough")  # 复用
    forest.plant_tree(90, 100, 15, "Maple", "Red", "Smooth")
    forest.plant_tree(110, 120, 8, "Pine", "Dark Green", "Smooth")  # 复用

    # 渲染森林
    print("\n渲染森林:")
    renderings = forest.render()
    for i, rendering in enumerate(renderings[:3]):  # 只显示前3棵
        print(f"  树{i+1}: {rendering}")

    print(f"\n森林统计:")
    print(f"  总树数: {len(forest.trees)}")
    print(f"  树类型数: {forest.count_tree_types()}")
    print(f"  内存节省: 每棵树类型对象被{len(forest.trees)//forest.count_tree_types():.1f}棵树共享")

    # 6. 装饰器模式(使用数据类)
    print("\n6. 装饰器模式(使用数据类):")

    @dataclass
    class Coffee:
        """咖啡基类"""
        description: str = "普通咖啡"

        def cost(self) -> float:
            return 2.0

        def __str__(self) -> str:
            return f"{self.description}: ${self.cost():.2f}"

    @dataclass
    class MilkDecorator:
        """牛奶装饰器"""
        coffee: Coffee

        @property
        def description(self) -> str:
            return self.coffee.description + " + 牛奶"

        def cost(self) -> float:
            return self.coffee.cost() + 0.5

        def __str__(self) -> str:
            return f"{self.description}: ${self.cost():.2f}"

    @dataclass
    class SugarDecorator:
        """糖装饰器"""
        coffee: Coffee

        @property
        def description(self) -> str:
            return self.coffee.description + " + 糖"

        def cost(self) -> float:
            return self.coffee.cost() + 0.2

        def __str__(self) -> str:
            return f"{self.description}: ${self.cost():.2f}"

    @dataclass
    class WhippedCreamDecorator:
        """奶油装饰器"""
        coffee: Coffee

        @property
        def description(self) -> str:
            return self.coffee.description + " + 奶油"

        def cost(self) -> float:
            return self.coffee.cost() + 0.7

        def __str__(self) -> str:
            return f"{self.description}: ${self.cost():.2f}"

    print("装饰器模式示例:咖啡订单")

    # 创建咖啡
    coffee = Coffee()
    print(f"基础咖啡: {coffee}")

    # 添加装饰
    coffee_with_milk = MilkDecorator(coffee)
    print(f"加牛奶: {coffee_with_milk}")

    coffee_with_milk_and_sugar = SugarDecorator(coffee_with_milk)
    print(f"加牛奶和糖: {coffee_with_milk_and_sugar}")

    fancy_coffee = WhippedCreamDecorator(
        SugarDecorator(
            MilkDecorator(coffee)
        )
    )
    print(f"豪华咖啡: {fancy_coffee}")

# 运行演示
demonstrate_data_patterns()

第三部分:函数式编程中的数据组合

3.1 不可变数据结构

函数式编程强调不可变性,这对于并发编程和数据一致性至关重要。

# ============================================================================
# 不可变数据结构
# ============================================================================

print("\n=== 不可变数据结构 ===")

def demonstrate_immutable_data():
    """演示不可变数据结构"""

    import sys
    from collections import namedtuple
    from typing import Tuple, List, Dict, FrozenSet
    import copy

    print("不可变数据结构的优势:")
    print("1. 线程安全:无需锁即可在多个线程间共享")
    print("2. 可预测性:状态不会意外改变")
    print("3. 易于测试:没有副作用")
    print("4. 易于推理:值在生命周期内不变")

    # 1. Python内置的不可变类型
    print("\n1. Python内置的不可变类型:")

    # 基本不可变类型
    immutable_types = [
        ("int", 42),
        ("float", 3.14),
        ("str", "hello"),
        ("tuple", (1, 2, 3)),
        ("frozenset", frozenset([1, 2, 3])),
        ("bytes", b"data"),
    ]

    for type_name, value in immutable_types:
        print(f"  {type_name:15}: {value}")

    # 2. 使用namedtuple创建不可变记录
    print("\n2. 使用namedtuple创建不可变记录:")

    from collections import namedtuple

    # 创建不可变的点
    Point = namedtuple('Point', ['x', 'y'])
    p1 = Point(10, 20)

    print(f"原始点: {p1}")
    print(f"访问字段: x={p1.x}, y={p1.y}")
    print(f"索引访问: p1[0]={p1[0]}, p1[1]={p1[1]}")
    print(f"解包: x, y = p1 -> x={p1.x}, y={p1.y}")

    # 尝试修改(应该失败)
    try:
        p1.x = 30  # 应该引发AttributeError
    except AttributeError as e:
        print(f"修改失败(预期中): {e}")

    # 创建新对象(函数式更新)
    p2 = p1._replace(x=30)
    print(f"创建新点: {p2}")
    print(f"p1保持不变: {p1}")
    print(f"p1 is p2? {p1 is p2}")

    # 3. 不可变列表(使用元组)
    print("\n3. 不可变列表(使用元组):")

    def immutable_append(lst: Tuple, item) -> Tuple:
        """不可变追加"""
        return lst + (item,)

    def immutable_extend(lst: Tuple, items: Tuple) -> Tuple:
        """不可变扩展"""
        return lst + items

    def immutable_insert(lst: Tuple, index: int, item) -> Tuple:
        """不可变插入"""
        return lst[:index] + (item,) + lst[index:]

    def immutable_remove(lst: Tuple, index: int) -> Tuple:
        """不可变删除"""
        return lst[:index] + lst[index+1:]

    # 初始列表
    numbers = (1, 2, 3)
    print(f"初始列表: {numbers}")

    # 函数式更新
    numbers2 = immutable_append(numbers, 4)
    print(f"追加后: {numbers2}")

    numbers3 = immutable_insert(numbers2, 1, 99)
    print(f"插入后: {numbers3}")

    numbers4 = immutable_remove(numbers3, 2)
    print(f"删除后: {numbers4}")

    print(f"所有版本共存: {numbers}, {numbers2}, {numbers3}, {numbers4}")

    # 4. 不可变字典(Python 3.3+ 的types.MappingProxyType)
    print("\n4. 不可变字典:")

    from types import MappingProxyType

    # 创建可变字典
    mutable_dict = {'a': 1, 'b': 2, 'c': 3}

    # 创建不可变视图
    immutable_dict = MappingProxyType(mutable_dict)

    print(f"原始字典: {mutable_dict}")
    print(f"不可变视图: {immutable_dict}")
    print(f"访问元素: immutable_dict['a'] = {immutable_dict['a']}")

    # 尝试修改不可变视图(应该失败)
    try:
        immutable_dict['d'] = 4
    except TypeError as e:
        print(f"修改不可变视图失败(预期中): {e}")

    # 修改原始字典(不可变视图会反映变化)
    mutable_dict['d'] = 4
    print(f"修改原始字典后,不可变视图: {immutable_dict}")

    # 5. 不可变树结构
    print("\n5. 不可变树结构:")

    class ImmutableTree:
        """不可变树"""

        __slots__ = ('value', 'left', 'right')  # 不可变,不需要__dict__

        def __init__(self, value, left=None, right=None):
            self.value = value
            self.left = left
            self.right = right

        def __repr__(self):
            left_repr = f" {self.left}" if self.left else ""
            right_repr = f" {self.right}" if self.right else ""
            return f"({self.value}{left_repr}{right_repr})"

        def insert(self, new_value):
            """插入新值,返回新树"""
            if new_value < self.value:
                new_left = self.left.insert(new_value) if self.left else ImmutableTree(new_value)
                return ImmutableTree(self.value, new_left, self.right)
            elif new_value > self.value:
                new_right = self.right.insert(new_value) if self.right else ImmutableTree(new_value)
                return ImmutableTree(self.value, self.left, new_right)
            else:
                # 值已存在,返回原树
                return self

        def contains(self, value):
            """检查是否包含值"""
            if value == self.value:
                return True
            elif value < self.value and self.left:
                return self.left.contains(value)
            elif value > self.value and self.right:
                return self.right.contains(value)
            return False

        def traverse(self):
            """中序遍历"""
            result = []
            if self.left:
                result.extend(self.left.traverse())
            result.append(self.value)
            if self.right:
                result.extend(self.right.traverse())
            return result

    print("不可变二叉树示例:")

    # 创建树
    tree1 = ImmutableTree(5)
    print(f"初始树: {tree1}")
    print(f"遍历: {tree1.traverse()}")

    # 插入值(创建新树)
    tree2 = tree1.insert(3)
    print(f"插入3后: {tree2}")
    print(f"遍历: {tree2.traverse()}")

    tree3 = tree2.insert(7)
    print(f"插入7后: {tree3}")
    print(f"遍历: {tree3.traverse()}")

    tree4 = tree3.insert(1).insert(4).insert(6).insert(8)
    print(f"插入更多值后: {tree4}")
    print(f"遍历: {tree4.traverse()}")

    print(f"\n所有版本共存:")
    print(f"  tree1: {tree1.traverse()}")
    print(f"  tree2: {tree2.traverse()}")
    print(f"  tree3: {tree3.traverse()}")
    print(f"  tree4: {tree4.traverse()}")

    # 验证不可变性
    print(f"\n验证不可变性:")
    print(f"  tree1 is tree2? {tree1 is tree2}")
    print(f"  原始树仍然完整: {tree1.traverse()}")

    # 6. 持久化数据结构(Persistent Data Structures)
    print("\n6. 持久化数据结构:")

    class PersistentList:
        """持久化列表(使用二叉树实现)"""

        class Node:
            """内部节点类"""
            __slots__ = ('size', 'value', 'left', 'right')

            def __init__(self, size, value, left=None, right=None):
                self.size = size  # 子树大小
                self.value = value
                self.left = left
                self.right = right

        def __init__(self, root=None):
            self.root = root

        @classmethod
        def _create_node(cls, value, left=None, right=None):
            """创建节点"""
            left_size = left.size if left else 0
            right_size = right.size if right else 0
            size = left_size + right_size + 1
            return cls.Node(size, value, left, right)

        def prepend(self, value):
            """在前面添加元素,返回新列表"""
            return PersistentList(self._prepend(value, self.root))

        def _prepend(self, value, node):
            """递归在前面添加元素"""
            if not node:
                return self._create_node(value)

            # 平衡插入
            if not node.left:
                return self._create_node(value, None, node)

            new_left = self._prepend(value, node.left)
            return self._create_node(node.value, new_left, node.right)

        def get(self, index):
            """获取元素"""
            if index < 0 or index >= len(self):
                raise IndexError("索引越界")
            return self._get(index, self.root)

        def _get(self, index, node):
            """递归获取元素"""
            left_size = node.left.size if node.left else 0

            if index < left_size:
                return self._get(index, node.left)
            elif index == left_size:
                return node.value
            else:
                return self._get(index - left_size - 1, node.right)

        def __len__(self):
            return self.root.size if self.root else 0

        def __iter__(self):
            """迭代器"""
            def traverse(node):
                if node:
                    yield from traverse(node.left)
                    yield node.value
                    yield from traverse(node.right)
            return traverse(self.root)

        def __repr__(self):
            return f"PersistentList({list(self)})"

    print("持久化列表示例:")

    # 创建空列表
    plist1 = PersistentList()
    print(f"空列表: {plist1}, 长度: {len(plist1)}")

    # 添加元素(每次返回新列表)
    plist2 = plist1.prepend(3)
    print(f"添加3后: {plist2}, 长度: {len(plist2)}")

    plist3 = plist2.prepend(2)
    print(f"添加2后: {plist3}, 长度: {len(plist3)}")

    plist4 = plist3.prepend(1)
    print(f"添加1后: {plist4}, 长度: {len(plist4)}")

    # 访问元素
    print(f"\n访问元素:")
    for i in range(len(plist4)):
        print(f"  plist4[{i}] = {plist4.get(i)}")

    # 验证持久性
    print(f"\n验证持久性:")
    print(f"  plist1: {list(plist1)}")
    print(f"  plist2: {list(plist2)}")
    print(f"  plist3: {list(plist3)}")
    print(f"  plist4: {list(plist4)}")
    print(f"  所有版本都是独立对象")

    # 7. 不可变性与并发
    print("\n7. 不可变性与并发:")

    import threading
    import time

    class BankAccount:
        """不可变银行账户"""

        def __init__(self, balance: float = 0.0, history: Tuple = ()):
            self.balance = balance
            self.history = history  # 交易历史

        def deposit(self, amount: float) -> 'BankAccount':
            """存款,返回新账户"""
            new_balance = self.balance + amount
            new_history = self.history + (f"存款: +{amount:.2f}",)
            return BankAccount(new_balance, new_history)

        def withdraw(self, amount: float) -> 'BankAccount':
            """取款,返回新账户"""
            if amount > self.balance:
                raise ValueError("余额不足")
            new_balance = self.balance - amount
            new_history = self.history + (f"取款: -{amount:.2f}",)
            return BankAccount(new_balance, new_history)

        def __repr__(self):
            history_str = "\n  ".join(self.history[-5:])  # 只显示最近5条
            return f"余额: ${self.balance:.2f}\n最近交易:\n  {history_str}"

    def concurrent_transactions(account, thread_id):
        """并发交易"""
        import random

        for i in range(3):
            amount = random.uniform(10, 100)
            if random.choice([True, False]):
                new_account = account.deposit(amount)
                print(f"线程{thread_id}: 存款${amount:.2f}, 新余额${new_account.balance:.2f}")
            else:
                try:
                    new_account = account.withdraw(amount)
                    print(f"线程{thread_id}: 取款${amount:.2f}, 新余额${new_account.balance:.2f}")
                except ValueError as e:
                    print(f"线程{thread_id}: 取款失败 - {e}")

            account = new_account
            time.sleep(random.uniform(0.1, 0.3))

        return account

    print("不可变账户的并发交易:")

    # 初始账户
    account = BankAccount(1000.0)
    print(f"初始账户:\n{account}")

    # 模拟并发交易(注意:由于GIL,Python线程不是真正的并行)
    print("\n开始并发交易:")
    threads = []
    results = []

    for i in range(3):
        t = threading.Thread(
            target=lambda i=i: results.append(concurrent_transactions(account, i))
        )
        threads.append(t)
        t.start()

    for t in threads:
        t.join()

    print(f"\n交易完成,每个线程有自己的账户版本:")
    for i, result in enumerate(results):
        print(f"线程{i}的最终账户:")
        print(result)
        print()

    print(f"原始账户保持不变:\n{account}")

# 运行演示
demonstrate_immutable_data()

3.2 代数数据类型(ADT)与模式匹配

代数数据类型是函数式编程中强大的数据建模工具,配合模式匹配可以编写非常优雅的代码。

# ============================================================================
# 代数数据类型(ADT)与模式匹配
# ============================================================================

print("\n=== 代数数据类型(ADT)与模式匹配 ===")

def demonstrate_adt_and_pattern_matching():
    """演示代数数据类型与模式匹配"""

    from typing import Union, Optional, List
    from enum import Enum
    from dataclasses import dataclass
    import json

    print("代数数据类型(Algebraic Data Types)的核心概念:")
    print("1. 乘积类型(Product Types):类似结构体,包含多个字段")
    print("2. 和类型(Sum Types):类似枚举,但每个变体可以携带数据")
    print("3. 递归类型:类型可以包含自身")

    # 1. 使用Python实现ADT
    print("\n1. 使用Python实现ADT:")

    # 和类型:使用Union和dataclass
    @dataclass(frozen=True)
    class Success:
        """成功结果"""
        value: any

        def __str__(self):
            return f"Success({self.value})"

    @dataclass(frozen=True)
    class Failure:
        """失败结果"""
        error: str

        def __str__(self):
            return f"Failure({self.error})"

    # 结果类型是Success或Failure
    Result = Union[Success, Failure]

    def process_result(result: Result) -> str:
        """处理结果(模式匹配的简单实现)"""
        if isinstance(result, Success):
            return f"处理成功: {result.value}"
        elif isinstance(result, Failure):
            return f"处理失败: {result.error}"
        else:
            raise TypeError(f"未知结果类型: {type(result)}")

    print("简单ADT示例 - 结果类型:")
    results = [Success(42), Failure("文件未找到"), Success("完成")]

    for result in results:
        print(f"  {result} -> {process_result(result)}")

    # 2. 更复杂的ADT:表达式树
    print("\n2. 复杂的ADT:表达式树")

    @dataclass(frozen=True)
    class Number:
        """数字"""
        value: float

        def __str__(self):
            return str(self.value)

    @dataclass(frozen=True)
    class Variable:
        """变量"""
        name: str

        def __str__(self):
            return self.name

    @dataclass(frozen=True)
    class Add:
        """加法"""
        left: 'Expr'
        right: 'Expr'

        def __str__(self):
            return f"({self.left} + {self.right})"

    @dataclass(frozen=True)
    class Multiply:
        """乘法"""
        left: 'Expr'
        right: 'Expr'

        def __str__(self):
            return f"({self.left} * {self.right})"

    @dataclass(frozen=True)
    class Power:
        """幂"""
        base: 'Expr'
        exponent: 'Expr'

        def __str__(self):
            return f"({self.base}^{self.exponent})"

    # 表达式类型
    Expr = Union[Number, Variable, Add, Multiply, Power]

    def evaluate(expr: Expr, env: dict = None) -> float:
        """求值表达式"""
        env = env or {}

        # 模式匹配求值
        if isinstance(expr, Number):
            return expr.value
        elif isinstance(expr, Variable):
            return env.get(expr.name, 0.0)
        elif isinstance(expr, Add):
            return evaluate(expr.left, env) + evaluate(expr.right, env)
        elif isinstance(expr, Multiply):
            return evaluate(expr.left, env) * evaluate(expr.right, env)
        elif isinstance(expr, Power):
            return evaluate(expr.base, env) ** evaluate(expr.exponent, env)
        else:
            raise TypeError(f"未知表达式类型: {type(expr)}")

    def derivative(expr: Expr, var: str) -> Expr:
        """对变量求导"""
        # 模式匹配求导规则
        if isinstance(expr, Number):
            return Number(0)  # 常数的导数为0

        elif isinstance(expr, Variable):
            if expr.name == var:
                return Number(1)  # dx/dx = 1
            else:
                return Number(0)  # dy/dx = 0

        elif isinstance(expr, Add):
            # d(u+v)/dx = du/dx + dv/dx
            return Add(
                derivative(expr.left, var),
                derivative(expr.right, var)
            )

        elif isinstance(expr, Multiply):
            # d(u*v)/dx = u*dv/dx + v*du/dx
            return Add(
                Multiply(expr.left, derivative(expr.right, var)),
                Multiply(expr.right, derivative(expr.left, var))
            )

        elif isinstance(expr, Power):
            # 简化:假设指数是常数
            if isinstance(expr.exponent, Number):
                # d(x^n)/dx = n*x^(n-1)
                return Multiply(
                    Number(expr.exponent.value),
                    Power(
                        expr.base,
                        Number(expr.exponent.value - 1)
                    )
                )
            else:
                # 复杂情况:返回占位符
                return Variable("复杂导数")

        else:
            raise TypeError(f"未知表达式类型: {type(expr)}")

    print("表达式树ADT示例:")

    # 构建表达式:f(x) = x^2 + 2*x + 1
    x = Variable("x")
    f = Add(
        Add(
            Power(x, Number(2)),
            Multiply(Number(2), x)
        ),
        Number(1)
    )

    print(f"函数: f(x) = {f}")

    # 求值
    env = {"x": 3.0}
    result = evaluate(f, env)
    print(f"f({env['x']}) = {result}")

    # 求导
    df_dx = derivative(f, "x")
    print(f"导数: f'(x) = {df_dx}")

    # 求导后的值
    df_dx_value = evaluate(df_dx, env)
    print(f"f'({env['x']}) = {df_dx_value}")

    # 3. 使用match语句(Python 3.10+)
    print("\n3. 使用match语句(Python 3.10+):")

    # 形状ADT
    @dataclass(frozen=True)
    class Circle:
        """圆形"""
        radius: float

    @dataclass(frozen=True)
    class Rectangle:
        """矩形"""
        width: float
        height: float

    @dataclass(frozen=True)
    class Triangle:
        """三角形"""
        a: float
        b: float
        c: float

    Shape = Union[Circle, Rectangle, Triangle]

    def process_shape(shape: Shape) -> str:
        """处理形状(使用match语句)"""
        match shape:
            case Circle(radius=r):
                import math
                area = math.pi * r * r
                return f"圆形: 半径={r}, 面积={area:.2f}"

            case Rectangle(width=w, height=h):
                area = w * h
                return f"矩形: 宽={w}, 高={h}, 面积={area:.2f}"

            case Triangle(a=a, b=b, c=c):
                # 使用海伦公式
                s = (a + b + c) / 2
                area = (s * (s - a) * (s - b) * (s - c)) ** 0.5
                return f"三角形: 边长=({a}, {b}, {c}), 面积={area:.2f}"

            case _:
                return "未知形状"

    print("使用match语句进行模式匹配:")
    shapes = [
        Circle(5.0),
        Rectangle(4.0, 6.0),
        Triangle(3.0, 4.0, 5.0),
    ]

    for shape in shapes:
        print(f"  {process_shape(shape)}")

    # 4. 递归ADT:链表
    print("\n4. 递归ADT:链表")

    from typing import Generic, TypeVar

    T = TypeVar('T')

    @dataclass(frozen=True)
    class Nil:
        """空列表"""
        def __str__(self):
            return "Nil"

    @dataclass(frozen=True)
    class Cons(Generic[T]):
        """非空列表(头元素和尾列表)"""
        head: T
        tail: 'List[T]'  # 递归引用

        def __str__(self):
            return f"({self.head} : {self.tail})"

    # 链表类型
    List = Union[Nil, Cons[T]]

    def list_to_python(lst: List) -> list:
        """将函数式链表转换为Python列表"""
        result = []
        current = lst

        while isinstance(current, Cons):
            result.append(current.head)
            current = current.tail

        return result

    def python_to_list(py_list: list) -> List:
        """将Python列表转换为函数式链表"""
        lst = Nil()
        for item in reversed(py_list):
            lst = Cons(item, lst)
        return lst

    def map_list(f, lst: List) -> List:
        """映射函数到链表"""
        match lst:
            case Nil():
                return Nil()
            case Cons(head=head, tail=tail):
                return Cons(f(head), map_list(f, tail))
            case _:
                raise TypeError("不是有效的链表")

    def filter_list(predicate, lst: List) -> List:
        """过滤链表"""
        match lst:
            case Nil():
                return Nil()
            case Cons(head=head, tail=tail):
                if predicate(head):
                    return Cons(head, filter_list(predicate, tail))
                else:
                    return filter_list(predicate, tail)
            case _:
                raise TypeError("不是有效的链表")

    def fold_list(f, acc, lst: List):
        """折叠链表"""
        match lst:
            case Nil():
                return acc
            case Cons(head=head, tail=tail):
                return fold_list(f, f(acc, head), tail)
            case _:
                raise TypeError("不是有效的链表")

    print("函数式链表ADT示例:")

    # 创建链表
    lst = Cons(1, Cons(2, Cons(3, Cons(4, Cons(5, Nil())))))
    print(f"原始链表: {lst}")
    print(f"Python列表: {list_to_python(lst)}")

    # 映射
    doubled = map_list(lambda x: x * 2, lst)
    print(f"加倍后: {doubled}")

    # 过滤
    evens = filter_list(lambda x: x % 2 == 0, lst)
    print(f"偶数: {evens}")

    # 折叠(求和)
    total = fold_list(lambda acc, x: acc + x, 0, lst)
    print(f"总和: {total}")

    # 5. 使用单例模式实现更安全的ADT
    print("\n5. 使用单例模式实现更安全的ADT:")

    class Option(Generic[T]):
        """Option类型(类似Haskell的Maybe)"""

        @staticmethod
        def some(value: T) -> 'Some[T]':
            return Some(value)

        @staticmethod
        def none() -> 'None_[T]':
            return None_()

    @dataclass(frozen=True)
    class Some(Generic[T]):
        """有值"""
        value: T

        def __str__(self):
            return f"Some({self.value})"

        def map(self, f) -> 'Option':
            """映射"""
            return Some(f(self.value))

        def flat_map(self, f) -> 'Option':
            """扁平映射"""
            return f(self.value)

        def get_or_else(self, default):
            """获取值或默认值"""
            return self.value

    @dataclass(frozen=True)
    class None_(Generic[T]):
        """无值"""

        def __str__(self):
            return "None"

        def map(self, f) -> 'Option':
            """映射(无值时不执行)"""
            return self

        def flat_map(self, f) -> 'Option':
            """扁平映射"""
            return self

        def get_or_else(self, default):
            """获取默认值"""
            return default

    # 使用Option处理可能缺失的值
    def safe_divide(numerator: float, denominator: float) -> Option[float]:
        """安全除法"""
        if denominator == 0:
            return Option.none()
        else:
            return Option.some(numerator / denominator)

    def safe_sqrt(x: float) -> Option[float]:
        """安全平方根"""
        if x < 0:
            return Option.none()
        else:
            return Option.some(x ** 0.5)

    print("Option类型示例:")

    # 链式操作
    result1 = (Option.some(16)
              .flat_map(lambda x: safe_sqrt(x))
              .flat_map(lambda x: safe_divide(1, x)))

    result2 = (Option.some(-4)
              .flat_map(safe_sqrt)
              .flat_map(lambda x: safe_divide(1, x)))

    result3 = (Option.some(0)
              .flat_map(lambda x: safe_divide(1, x))
              .flat_map(safe_sqrt))

    results = [result1, result2, result3]
    for i, result in enumerate(results, 1):
        print(f"计算{i}: {result}, 值: {result.get_or_else('无结果')}")

    # 6. 使用枚举实现ADT
    print("\n6. 使用枚举实现ADT:")

    from enum import Enum

    class JsonValue:
        """JSON值ADT"""
        pass

    class JsonNull(JsonValue):
        """JSON null"""
        def __str__(self):
            return "null"

    class JsonBool(JsonValue):
        """JSON布尔值"""
        def __init__(self, value: bool):
            self.value = value

        def __str__(self):
            return "true" if self.value else "false"

    class JsonNumber(JsonValue):
        """JSON数字"""
        def __init__(self, value: float):
            self.value = value

        def __str__(self):
            return str(self.value)

    class JsonString(JsonValue):
        """JSON字符串"""
        def __init__(self, value: str):
            self.value = value

        def __str__(self):
            return json.dumps(self.value)

    class JsonArray(JsonValue):
        """JSON数组"""
        def __init__(self, values: List[JsonValue]):
            self.values = values

        def __str__(self):
            items = [str(v) for v in self.values]
            return "[" + ", ".join(items) + "]"

    class JsonObject(JsonValue):
        """JSON对象"""
        def __init__(self, fields: dict):
            self.fields = fields

        def __str__(self):
            items = [f"{json.dumps(k)}: {str(v)}" for k, v in self.fields.items()]
            return "{" + ", ".join(items) + "}"

    def json_stringify(value: JsonValue) -> str:
        """JSON字符串化(使用模式匹配)"""
        match value:
            case JsonNull():
                return "null"
            case JsonBool(b):
                return "true" if b else "false"
            case JsonNumber(n):
                return str(n)
            case JsonString(s):
                return json.dumps(s)
            case JsonArray(arr):
                items = [json_stringify(item) for item in arr.values]
                return "[" + ", ".join(items) + "]"
            case JsonObject(obj):
                items = [f"{json.dumps(k)}: {json_stringify(v)}" for k, v in obj.fields.items()]
                return "{" + ", ".join(items) + "}"
            case _:
                raise TypeError(f"未知JSON类型: {type(value)}")

    print("JSON ADT示例:")

    # 构建复杂的JSON值
    json_data = JsonObject({
        "name": JsonString("Alice"),
        "age": JsonNumber(30),
        "active": JsonBool(True),
        "scores": JsonArray([
            JsonNumber(95.5),
            JsonNumber(88.0),
            JsonNumber(92.5)
        ]),
        "address": JsonObject({
            "street": JsonString("123 Main St"),
            "city": JsonString("New York"),
            "zip": JsonNumber(10001)
        }),
        "tags": JsonArray([
            JsonString("user"),
            JsonString("premium"),
            JsonNull()  # 空标签
        ])
    })

    print("JSON值:")
    print(json_stringify(json_data))

    # 与标准库对比
    print("\n与标准库json.dumps()对比:")
    python_data = {
        "name": "Alice",
        "age": 30,
        "active": True,
        "scores": [95.5, 88.0, 92.5],
        "address": {
            "street": "123 Main St",
            "city": "New York",
            "zip": 10001
        },
        "tags": ["user", "premium", None]
    }

    std_json = json.dumps(python_data, indent=2)
    print(f"标准库:\n{std_json}")

# 运行演示
demonstrate_adt_and_pattern_matching()

第四部分:数据组合的最佳实践与未来趋势

4.1 数据建模的最佳实践

良好的数据建模是构建健壮软件的基础。

# ============================================================================
# 数据建模的最佳实践
# ============================================================================

print("\n=== 数据建模的最佳实践 ===")

def demonstrate_data_modeling_best_practices():
    """演示数据建模的最佳实践"""

    from dataclasses import dataclass, field, asdict
    from typing import List, Dict, Optional, Set, Any
    from enum import Enum
    import json
    from datetime import datetime, date
    import sys

    print("数据建模的十个最佳实践:")

    # 1. 明确领域模型
    print("\n1. 明确领域模型:")
    print("   理解业务领域,创建反映现实世界的模型")

    @dataclass
    class Money:
        """货币值(值对象)"""
        amount: float
        currency: str = "USD"

        def __post_init__(self):
            """验证"""
            if self.amount < 0:
                raise ValueError("金额不能为负")
            if len(self.currency) != 3:
                raise ValueError("货币代码必须是3个字符")

        def __add__(self, other: 'Money') -> 'Money':
            """加法"""
            if self.currency != other.currency:
                raise ValueError("货币不匹配")
            return Money(self.amount + other.amount, self.currency)

        def __mul__(self, multiplier: float) -> 'Money':
            """乘法"""
            return Money(self.amount * multiplier, self.currency)

        def format(self) -> str:
            """格式化"""
            return f"{self.currency} {self.amount:,.2f}"

    @dataclass
    class Address:
        """地址(值对象)"""
        street: str
        city: str
        state: str
        zip_code: str
        country: str = "USA"

        def __post_init__(self):
            """验证"""
            if not self.zip_code.isdigit() or len(self.zip_code) != 5:
                raise ValueError("邮政编码必须是5位数字")

    @dataclass
    class Customer:
        """客户(实体)"""
        id: str
        name: str
        email: str
        address: Address
        joined_date: date = field(default_factory=date.today)

        def __post_init__(self):
            """验证"""
            if "@" not in self.email:
                raise ValueError("无效的邮箱地址")

    print("   示例:电商领域模型")
    print("   - Money: 值对象,具有验证和业务逻辑")
    print("   - Address: 值对象,不可变")
    print("   - Customer: 实体,具有标识符")

    # 2. 使用值对象
    print("\n2. 使用值对象:")
    print("   对于没有标识符的概念,使用值对象")

    try:
        money1 = Money(100.0, "USD")
        money2 = Money(50.0, "USD")
        total = money1 + money2
        print(f"   值对象示例: {money1.format()} + {money2.format()} = {total.format()}")

        # 验证
        try:
            invalid_money = Money(-100.0)
        except ValueError as e:
            print(f"   验证生效: {e}")
    except Exception as e:
        print(f"   示例错误: {e}")

    # 3. 保持不可变性
    print("\n3. 保持不可变性:")
    print("   尽可能使用不可变数据结构")

    @dataclass(frozen=True)
    class ImmutableProduct:
        """不可变产品"""
        id: str
        name: str
        price: Money

        def with_discount(self, percent: float) -> 'ImmutableProduct':
            """应用折扣,返回新产品"""
            if not 0 <= percent <= 100:
                raise ValueError("折扣百分比必须在0-100之间")

            discount_factor = 1 - (percent / 100)
            new_price = self.price * discount_factor

            return ImmutableProduct(
                id=self.id,
                name=self.name,
                price=new_price
            )

    product = ImmutableProduct("P001", "Laptop", Money(1000.0))
    discounted_product = product.with_discount(20)

    print(f"   原始产品: {product.name} - {product.price.format()}")
    print(f"   折扣产品: {discounted_product.name} - {discounted_product.price.format()}")
    print(f"   原始产品不变: {product.price.format()}")

    # 4. 领域驱动设计(DDD)模式
    print("\n4. 领域驱动设计(DDD)模式:")

    class OrderStatus(Enum):
        """订单状态枚举"""
        PENDING = "pending"
        CONFIRMED = "confirmed"
        SHIPPED = "shipped"
        DELIVERED = "delivered"
        CANCELLED = "cancelled"

    @dataclass
    class OrderItem:
        """订单项(值对象)"""
        product_id: str
        product_name: str
        quantity: int
        unit_price: Money

        def total_price(self) -> Money:
            return self.unit_price * self.quantity

        def __post_init__(self):
            if self.quantity <= 0:
                raise ValueError("数量必须大于0")

    class Order:
        """订单(聚合根)"""

        def __init__(self, order_id: str, customer_id: str):
            self.order_id = order_id
            self.customer_id = customer_id
            self._items: List[OrderItem] = []
            self._status: OrderStatus = OrderStatus.PENDING
            self.created_at: datetime = datetime.now()
            self.updated_at: datetime = self.created_at

        def add_item(self, item: OrderItem):
            """添加订单项"""
            if self._status != OrderStatus.PENDING:
                raise ValueError("只能向待处理订单添加商品")

            self._items.append(item)
            self.updated_at = datetime.now()

        def remove_item(self, product_id: str):
            """移除订单项"""
            if self._status != OrderStatus.PENDING:
                raise ValueError("只能从待处理订单移除商品")

            self._items = [item for item in self._items if item.product_id != product_id]
            self.updated_at = datetime.now()

        def confirm(self):
            """确认订单"""
            if self._status != OrderStatus.PENDING:
                raise ValueError("只能确认待处理订单")
            if not self._items:
                raise ValueError("订单不能为空")

            self._status = OrderStatus.CONFIRMED
            self.updated_at = datetime.now()

        def ship(self):
            """发货"""
            if self._status != OrderStatus.CONFIRMED:
                raise ValueError("只能发货已确认订单")

            self._status = OrderStatus.SHIPPED
            self.updated_at = datetime.now()

        def cancel(self):
            """取消订单"""
            if self._status not in [OrderStatus.PENDING, OrderStatus.CONFIRMED]:
                raise ValueError("只能取消待处理或已确认订单")

            self._status = OrderStatus.CANCELLED
            self.updated_at = datetime.now()

        @property
        def items(self) -> List[OrderItem]:
            """获取订单项(只读)"""
            return self._items.copy()

        @property
        def status(self) -> OrderStatus:
            """获取状态(只读)"""
            return self._status

        def total_amount(self) -> Money:
            """总金额"""
            if not self._items:
                return Money(0.0)

            total = Money(0.0)
            for item in self._items:
                total = total + item.total_price()
            return total

        def __str__(self):
            return (f"订单 {self.order_id}\n"
                   f"状态: {self.status.value}\n"
                   f"客户: {self.customer_id}\n"
                   f"商品数: {len(self._items)}\n"
                   f"总金额: {self.total_amount().format()}")

    print("   DDD模式示例:订单聚合")
    order = Order("ORD-001", "CUST-001")

    order.add_item(OrderItem("P001", "Laptop", 1, Money(1000.0)))
    order.add_item(OrderItem("P002", "Mouse", 2, Money(25.0)))

    print(f"\n   创建订单:\n{order}")

    order.confirm()
    print(f"\n   确认后:\n状态: {order.status.value}")

    # 5. 数据验证
    print("\n5. 数据验证:")
    print("   在数据进入系统时进行验证")

    class ValidatedUser:
        """带验证的用户"""

        def __init__(self, username: str, email: str, age: int):
            self._validate_username(username)
            self._validate_email(email)
            self._validate_age(age)

            self.username = username
            self.email = email
            self.age = age

        @staticmethod
        def _validate_username(username: str):
            if not username:
                raise ValueError("用户名不能为空")
            if len(username) < 3:
                raise ValueError("用户名至少3个字符")
            if len(username) > 50:
                raise ValueError("用户名最多50个字符")
            if not username.isalnum():
                raise ValueError("用户名只能包含字母和数字")

        @staticmethod
        def _validate_email(email: str):
            if "@" not in email:
                raise ValueError("无效的邮箱地址")
            if not email.split("@")[-1].count(".") >= 1:
                raise ValueError("无效的邮箱域名")

        @staticmethod
        def _validate_age(age: int):
            if age < 0:
                raise ValueError("年龄不能为负")
            if age > 150:
                raise ValueError("年龄不能超过150")

        def __str__(self):
            return f"用户: {self.username}, 邮箱: {self.email}, 年龄: {self.age}"

    print("   数据验证示例:")
    try:
        user1 = ValidatedUser("alice123", "alice@example.com", 30)
        print(f"   有效用户: {user1}")

        user2 = ValidatedUser("a", "invalid-email", 200)
        print(f"   无效用户: {user2}")
    except ValueError as e:
        print(f"   验证失败: {e}")

    # 6. 序列化与反序列化
    print("\n6. 序列化与反序列化:")
    print("   为数据类提供序列化方法")

    @dataclass
    class SerializableProduct:
        """可序列化产品"""
        id: str
        name: str
        price: float
        category: str

        def to_dict(self) -> dict:
            """转换为字典"""
            return asdict(self)

        def to_json(self) -> str:
            """转换为JSON"""
            return json.dumps(self.to_dict(), indent=2)

        @classmethod
        def from_dict(cls, data: dict) -> 'SerializableProduct':
            """从字典创建"""
            return cls(**data)

        @classmethod
        def from_json(cls, json_str: str) -> 'SerializableProduct':
            """从JSON创建"""
            data = json.loads(json_str)
            return cls.from_dict(data)

    product = SerializableProduct("P001", "Laptop", 999.99, "Electronics")

    print("   序列化示例:")
    json_str = product.to_json()
    print(f"   JSON: {json_str[:50]}...")

    product_copy = SerializableProduct.from_json(json_str)
    print(f"   反序列化: {product_copy}")

    # 7. 性能考虑
    print("\n7. 性能考虑:")
    print("   根据使用场景选择数据结构")

    def test_memory_usage():
        """测试内存使用"""

        # 使用__slots__的类
        class SlottedPoint:
            __slots__ = ('x', 'y')
            def __init__(self, x, y):
                self.x = x
                self.y = y

        # 普通类
        class RegularPoint:
            def __init__(self, x, y):
                self.x = x
                self.y = y

        # 创建多个实例
        n = 10000

        slotted_points = [SlottedPoint(i, i) for i in range(n)]
        regular_points = [RegularPoint(i, i) for i in range(n)]

        slotted_size = sys.getsizeof(slotted_points) + sum(sys.getsizeof(p) for p in slotted_points)
        regular_size = sys.getsizeof(regular_points) + sum(sys.getsizeof(p) for p in regular_points)

        print(f"   {n}个点的内存使用:")
        print(f"   使用__slots__: {slotted_size:,} 字节")
        print(f"   普通类: {regular_size:,} 字节")
        print(f"   节省: {(regular_size - slotted_size) / regular_size * 100:.1f}%")

    test_memory_usage()

    # 8. 文档化数据结构
    print("\n8. 文档化数据结构:")
    print("   为所有数据结构提供清晰的文档")

    @dataclass
    class WellDocumented:
        """
        良好文档化的数据类示例。

        属性:
            id (str): 唯一标识符,格式为"TYPE-001"
            name (str): 名称,长度在1-100字符之间
            created_at (datetime): 创建时间戳
            tags (List[str]): 标签列表,用于分类
        """
        id: str
        name: str
        created_at: datetime = field(default_factory=datetime.now)
        tags: List[str] = field(default_factory=list)

        def __post_init__(self):
            """
            初始化后验证。

            验证规则:
                - id不能为空
                - name长度在1-100字符之间
                - tags不能包含空字符串
            """
            if not self.id:
                raise ValueError("id不能为空")
            if not 1 <= len(self.name) <= 100:
                raise ValueError("name长度必须在1-100字符之间")
            if any(not tag for tag in self.tags):
                raise ValueError("tags不能包含空字符串")

        def add_tag(self, tag: str):
            """
            添加标签。

            参数:
                tag (str): 要添加的标签

            返回:
                无
            """
            if not tag:
                raise ValueError("标签不能为空")
            if tag not in self.tags:
                self.tags.append(tag)

    print("   文档化示例:")
    print("   - 类文档字符串描述整体目的")
    print("   - 属性文档说明每个字段")
    print("   - 方法文档说明参数和返回值")
    print("   - __post_init__用于验证")

    # 9. 测试数据类
    print("\n9. 测试数据类:")
    print("   为数据结构编写单元测试")

    import unittest

    class TestMoney(unittest.TestCase):
        """Money类的测试"""

        def test_creation(self):
            """测试创建"""
            money = Money(100.0, "USD")
            self.assertEqual(money.amount, 100.0)
            self.assertEqual(money.currency, "USD")

        def test_validation(self):
            """测试验证"""
            with self.assertRaises(ValueError):
                Money(-100.0)

        def test_addition(self):
            """测试加法"""
            money1 = Money(100.0, "USD")
            money2 = Money(50.0, "USD")
            total = money1 + money2
            self.assertEqual(total.amount, 150.0)

        def test_currency_mismatch(self):
            """测试货币不匹配"""
            money1 = Money(100.0, "USD")
            money2 = Money(50.0, "EUR")
            with self.assertRaises(ValueError):
                money1 + money2

    # 运行测试
    print("   测试示例(简化):")
    suite = unittest.TestLoader().loadTestsFromTestCase(TestMoney)
    runner = unittest.TextTestRunner(verbosity=0)
    result = runner.run(suite)
    print(f"   测试结果: {result.testsRun}个测试,{len(result.failures)}个失败")

    # 10. 演进和兼容性
    print("\n10. 演进和兼容性:")
    print("   设计能够演进的数据结构")

    @dataclass
    class BackwardCompatible:
        """向后兼容的数据类"""
        id: str
        name: str
        # 新字段有默认值,确保向后兼容
        version: int = 1
        metadata: Dict[str, Any] = field(default_factory=dict)

        @classmethod
        def from_legacy_dict(cls, data: dict) -> 'BackwardCompatible':
            """从旧格式字典创建"""
            # 处理旧格式
            if 'user_id' in data:  # 旧字段名
                data['id'] = data.pop('user_id')

            # 设置默认值
            data.setdefault('version', 1)
            data.setdefault('metadata', {})

            return cls(**data)

    print("   向后兼容示例:")
    legacy_data = {"user_id": "123", "name": "Alice"}
    new_obj = BackwardCompatible.from_legacy_dict(legacy_data)
    print(f"   旧数据转换: {new_obj}")

    current_data = {"id": "123", "name": "Alice", "version": 2, "metadata": {"role": "admin"}}
    current_obj = BackwardCompatible(**current_data)
    print(f"   当前数据: {current_obj}")

# 运行演示
demonstrate_data_modeling_best_practices()

# 数据组合的未来趋势
print("\n" + "="*60)
print("数据组合的未来趋势")
print("="*60)

print("""
1. 更强大的类型系统
   • 依赖类型(Dependent Types)
   • 细化类型(Refinement Types)
   • 会话类型(Session Types)

2. 零成本抽象
   • 编译时数据结构优化
   • 自动内存布局优化
   • 静态消除运行时开销

3. 形式验证
   • 自动证明数据结构不变性
   • 运行时行为验证
   • 安全保证

4. 领域特定语言(DSL)
   • 针对特定领域的数据建模语言
   • 自动代码生成
   • 可视化数据建模

5. 人工智能辅助
   • AI自动推荐数据结构
   • 性能优化建议
   • 重构建议

6. 跨语言数据交换
   • 统一的数据序列化格式
   • 自动类型映射
   • 跨平台数据共享

7. 实时协作数据结构
   • 冲突自由复制数据类型(CRDT)
   • 实时协同编辑
   • 分布式一致性

8. 可逆计算
   • 可逆数据结构
   • 无副作用的更新
   • 时间旅行调试

挑战与机遇:
1. 复杂性管理:随着系统增长,数据模型变得复杂
2. 性能权衡:类型安全与运行时性能的平衡
3. 学习曲线:新的类型系统和范式需要学习
4. 工具支持:需要更好的IDE和工具链支持
5. 团队协作:确保团队成员对数据模型有共同理解

最佳实践演进:
1. 数据优先设计:从数据模型开始设计系统
2. 契约驱动开发:明确定义数据契约
3. 渐进类型:从动态类型逐步转向静态类型
4. 可视化建模:使用工具可视化数据结构

结语:数据是软件的核心
良好的数据建模是构建可靠、可维护、高性能系统的基石。
随着技术的发展,我们将有更多工具和方法来设计和实现
高效的数据结构,但基本原则——清晰、一致、可验证——
将永远保持不变。
""")

总结

5.1 结构体与组合数据的核心洞见

通过这堂课,我们获得了以下核心洞见:

结构体是数据的逻辑分组:将相关的数据项组合在一起,形成有意义的整体。

组合优于继承:通过组合简单结构来构建复杂结构,而不是通过继承层次。

不可变性带来安全性:不可变数据结构在多线程环境和函数式编程中至关重要。

类型系统是强大的工具:静态类型检查可以在编译时捕获许多错误。

5.2 数据建模的优势

提高代码清晰度:良好的数据模型使代码意图更明确。

增强类型安全:编译时类型检查减少运行时错误。

促进代码复用:通用数据结构可以在多个地方重用。

简化并发编程:不可变数据结构无需锁即可共享。

支持领域建模:数据模型可以直接反映业务领域。

5.3 实用建议

何时使用结构体:

  • 当需要将相关数据组合在一起时
  • 当需要定义清晰的接口时
  • 当需要类型安全时
  • 当需要优化内存布局时

何时使用不可变数据结构:

  • 在多线程环境中
  • 在函数式编程中
  • 当需要数据历史记录时
  • 当需要简化推理时

数据建模原则:

  • 领域驱动:从业务领域出发设计数据结构
  • 单一职责:每个数据结构只做一件事
  • 明确契约:明确定义数据的验证规则
  • 渐进复杂:从简单结构开始,逐渐增加复杂性

5.4 语言选择建议

Rust:强大的类型系统,所有权模型,零成本抽象,适合系统编程。

TypeScript:渐进类型,优秀的IDE支持,适合Web开发。

Go:简单直接的结构体,组合优于继承,适合网络服务。

Python:动态类型,多种选择(dataclass、namedtuple、Pydantic),适合快速开发。

Haskell/OCaml:强大的代数数据类型,模式匹配,纯函数式。

5.5 终极目标:数据驱动设计

记住,数据是软件的核心。良好的数据建模不仅影响代码质量,还影响系统架构和团队协作:

数据优先:从数据模型开始设计,而不是从代码开始。

契约明确:明确定义数据的形状、约束和行为。

演化友好:设计能够适应变化的数据结构。

工具辅助:利用类型系统、测试和文档来确保数据质量。

通过本课的学习,你应该掌握了结构体和组合数据的核心概念,并能够在实际项目中应用这些强大的数据建模工具,创建更加清晰、安全、可维护的代码。

第五十九课:结构体与组合数据 – 数据建模的艺术!完。

第六十课:闭包 – 函数与其词法环境

前言:从函数到闭包

在编程语言的发展中,函数最初只是执行特定任务的代码块。但随着函数式编程的兴起,我们逐渐认识到函数不仅仅是代码,还可以携带状态。这就是闭包(Closure)诞生的背景。

闭包是指那些能够访问自由变量的函数。自由变量是指在函数中使用的,但既不是函数参数也不是函数局部变量的变量。闭包让函数可以“记住”它被创建时的环境,即使这个环境已经不再存在。

从JavaScript的闭包到Python的嵌套函数,从Lambda表达式到函数对象,闭包已经成为现代编程语言不可或缺的一部分。

今天,我们将深入探索闭包的世界,学习如何利用这些强大的特性来编写更简洁、更灵活、更强大的代码。

第一部分:闭包的基础概念

1.1 闭包的实质:函数与环境的结合

闭包的实质是函数与其词法环境(lexical environment)的结合。词法环境是指函数在定义时所处的环境,包括所有可访问的变量。

python

# ============================================================================
# 闭包的实质:函数与环境的结合
# ============================================================================

print("=== 闭包的实质:函数与环境的结合 ===")

def demonstrate_closure_essence():
    """演示闭包的实质"""
    
    print("闭包解决的三个核心问题:")
    print("1. 状态封装问题:将状态与函数绑定")
    print("2. 数据隐私问题:创建私有变量")
    print("3. 函数工厂问题:动态生成函数")
    
    # 闭包在不同语言中的发展
    print("\n闭包在不同语言中的发展历程:")
    
    timeline = [
        ("1958", "LISP", "第一个支持闭包的语言", "Lambda表达式和词法作用域"),
        ("1975", "Scheme", "闭包作为一等公民", "完整的闭包支持"),
        ("1995", "JavaScript", "函数作用域和闭包", "广泛使用闭包"),
        ("2000", "Python 2.2", "嵌套函数和闭包", "使用nonlocal关键字"),
        ("2004", "C# 2.0", "匿名方法和闭包", "Lambda表达式"),
        ("2011", "C++11", "Lambda表达式", "捕获列表"),
        ("2014", "Java 8", "Lambda表达式", "有限的闭包支持"),
        ("2015", "Swift", "闭包作为一等公民", "强引用和弱引用"),
    ]
    
    for year, language, feature, description in timeline:
        print(f"  {year}: {language:15} | {feature:30} | {description}")
    
    # 从普通函数到闭包
    print("\n从普通函数到闭包:")
    
    # 1. 普通函数
    def normal_function(x):
        return x * 2
    
    print("1. 普通函数:")
    print(f"   normal_function(5) = {normal_function(5)}")
    print(f"   没有状态,每次调用相同输入得到相同输出")
    
    # 2. 使用全局状态
    counter = 0
    
    def function_with_global_state():
        global counter
        counter += 1
        return counter
    
    print("\n2. 使用全局状态:")
    print(f"   第一次调用: {function_with_global_state()}")
    print(f"   第二次调用: {function_with_global_state()}")
    print(f"   问题:全局状态容易被修改,不安全")
    
    # 3. 使用闭包
    def make_counter():
        count = 0  # 局部变量,但会被内部函数引用
        
        def counter():
            nonlocal count  # Python 3中需要使用nonlocal
            count += 1
            return count
        
        return counter
    
    print("\n3. 使用闭包:")
    my_counter = make_counter()
    print(f"   第一次调用: {my_counter()}")
    print(f"   第二次调用: {my_counter()}")
    print(f"   状态被封装在闭包中,安全且私有")
    
    # 不同语言的闭包实现对比
    print("\n不同语言的闭包实现:")
    
    implementations = {
        "JavaScript": {
            "特点": "基于函数作用域,闭包无处不在",
            "示例": """
// 创建闭包
function makeCounter() {
    let count = 0;
    return function() {
        count++;
        return count;
    };
}

// 使用
const counter = makeCounter();
console.log(counter()); // 1
console.log(counter()); // 2

// 每个闭包有自己的状态
const counter2 = makeCounter();
console.log(counter2()); // 1 (独立)
""",
            "内存管理": "垃圾回收,循环引用可能导致内存泄漏"
        },
        "Python": {
            "特点": "使用nonlocal或可变对象修改外部变量",
            "示例": """
def make_counter():
    count = 0
    def counter():
        nonlocal count  # Python 3需要
        count += 1
        return count
    return counter

# 使用
counter = make_counter()
print(counter())  # 1
print(counter())  # 2

# 另一种方式:使用可变对象
def make_counter2():
    count = [0]
    def counter():
        count[0] += 1
        return count[0]
    return counter
""",
            "内存管理": "引用计数和垃圾回收"
        },
        "C++": {
            "特点": "Lambda表达式,显式捕获列表",
            "示例": """
// 创建闭包
auto make_counter() {
    int count = 0;
    // Lambda表达式,按引用捕获count
    auto counter = [&count]() {
        count++;
        return count;
    };
    return counter;
}

// 注意:返回的闭包不能超过count的生命周期
// 安全的方式:将count也捕获到闭包中
auto make_safe_counter() {
    return [count = 0]() mutable {
        count++;
        return count;
    };
}

// 使用
auto counter = make_safe_counter();
cout << counter() << endl; // 1
cout << counter() << endl; // 2
""",
            "内存管理": "手动内存管理,Lambda捕获方式决定生命周期"
        },
        "C#": {
            "特点": "Lambda表达式和匿名方法,自动捕获",
            "示例": """
// 创建闭包
Func<int> MakeCounter() {
    int count = 0;
    // Lambda表达式自动捕获count
    return () => {
        count++;
        return count;
    };
}

// 使用
var counter = MakeCounter();
Console.WriteLine(counter()); // 1
Console.WriteLine(counter()); // 2
""",
            "内存管理": "垃圾回收,自动管理生命周期"
        },
        "Java": {
            "特点": "Lambda表达式,但只能捕获final或等效final的变量",
            "示例": """
// Java中的闭包有限制
import java.util.function.Supplier;

public class ClosureExample {
    public static Supplier<Integer> makeCounter() {
        final int[] count = {0};  // 使用数组来绕过final限制
        return () -> {
            count[0]++;
            return count[0];
        };
    }
    
    public static void main(String[] args) {
        Supplier<Integer> counter = makeCounter();
        System.out.println(counter.get()); // 1
        System.out.println(counter.get()); // 2
    }
}
""",
            "内存管理": "垃圾回收,但闭包功能有限"
        },
        "Go": {
            "特点": "匿名函数,捕获变量通过闭包引用",
            "示例": """
// 创建闭包
func makeCounter() func() int {
    count := 0
    return func() int {
        count++
        return count
    }
}

// 使用
counter := makeCounter()
fmt.Println(counter()) // 1
fmt.Println(counter()) // 2
""",
            "内存管理": "垃圾回收,自动管理"
        },
        "Rust": {
            "特点": "闭包是trait对象,有三种类型:Fn、FnMut、FnOnce",
            "示例": """
// 创建闭包
fn make_counter() -> impl FnMut() -> i32 {
    let mut count = 0;
    // move关键字取得count的所有权
    move || {
        count += 1;
        count
    }
}

// 使用
let mut counter = make_counter();
println!("{}", counter()); // 1
println!("{}", counter()); // 2
""",
            "内存管理": "所有权系统,确保内存安全"
        },
        "Swift": {
            "特点": "闭包是引用类型,自动捕获上下文",
            "示例": """
// 创建闭包
func makeCounter() -> () -> Int {
    var count = 0
    return {
        count += 1
        return count
    }
}

// 使用
let counter = makeCounter()
print(counter()) // 1
print(counter()) // 2
""",
            "内存管理": "ARC(自动引用计数),注意循环引用"
        }
    }
    
    for lang, info in implementations.items():
        print(f"\n{lang}:")
        print(f"  特点: {info['特点']}")
        print(f"  示例: {info['示例'].strip()}")
        if '内存管理' in info:
            print(f"  内存管理: {info['内存管理']}")
    
    # 闭包的四个核心价值
    print("\n闭包的四个核心价值:")
    
    values = [
        ("状态封装", "将状态与操作状态的函数绑定", "创建有状态的函数"),
        ("数据隐私", "创建私有变量", "只能通过闭包访问"),
        ("函数工厂", "动态生成函数", "根据参数创建不同函数"),
        ("回调函数", "携带上下文信息", "事件处理、异步编程"),
    ]
    
    for value, mechanism, benefit in values:
        print(f"  • {value:15} | {mechanism:25} | -> {benefit}")

# 运行演示
demonstrate_closure_essence()

1.2 Python闭包的深入使用

让我们深入探索Python中闭包的各种用法。

python

# ============================================================================
# Python闭包的深入使用
# ============================================================================

print("\n=== Python闭包的深入使用 ===")

def demonstrate_python_closures():
    """演示Python闭包的高级用法"""
    
    import time
    import functools
    
    # 1. 基本闭包
    print("1. 基本闭包:")
    
    def outer_function(msg):
        """外部函数"""
        message = msg  # 自由变量
        
        def inner_function():
            """内部函数 - 闭包"""
            return f"消息: {message}"
        
        return inner_function
    
    hello_func = outer_function("你好")
    goodbye_func = outer_function("再见")
    
    print(f"hello_func: {hello_func()}")
    print(f"goodbye_func: {goodbye_func()}")
    
    # 检查闭包属性
    print(f"\n闭包属性检查:")
    print(f"hello_func.__closure__: {hello_func.__closure__}")
    print(f"hello_func.__code__.co_freevars: {hello_func.__code__.co_freevars}")
    
    if hello_func.__closure__:
        cell = hello_func.__closure__[0]
        print(f"闭包单元格内容: {cell.cell_contents}")
    
    # 2. 带参数的闭包
    print("\n2. 带参数的闭包:")
    
    def power_factory(exponent):
        """创建幂函数"""
        def power(base):
            return base ** exponent
        return power
    
    square = power_factory(2)
    cube = power_factory(3)
    
    print(f"square(5) = {square(5)}")  # 25
    print(f"cube(5) = {cube(5)}")      # 125
    
    # 3. 修改外部变量
    print("\n3. 修改外部变量:")
    
    def counter_factory():
        """计数器工厂"""
        count = 0
        
        def increment():
            nonlocal count
            count += 1
            return count
        
        def decrement():
            nonlocal count
            count -= 1
            return count
        
        def get_count():
            return count
        
        def reset():
            nonlocal count
            count = 0
        
        # 返回多个闭包函数
        return increment, decrement, get_count, reset
    
    inc, dec, get, reset = counter_factory()
    
    print(f"初始计数: {get()}")
    print(f"增加后: {inc()}")
    print(f"增加后: {inc()}")
    print(f"减少后: {dec()}")
    reset()
    print(f"重置后: {get()}")
    
    # 4. 闭包与装饰器
    print("\n4. 闭包与装饰器:")
    
    def logger(func):
        """日志装饰器"""
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            print(f"[LOG] 调用 {func.__name__},参数: {args}, {kwargs}")
            result = func(*args, **kwargs)
            print(f"[LOG] {func.__name__} 返回: {result}")
            return result
        return wrapper
    
    @logger
    def add(a, b):
        return a + b
    
    print("装饰器本质上是闭包:")
    print(f"add(3, 5) = {add(3, 5)}")
    
    # 5. 闭包与函数柯里化
    print("\n5. 闭包与函数柯里化:")
    
    def curry(func):
        """柯里化装饰器"""
        @functools.wraps(func)
        def curried(*args, **kwargs):
            if len(args) + len(kwargs) >= func.__code__.co_argcount:
                return func(*args, **kwargs)
            
            def partial(*more_args, **more_kwargs):
                new_args = args + more_args
                new_kwargs = {**kwargs, **more_kwargs}
                return curried(*new_args, **new_kwargs)
            
            return partial
        return curried
    
    @curry
    def multiply(a, b, c):
        return a * b * c
    
    print("柯里化示例:")
    print(f"multiply(2, 3, 4) = {multiply(2, 3, 4)}")
    print(f"multiply(2)(3)(4) = {multiply(2)(3)(4)}")
    print(f"multiply(2, 3)(4) = {multiply(2, 3)(4)}")
    
    double = multiply(2)  # 部分应用
    print(f"double(5, 6) = {double(5, 6)}")  # 2 * 5 * 6 = 60
    
    # 6. 闭包与回调函数
    print("\n6. 闭包与回调函数:")
    
    def event_handler_factory():
        """事件处理器工厂"""
        handlers = []
        
        def register_handler(handler):
            """注册事件处理器"""
            handlers.append(handler)
            print(f"注册处理器: {handler.__name__}")
        
        def unregister_handler(handler):
            """注销事件处理器"""
            if handler in handlers:
                handlers.remove(handler)
                print(f"注销处理器: {handler.__name__}")
        
        def trigger_event(event_data):
            """触发事件"""
            print(f"触发事件: {event_data}")
            for handler in handlers:
                handler(event_data)
        
        return register_handler, unregister_handler, trigger_event
    
    register, unregister, trigger = event_handler_factory()
    
    def email_notifier(data):
        print(f"  发送邮件通知: {data}")
    
    def sms_notifier(data):
        print(f"  发送短信通知: {data}")
    
    register(email_notifier)
    register(sms_notifier)
    
    print("触发事件:")
    trigger("用户登录")
    
    unregister(sms_notifier)
    print("\n移除短信通知后触发事件:")
    trigger("订单创建")
    
    # 7. 闭包与状态机
    print("\n7. 闭包与状态机:")
    
    def state_machine_factory(initial_state):
        """状态机工厂"""
        state = initial_state
        transitions = {}
        
        def add_transition(from_state, to_state, condition):
            """添加状态转移"""
            if from_state not in transitions:
                transitions[from_state] = []
            transitions[from_state].append((to_state, condition))
        
        def process_event(event):
            """处理事件"""
            nonlocal state
            
            if state not in transitions:
                print(f"状态 {state} 没有转移规则")
                return state
            
            for to_state, condition in transitions[state]:
                if condition(event):
                    old_state = state
                    state = to_state
                    print(f"状态转移: {old_state} -> {state} (事件: {event})")
                    return state
            
            print(f"状态 {state} 没有匹配的事件 {event}")
            return state
        
        def get_state():
            return state
        
        return add_transition, process_event, get_state
    
    # 创建状态机:简单的工作流
    add_transition, process_event, get_state = state_machine_factory("待处理")
    
    # 定义状态转移规则
    add_transition("待处理", "处理中", lambda e: e == "开始处理")
    add_transition("处理中", "已完成", lambda e: e == "完成处理")
    add_transition("处理中", "待处理", lambda e: e == "重新处理")
    add_transition("已完成", "待处理", lambda e: e == "重置")
    
    print("状态机示例:")
    events = ["开始处理", "完成处理", "重置", "开始处理", "重新处理", "开始处理", "完成处理"]
    
    for event in events:
        process_event(event)
    
    print(f"最终状态: {get_state()}")
    
    # 8. 闭包与记忆化(Memoization)
    print("\n8. 闭包与记忆化(Memoization):")
    
    def memoize(func):
        """记忆化装饰器"""
        cache = {}
        
        @functools.wraps(func)
        def memoized(*args):
            if args in cache:
                print(f"  缓存命中: {func.__name__}{args}")
                return cache[args]
            
            result = func(*args)
            cache[args] = result
            print(f"  缓存添加: {func.__name__}{args} = {result}")
            return result
        
        return memoized
    
    @memoize
    def fibonacci(n):
        """斐波那契数列"""
        if n <= 1:
            return n
        return fibonacci(n-1) + fibonacci(n-2)
    
    print("记忆化斐波那契数列:")
    print(f"fibonacci(10) = {fibonacci(10)}")
    print(f"fibonacci(10) 再次调用:")
    print(f"fibonacci(10) = {fibonacci(10)}")
    
    # 9. 闭包与延迟计算
    print("\n9. 闭包与延迟计算:")
    
    def lazy_value(computation):
        """创建延迟计算的值"""
        computed = False
        value = None
        
        def get():
            nonlocal computed, value
            if not computed:
                print("  执行计算...")
                value = computation()
                computed = True
            return value
        
        return get
    
    print("延迟计算示例:")
    lazy_sum = lazy_value(lambda: sum(range(1000000)))  # 昂贵的计算
    
    print("创建延迟计算对象(尚未计算)")
    print("第一次获取值(触发计算):")
    start = time.time()
    result1 = lazy_sum()
    elapsed = time.time() - start
    print(f"  结果: {result1}, 耗时: {elapsed:.4f}秒")
    
    print("第二次获取值(从缓存获取):")
    start = time.time()
    result2 = lazy_sum()
    elapsed = time.time() - start
    print(f"  结果: {result2}, 耗时: {elapsed:.4f}秒")
    
    # 10. 闭包与对象系统
    print("\n10. 闭包与对象系统:")
    
    def create_person(name, age):
        """使用闭包创建对象"""
        
        # 私有变量
        _name = name
        _age = age
        _friends = []
        
        # 公共接口
        def get_name():
            return _name
        
        def get_age():
            return _age
        
        def set_age(new_age):
            nonlocal _age
            if new_age >= 0:
                _age = new_age
                return True
            return False
        
        def add_friend(friend):
            if friend not in _friends:
                _friends.append(friend)
                return True
            return False
        
        def get_friends():
            return _friends.copy()  # 返回副本以保护内部状态
        
        def greet(other_person):
            other_name = other_person.get_name() if callable(getattr(other_person, 'get_name', None)) else str(other_person)
            return f"你好,{other_name}!我是{_name}。"
        
        # 返回公共方法
        return {
            'get_name': get_name,
            'get_age': get_age,
            'set_age': set_age,
            'add_friend': add_friend,
            'get_friends': get_friends,
            'greet': greet,
        }
    
    print("使用闭包实现对象:")
    
    alice = create_person("Alice", 30)
    bob = create_person("Bob", 25)
    
    print(f"Alice: {alice.get_name()}, {alice.get_age()}岁")
    print(f"Bob: {bob.get_name()}, {bob.get_age()}岁")
    
    alice.add_friend(bob)
    bob.add_friend(alice)
    
    print(f"Alice的朋友: {[f.get_name() for f in alice.get_friends()]}")
    print(f"Bob的朋友: {[f.get_name() for f in bob.get_friends()]}")
    
    print(f"Alice打招呼: {alice.greet(bob)}")
    print(f"Bob打招呼: {bob.greet(alice)}")
    
    # 尝试访问私有变量(应该失败)
    print(f"\n尝试访问私有变量:")
    try:
        print(f"  alice._name: 不可访问")
    except AttributeError as e:
        print(f"  正确捕获错误: {e}")

# 运行演示
demonstrate_python_closures()

第二部分:闭包的高级模式

2.1 闭包在异步编程中的应用

闭包在异步编程中有着广泛的应用,特别是在事件处理和回调函数中。

python

# ============================================================================
# 闭包在异步编程中的应用
# ============================================================================

print("\n=== 闭包在异步编程中的应用 ===")

def demonstrate_async_closures():
    """演示闭包在异步编程中的应用"""
    
    import asyncio
    import time
    from typing import Callable, List
    
    # 1. 异步回调
    print("1. 异步回调:")
    
    class AsyncEventEmitter:
        """异步事件发射器"""
        
        def __init__(self):
            self._listeners = {}
        
        def on(self, event: str, callback: Callable):
            """注册事件监听器"""
            if event not in self._listeners:
                self._listeners[event] = []
            self._listeners[event].append(callback)
        
        async def emit(self, event: str, *args, **kwargs):
            """触发事件"""
            if event in self._listeners:
                for callback in self._listeners[event]:
                    # 如果是异步函数则等待
                    if asyncio.iscoroutinefunction(callback):
                        await callback(*args, **kwargs)
                    else:
                        callback(*args, **kwargs)
    
    async def demo_async_callbacks():
        """演示异步回调"""
        emitter = AsyncEventEmitter()
        
        # 同步回调
        def sync_callback(data):
            print(f"  同步回调: 处理数据 {data}")
        
        # 异步回调
        async def async_callback(data):
            await asyncio.sleep(0.1)
            print(f"  异步回调: 处理数据 {data}")
        
        # 使用闭包创建回调
        def create_callback_with_context(context):
            async def callback(data):
                await asyncio.sleep(0.05)
                print(f"  带上下文的回调: [{context}] 处理数据 {data}")
            return callback
        
        # 注册回调
        emitter.on("data", sync_callback)
        emitter.on("data", async_callback)
        emitter.on("data", create_callback_with_context("用户"))
        
        print("触发事件:")
        await emitter.emit("data", "测试数据")
    
    # 运行异步演示
    asyncio.run(demo_async_callbacks())
    
    # 2. 异步任务管理
    print("\n2. 异步任务管理:")
    
    def create_task_manager():
        """创建任务管理器"""
        tasks = []
        
        async def add_task(coro, name=None):
            """添加任务"""
            task = asyncio.create_task(coro)
            tasks.append((name, task))
            print(f"  添加任务: {name or '匿名任务'}")
            return task
        
        async def cancel_all():
            """取消所有任务"""
            cancelled = 0
            for name, task in tasks:
                if not task.done():
                    task.cancel()
                    cancelled += 1
                    print(f"  取消任务: {name or '匿名任务'}")
            
            tasks.clear()
            return cancelled
        
        async def wait_all():
            """等待所有任务完成"""
            if tasks:
                print(f"  等待 {len(tasks)} 个任务完成...")
                await asyncio.gather(*[t for _, t in tasks], return_exceptions=True)
        
        def task_count():
            return len(tasks)
        
        return add_task, cancel_all, wait_all, task_count
    
    async def demo_task_manager():
        """演示任务管理器"""
        add_task, cancel_all, wait_all, task_count = create_task_manager()
        
        async def worker(name, duration):
            """工作函数"""
            print(f"    任务 {name} 开始")
            await asyncio.sleep(duration)
            print(f"    任务 {name} 完成")
            return f"任务 {name} 结果"
        
        # 添加任务
        await add_task(worker("A", 0.2), "任务A")
        await add_task(worker("B", 0.1), "任务B")
        await add_task(worker("C", 0.3), "任务C")
        
        print(f"  当前任务数: {task_count()}")
        
        # 等待任务完成
        await asyncio.sleep(0.15)
        
        # 取消剩余任务
        cancelled = await cancel_all()
        print(f"  已取消 {cancelled} 个任务")
        
        # 添加新任务
        await add_task(worker("D", 0.1), "任务D")
        await wait_all()
    
    asyncio.run(demo_task_manager())
    
    # 3. 闭包与异步生成器
    print("\n3. 闭包与异步生成器:")
    
    def create_async_data_stream(limit=10, delay=0.1):
        """创建异步数据流"""
        count = 0
        
        async def data_stream():
            nonlocal count
            while count < limit:
                await asyncio.sleep(delay)
                count += 1
                yield f"数据项 {count}"
        
        async def reset():
            nonlocal count
            count = 0
            print("  数据流已重置")
        
        def get_count():
            return count
        
        return data_stream, reset, get_count
    
    async def demo_async_stream():
        """演示异步数据流"""
        stream, reset, get_count = create_async_data_stream(limit=5, delay=0.05)
        
        print("  消费数据流:")
        async for data in stream():
            print(f"    收到: {data}")
        
        print(f"  总数据项: {get_count()}")
        
        # 重置并再次消费
        await reset()
        print("  再次消费:")
        async for data in stream():
            print(f"    收到: {data}")
    
    asyncio.run(demo_async_stream())
    
    # 4. 异步速率限制
    print("\n4. 异步速率限制:")
    
    def create_rate_limiter(max_calls, period):
        """创建速率限制器"""
        import time
        calls = []
        
        async def call_async(func, *args, **kwargs):
            """调用函数,遵守速率限制"""
            now = time.time()
            
            # 移除超过时间窗口的调用记录
            calls[:] = [t for t in calls if now - t < period]
            
            # 检查是否超过限制
            if len(calls) >= max_calls:
                # 等待直到有可用配额
                oldest = calls[0]
                wait_time = period - (now - oldest)
                if wait_time > 0:
                    print(f"  速率限制: 等待 {wait_time:.2f}秒")
                    await asyncio.sleep(wait_time)
                    now = time.time()
                    # 重新清理调用记录
                    calls[:] = [t for t in calls if now - t < period]
            
            # 记录本次调用
            calls.append(now)
            
            # 执行函数
            if asyncio.iscoroutinefunction(func):
                return await func(*args, **kwargs)
            else:
                return func(*args, **kwargs)
        
        return call_async
    
    async def demo_rate_limiter():
        """演示速率限制"""
        limiter = create_rate_limiter(max_calls=3, period=1.0)  # 每秒最多3次调用
        
        async def api_call(i):
            print(f"    执行API调用 {i}")
            await asyncio.sleep(0.1)
            return f"结果 {i}"
        
        print("  快速调用5次API(应该被限速):")
        tasks = [limiter(api_call, i) for i in range(5)]
        results = await asyncio.gather(*tasks)
        
        print(f"  所有结果: {results}")
    
    asyncio.run(demo_rate_limiter())
    
    # 5. 异步缓存
    print("\n5. 异步缓存:")
    
    def create_async_cache(ttl=60):
        """创建异步缓存"""
        import time
        cache = {}
        
        async def get_or_set(key, coro_func, *args, **kwargs):
            """获取或设置缓存值"""
            now = time.time()
            
            # 检查缓存
            if key in cache:
                value, timestamp = cache[key]
                if now - timestamp < ttl:
                    print(f"  缓存命中: {key}")
                    return value
            
            # 缓存未命中,执行函数
            print(f"  缓存未命中: {key},执行函数...")
            value = await coro_func(*args, **kwargs)
            
            # 更新缓存
            cache[key] = (value, now)
            print(f"  缓存已更新: {key}")
            
            # 清理过期缓存
            expired_keys = [k for k, (_, t) in cache.items() if now - t >= ttl]
            for k in expired_keys:
                del cache[k]
            if expired_keys:
                print(f"  清理 {len(expired_keys)} 个过期缓存")
            
            return value
        
        def clear():
            """清空缓存"""
            nonlocal cache
            count = len(cache)
            cache = {}
            print(f"  清空 {count} 个缓存项")
            return count
        
        def size():
            return len(cache)
        
        return get_or_set, clear, size
    
    async def demo_async_cache():
        """演示异步缓存"""
        get_or_set, clear, size = create_async_cache(ttl=0.5)  # 0.5秒TTL
        
        async def expensive_operation(key):
            print(f"    执行昂贵操作: {key}")
            await asyncio.sleep(0.2)
            return f"操作结果 {key}"
        
        print("  第一次调用(缓存未命中):")
        result1 = await get_or_set("key1", expensive_operation, "key1")
        print(f"    结果: {result1}")
        
        print("\n  立即再次调用(缓存命中):")
        result2 = await get_or_set("key1", expensive_operation, "key1")
        print(f"    结果: {result2}")
        
        print(f"\n  缓存大小: {size()}")
        
        print("\n  等待缓存过期...")
        await asyncio.sleep(0.6)
        
        print("  过期后调用(缓存未命中):")
        result3 = await get_or_set("key1", expensive_operation, "key1")
        print(f"    结果: {result3}")
        
        print(f"\n  清空缓存:")
        cleared = clear()
        print(f"    清理了 {cleared} 个缓存项")
        print(f"    缓存大小: {size()}")
    
    asyncio.run(demo_async_cache())

# 运行演示
demonstrate_async_closures()

2.2 闭包与函数式编程

闭包是函数式编程的核心概念,支持高阶函数、柯里化、组合等模式。

python

# ============================================================================
# 闭包与函数式编程
# ============================================================================

print("\n=== 闭包与函数式编程 ===")

def demonstrate_functional_closures():
    """演示闭包与函数式编程"""
    
    import functools
    from typing import Callable, TypeVar, Any
    
    T = TypeVar('T')
    U = TypeVar('U')
    V = TypeVar('V')
    
    # 1. 高阶函数
    print("1. 高阶函数:")
    
    def compose(*funcs: Callable) -> Callable:
        """函数组合:f(g(x))"""
        def composed(arg):
            result = arg
            for f in reversed(funcs):
                result = f(result)
            return result
        return composed
    
    def pipe(*funcs: Callable) -> Callable:
        """管道操作:x |> f |> g"""
        def piped(arg):
            result = arg
            for f in funcs:
                result = f(result)
            return result
        return piped
    
    # 示例函数
    def add_one(x: int) -> int:
        return x + 1
    
    def double(x: int) -> int:
        return x * 2
    
    def square(x: int) -> int:
        return x * x
    
    # 组合函数
    add_one_then_double = compose(double, add_one)
    double_then_add_one = compose(add_one, double)
    
    print(f"  组合函数示例:")
    print(f"  add_one_then_double(5) = {add_one_then_double(5)}")  # double(add_one(5)) = 12
    print(f"  double_then_add_one(5) = {double_then_add_one(5)}")  # add_one(double(5)) = 11
    
    # 管道操作
    pipeline = pipe(add_one, double, square)
    print(f"  管道操作: 5 |> add_one |> double |> square = {pipeline(5)}")  # 144
    
    # 2. 柯里化与部分应用
    print("\n2. 柯里化与部分应用:")
    
    def curry2(func: Callable[[T, U], V]) -> Callable[[T], Callable[[U], V]]:
        """二元函数柯里化"""
        return lambda x: lambda y: func(x, y)
    
    def uncurry2(func: Callable[[T], Callable[[U], V]]) -> Callable[[T, U], V]:
        """二元函数反柯里化"""
        return lambda x, y: func(x)(y)
    
    # 原始函数
    def add(x: int, y: int) -> int:
        return x + y
    
    def multiply(x: int, y: int) -> int:
        return x * y
    
    # 柯里化
    curried_add = curry2(add)
    curried_multiply = curry2(multiply)
    
    print(f"  柯里化示例:")
    print(f"  curried_add(3)(5) = {curried_add(3)(5)}")
    print(f"  curried_multiply(3)(5) = {curried_multiply(3)(5)}")
    
    # 部分应用
    add_five = curried_add(5)
    multiply_by_three = curried_multiply(3)
    
    print(f"  部分应用示例:")
    print(f"  add_five(10) = {add_five(10)}")
    print(f"  multiply_by_three(10) = {multiply_by_three(10)}")
    
    # 反柯里化
    uncurried_add = uncurry2(curried_add)
    print(f"  反柯里化: uncurried_add(3, 5) = {uncurried_add(3, 5)}")
    
    # 3. 闭包与函数工厂
    print("\n3. 闭包与函数工厂:")
    
    def function_factory(operation: str) -> Callable[[float, float], float]:
        """函数工厂:根据操作符创建函数"""
        operations = {
            '+': lambda x, y: x + y,
            '-': lambda x, y: x - y,
            '*': lambda x, y: x * y,
            '/': lambda x, y: x / y if y != 0 else float('nan'),
        }
        
        if operation not in operations:
            raise ValueError(f"未知操作符: {operation}")
        
        return operations[operation]
    
    # 创建函数
    add_func = function_factory('+')
    multiply_func = function_factory('*')
    
    print(f"  函数工厂示例:")
    print(f"  add_func(10, 5) = {add_func(10, 5)}")
    print(f"  multiply_func(10, 5) = {multiply_func(10, 5)}")
    
    # 更复杂的函数工厂
    def make_transformer(scale: float, offset: float) -> Callable[[float], float]:
        """创建线性变换函数"""
        def transform(x: float) -> float:
            return scale * x + offset
        return transform
    
    celsius_to_fahrenheit = make_transformer(1.8, 32)
    meters_to_feet = make_transformer(3.28084, 0)
    
    print(f"\n  变换函数工厂:")
    print(f"  20°C = {celsius_to_fahrenheit(20):.1f}°F")
    print(f"  10m = {meters_to_feet(10):.2f}ft")
    
    # 4. 闭包与函数记忆化
    print("\n4. 闭包与函数记忆化:")
    
    def memoize_with_key(func: Callable[..., T], key_func: Callable[..., Any] = None) -> Callable[..., T]:
        """带自定义键函数的记忆化"""
        cache = {}
        
        def memoized(*args, **kwargs):
            # 使用提供的键函数或默认参数元组作为键
            if key_func:
                key = key_func(*args, **kwargs)
            else:
                key = (args, tuple(sorted(kwargs.items())))
            
            if key not in cache:
                cache[key] = func(*args, **kwargs)
            return cache[key]
        
        return memoized
    
    # 示例:斐波那契数列
    @memoize_with_key
    def fibonacci(n: int) -> int:
        if n <= 1:
            return n
        return fibonacci(n-1) + fibonacci(n-2)
    
    print(f"  记忆化斐波那契:")
    print(f"  fibonacci(10) = {fibonacci(10)}")
    print(f"  fibonacci(20) = {fibonacci(20)}")
    
    # 5. 闭包与惰性序列
    print("\n5. 闭包与惰性序列:")
    
    def lazy_range(start: int, end: int = None, step: int = 1):
        """创建惰性范围"""
        if end is None:
            start, end = 0, start
        
        current = start
        
        def next_value():
            nonlocal current
            if (step > 0 and current >= end) or (step < 0 and current <= end):
                raise StopIteration
            
            value = current
            current += step
            return value
        
        def has_next():
            if step > 0:
                return current < end
            else:
                return current > end
        
        def reset():
            nonlocal current
            current = start
        
        def take(n: int):
            """获取前n个值"""
            values = []
            for _ in range(n):
                if has_next():
                    values.append(next_value())
                else:
                    break
            return values
        
        return {
            'next': next_value,
            'has_next': has_next,
            'reset': reset,
            'take': take,
        }
    
    print("  惰性序列示例:")
    seq = lazy_range(1, 10)
    
    print(f"  前3个值: {seq['take'](3)}")
    print(f"  再取2个值: {[seq['next']() for _ in range(2)]}")
    print(f"  是否还有值: {seq['has_next']()}")
    
    seq['reset']()
    print(f"  重置后前5个值: {seq['take'](5)}")
    
    # 6. 闭包与状态管理(函数式状态)
    print("\n6. 闭包与状态管理:")
    
    def create_stateful_component(initial_state):
        """创建有状态的组件(函数式风格)"""
        state = initial_state
        
        def get_state():
            return state
        
        def update_state(updater):
            nonlocal state
            state = updater(state)
            return state
        
        def map_state(mapper):
            nonlocal state
            state = mapper(state)
            return state
        
        def with_state(worker):
            """在当前状态下执行工作"""
            return worker(state)
        
        return {
            'get_state': get_state,
            'update_state': update_state,
            'map_state': map_state,
            'with_state': with_state,
        }
    
    print("  函数式状态管理:")
    counter = create_stateful_component(0)
    
    print(f"  初始状态: {counter['get_state']()}")
    
    # 更新状态
    counter['update_state'](lambda s: s + 1)
    print(f"  加1后: {counter['get_state']()}")
    
    counter['map_state'](lambda s: s * 2)
    print(f"  乘2后: {counter['get_state']()}")
    
    # 使用状态但不修改
    result = counter['with_state'](lambda s: f"当前值: {s}")
    print(f"  读取状态: {result}")
    
    # 7. 闭包与单子(Monad)模式
    print("\n7. 闭包与单子模式:")
    
    class Maybe:
        """Maybe单子(类似Option)"""
        
        @staticmethod
        def unit(value):
            """unit/return操作"""
            return Just(value)
        
        @staticmethod
        def bind(mvalue, func):
            """bind操作(>>=)"""
            if isinstance(mvalue, Just):
                return func(mvalue.value)
            else:  # Nothing
                return mvalue
        
        @staticmethod
        def fmap(func, mvalue):
            """fmap操作(f <$> m)"""
            if isinstance(mvalue, Just):
                return Just(func(mvalue.value))
            else:
                return mvalue
    
    class Just(Maybe):
        """有值"""
        def __init__(self, value):
            self.value = value
        
        def __str__(self):
            return f"Just({self.value})"
    
    class Nothing(Maybe):
        """无值"""
        def __str__(self):
            return "Nothing"
    
    # 使用Maybe处理可能失败的计算
    def safe_divide(x, y):
        """安全除法"""
        if y == 0:
            return Nothing()
        return Just(x / y)
    
    def safe_sqrt(x):
        """安全平方根"""
        if x < 0:
            return Nothing()
        return Just(x ** 0.5)
    
    # 链式操作
    result1 = Maybe.bind(
        safe_divide(16, 4),
        lambda x: safe_sqrt(x)
    )
    
    result2 = Maybe.bind(
        safe_divide(16, 0),  # 除以0
        lambda x: safe_sqrt(x)
    )
    
    result3 = Maybe.bind(
        safe_divide(-16, 4),  # 结果为负
        lambda x: safe_sqrt(x)
    )
    
    print("  Maybe单子示例:")
    print(f"  16 / 4 |> sqrt = {result1}")
    print(f"  16 / 0 |> sqrt = {result2}")
    print(f"  -16 / 4 |> sqrt = {result3}")
    
    # 使用闭包简化
    def maybe_computation(*steps):
        """组合多个可能失败的计算"""
        def run(initial_value):
            result = Maybe.unit(initial_value)
            for step in steps:
                result = Maybe.bind(result, step)
            return result
        return run
    
    # 创建计算管道
    compute = maybe_computation(
        lambda x: safe_divide(x, 2),
        lambda x: safe_sqrt(x),
        lambda x: Just(x * 10)  # 总是成功
    )
    
    print(f"\n  计算管道示例:")
    print(f"  计算 100: {compute(100)}")
    print(f"  计算 -100: {compute(-100)}")

# 运行演示
demonstrate_functional_closures()

第三部分:闭包的设计哲学与最佳实践

3.1 闭包的设计哲学

闭包不仅仅是技术特性,它还代表了一种编程哲学。

python

# ============================================================================
# 闭包的设计哲学
# ============================================================================

print("\n=== 闭包的设计哲学 ===")

def demonstrate_closure_philosophy():
    """演示闭包的设计哲学"""
    
    print("闭包背后的软件设计哲学:")
    
    # 1. 最小权限原则
    print("\n1. 最小权限原则:")
    print("   闭包只暴露必要的接口,隐藏实现细节")
    
    def create_bank_account(initial_balance=0):
        """创建银行账户"""
        balance = initial_balance
        transaction_history = []
        
        def deposit(amount):
            nonlocal balance
            if amount <= 0:
                return False
            balance += amount
            transaction_history.append(('deposit', amount, balance))
            return True
        
        def withdraw(amount):
            nonlocal balance
            if amount <= 0 or amount > balance:
                return False
            balance -= amount
            transaction_history.append(('withdraw', amount, balance))
            return True
        
        def get_balance():
            return balance
        
        def get_transaction_count():
            return len(transaction_history)
        
        # 不暴露transaction_history,只提供统计信息
        return {
            'deposit': deposit,
            'withdraw': withdraw,
            'get_balance': get_balance,
            'get_transaction_count': get_transaction_count,
        }
    
    account = create_bank_account(100)
    print(f"   初始余额: ${account['get_balance']()}")
    account['deposit'](50)
    account['withdraw'](30)
    print(f"   操作后余额: ${account['get_balance']()}")
    print(f"   交易次数: {account['get_transaction_count']()}")
    print(f"   无法直接访问交易历史(最小权限)")
    
    # 2. 组合优于继承
    print("\n2. 组合优于继承:")
    print("   使用闭包组合功能,而不是通过继承")
    
    def create_loggable(func, logger_func):
        """创建可记录日志的函数"""
        def logged(*args, **kwargs):
            logger_func(f"调用 {func.__name__},参数: {args}, {kwargs}")
            result = func(*args, **kwargs)
            logger_func(f"{func.__name__} 返回: {result}")
            return result
        return logged
    
    def create_timed(func):
        """创建计时函数"""
        import time
        def timed(*args, **kwargs):
            start = time.time()
            result = func(*args, **kwargs)
            end = time.time()
            print(f"  {func.__name__} 执行时间: {end - start:.6f}秒")
            return result
        return timed
    
    def create_retryable(func, max_retries=3):
        """创建可重试的函数"""
        def retryable(*args, **kwargs):
            last_exception = None
            for attempt in range(max_retries):
                try:
                    if attempt > 0:
                        print(f"  第 {attempt} 次重试...")
                    return func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    print(f"  第 {attempt+1} 次尝试失败: {e}")
            
            print(f"  所有 {max_retries} 次尝试都失败")
            raise last_exception
        return retryable
    
    # 原始函数
    def unstable_operation():
        import random
        if random.random() < 0.5:
            raise ValueError("随机失败")
        return "成功"
    
    # 组合功能
    logged_operation = create_loggable(unstable_operation, print)
    timed_operation = create_timed(logged_operation)
    robust_operation = create_retryable(timed_operation, max_retries=5)
    
    print("   组合功能示例:")
    try:
        result = robust_operation()
        print(f"   结果: {result}")
    except Exception as e:
        print(f"   最终失败: {e}")
    
    # 3. 函数作为一等公民
    print("\n3. 函数作为一等公民:")
    print("   函数可以像其他值一样传递、返回和存储")
    
    def create_calculator():
        """创建计算器"""
        operations = {}
        
        def register(name, func):
            operations[name] = func
            print(f"  注册操作: {name}")
        
        def execute(name, *args):
            if name in operations:
                return operations[name](*args)
            else:
                raise ValueError(f"未知操作: {name}")
        
        def list_operations():
            return list(operations.keys())
        
        return register, execute, list_operations
    
    register, execute, list_ops = create_calculator()
    
    # 注册操作
    register('add', lambda x, y: x + y)
    register('multiply', lambda x, y: x * y)
    register('power', lambda x, y: x ** y)
    
    print(f"   可用操作: {list_ops()}")
    print(f"   执行 add(3, 5): {execute('add', 3, 5)}")
    print(f"   执行 multiply(3, 5): {execute('multiply', 3, 5)}")
    
    # 4. 关注点分离
    print("\n4. 关注点分离:")
    print("   使用闭包分离数据、逻辑和副作用")
    
    def create_data_pipeline(*processors):
        """创建数据处理管道"""
        def process(data):
            result = data
            for processor in processors:
                result = processor(result)
            return result
        return process
    
    # 各种处理器
    def validate_data(data):
        if not isinstance(data, dict):
            raise ValueError("数据必须是字典")
        return data
    
    def extract_fields(data):
        return {k: v for k, v in data.items() if k in ['name', 'age', 'email']}
    
    def transform_data(data):
        data['age'] = int(data.get('age', 0))
        data['name'] = data.get('name', '').title()
        return data
    
    def log_data(data):
        print(f"  处理数据: {data}")
        return data
    
    # 创建管道
    data_pipeline = create_data_pipeline(
        validate_data,
        extract_fields,
        transform_data,
        log_data
    )
    
    raw_data = {'name': 'alice smith', 'age': '30', 'email': 'alice@example.com', 'extra': 'ignore'}
    print("   数据处理管道:")
    processed_data = data_pipeline(raw_data)
    print(f"   处理结果: {processed_data}")
    
    # 5. 不可变性与纯函数
    print("\n5. 不可变性与纯函数:")
    print("   闭包可以捕获不可变状态,创建纯函数")
    
    def create_pure_function(base_value):
        """创建纯函数(给定相同输入,总是返回相同输出)"""
        def pure_function(x):
            # 只使用参数和捕获的不可变状态
            return x + base_value
        return pure_function
    
    add_ten = create_pure_function(10)
    add_twenty = create_pure_function(20)
    
    print("   纯函数示例:")
    print(f"   add_ten(5) = {add_ten(5)} (总是15)")
    print(f"   add_twenty(5) = {add_twenty(5)} (总是25)")
    print(f"   没有副作用,容易测试和推理")
    
    # 6. 延迟计算与惰性求值
    print("\n6. 延迟计算与惰性求值:")
    
    def create_lazy_evaluator(computation):
        """创建延迟计算器"""
        computed = False
        value = None
        
        def get():
            nonlocal computed, value
            if not computed:
                print("    执行计算...")
                value = computation()
                computed = True
            return value
        
        def is_computed():
            return computed
        
        return get, is_computed
    
    print("   延迟计算示例:")
    
    # 昂贵的计算
    expensive_computation = lambda: sum(i**2 for i in range(1000000))
    
    lazy_value, is_computed = create_lazy_evaluator(expensive_computation)
    
    print(f"   是否已计算: {is_computed()}")
    print("   第一次获取值:")
    result1 = lazy_value()
    print(f"   结果: {result1}, 是否已计算: {is_computed()}")
    
    print("   第二次获取值(应该立即返回):")
    result2 = lazy_value()
    print(f"   结果: {result2}")
    
    # 7. 闭包与领域特定语言(DSL)
    print("\n7. 闭包与领域特定语言:")
    
    def create_query_dsl():
        """创建查询DSL"""
        conditions = []
        
        def where(field, op, value):
            conditions.append((field, op, value))
            return query
        
        def execute(data):
            result = []
            for item in data:
                match = True
                for field, op, expected in conditions:
                    actual = item.get(field)
                    
                    if op == '==':
                        if actual != expected:
                            match = False
                            break
                    elif op == '!=':
                        if actual == expected:
                            match = False
                            break
                    elif op == '>':
                        if not (actual > expected):
                            match = False
                            break
                    elif op == '<':
                        if not (actual < expected):
                            match = False
                            break
                    elif op == 'contains':
                        if expected not in str(actual):
                            match = False
                            break
                
                if match:
                    result.append(item)
            
            return result
        
        def reset():
            nonlocal conditions
            conditions = []
            return query
        
        query = {
            'where': where,
            'execute': execute,
            'reset': reset,
        }
        return query
    
    print("   查询DSL示例:")
    
    # 测试数据
    users = [
        {'name': 'Alice', 'age': 30, 'role': 'admin'},
        {'name': 'Bob', 'age': 25, 'role': 'user'},
        {'name': 'Charlie', 'age': 35, 'role': 'admin'},
        {'name': 'Diana', 'age': 28, 'role': 'user'},
    ]
    
    query = create_query_dsl()
    
    # 构建查询
    results = (query['where']('role', '==', 'admin')
                   ['where']('age', '>', 28)
                   ['execute'](users))
    
    print(f"   查询结果: {[r['name'] for r in results]}")
    
    # 重置并构建新查询
    query['reset']()
    results2 = (query['where']('name', 'contains', 'a')
                      ['execute'](users))
    
    print(f"   新查询结果: {[r['name'] for r in results2]}")

# 运行演示
demonstrate_closure_philosophy()

# 闭包的最佳实践
print("\n" + "="*60)
print("闭包的最佳实践")
print("="*60)

print("""
原则1:明确闭包的用途
  • 用于封装状态和数据隐私
  • 用于创建函数工厂
  • 用于实现回调函数和事件处理
  • 避免过度使用导致代码难以理解

原则2:注意变量的生命周期
  • 理解哪些变量被闭包捕获
  • 注意循环引用可能导致的内存泄漏
  • 在不需要时释放闭包引用

原则3:使用适当的工具
  • 简单的状态封装:使用闭包
  • 复杂的状态管理:考虑使用类
  • 性能敏感场景:注意闭包的开销

原则4:保持闭包简单
  • 每个闭包应该只做一件事
  • 避免在闭包中实现复杂逻辑
  • 闭包应该是可测试的

原则5:正确处理可变状态
  • 尽量使用不可变状态
  • 如果必须使用可变状态,确保正确同步
  • 使用nonlocal或可变对象来修改外部变量

原则6:注意性能影响
  • 闭包的创建和调用有一定开销
  • 避免在性能敏感的循环中创建闭包
  • 考虑缓存闭包创建的结果

原则7:提供清晰的接口
  • 闭包应该提供清晰的接口
  • 隐藏实现细节
  • 提供适当的错误处理

原则8:测试闭包
  • 单独测试闭包的行为
  • 测试闭包与不同上下文的交互
  • 测试闭包的内存管理

闭包的常见陷阱:
1. 循环引用:闭包引用外部变量,外部变量又引用闭包
2. 意外共享:多个闭包共享同一个变量
3. 内存泄漏:闭包保持对不再需要对象的引用
4. 过度封装:将简单的功能过度复杂化

何时使用闭包:
1. 需要创建有状态的函数时
2. 需要实现回调函数时
3. 需要创建函数工厂时
4. 需要封装私有数据时
5. 需要实现装饰器时

何时不使用闭包:
1. 需要复杂的状态管理时(考虑使用类)
2. 需要继承和多态时
3. 性能是关键考虑因素时
4. 代码需要被多种语言复用时

未来趋势:
1. 更好的闭包优化:编译器对闭包的优化越来越好
2. 更丰富的闭包语法:更多语言支持简洁的闭包语法
3. 更好的工具支持:调试器和分析器对闭包的支持更好
4. 函数式编程的普及:闭包作为函数式编程的核心概念更受重视

结语:闭包是强大的抽象工具
闭包让我们能够创建更灵活、更模块化的代码。
理解闭包不仅有助于编写更好的代码,还有助于理解函数式编程、
异步编程和现代软件设计的核心思想。
""")

总结

5.1 闭包的核心洞见

通过这堂课,我们获得了以下核心洞见:

闭包是函数与环境的结合:闭包不仅包含函数代码,还包含其创建时的词法环境。

闭包实现状态封装:闭包可以捕获和封装状态,创建有状态的函数。

闭包支持高阶抽象:闭包是函数作为一等公民的直接体现,支持高阶函数和函数组合。

5.2 闭包的优势

数据封装和隐私:闭包可以创建私有变量,只通过接口访问。

代码复用和组合:闭包可以作为函数工厂,动态生成函数。

异步编程支持:闭包在回调函数和事件处理中非常有用。

函数式编程基础:闭包支持柯里化、部分应用、函数组合等模式。

5.3 实用建议

何时使用闭包:

  • 当需要创建有状态的函数时
  • 当需要封装私有数据时
  • 当需要实现回调函数时
  • 当需要创建函数工厂时

何时不使用闭包:

  • 当需要复杂的状态管理时(考虑使用类)
  • 当需要继承和多态时
  • 当性能是关键考虑因素时
  • 当代码需要被多种语言复用时

闭包设计原则:

  • 单一职责:每个闭包只做一件事
  • 明确接口:提供清晰的输入输出
  • 注意生命周期:理解变量的捕获和释放
  • 保持简单:避免过度复杂的闭包

5.4 语言选择建议

JavaScript:闭包无处不在,是语言的核心特性。

Python:闭包功能完整,使用nonlocal处理变量。

Rust:闭包是trait对象,有Fn、FnMut、FnOnce三种类型。

C++:Lambda表达式,显式捕获列表,需要注意生命周期。

Go:匿名函数,闭包简单直接。

5.5 终极目标:编写声明式的代码

闭包让我们能够以声明式的方式思考问题:

函数即数据:函数可以像数据一样传递和操作。

状态即环境:状态可以通过环境捕获,而不是显式传递。

组合即抽象:通过组合简单函数来构建复杂功能。

通过本课的学习,你应该掌握了闭包的核心概念,并能够在实际项目中应用这些强大的特性,创建更加灵活、模块化、可维护的代码。

第六十课:闭包 – 函数与其词法环境!完。

第61课:设计原则 —— 从闭包到可维护的代码架构

一、为什么在“闭包”之后讨论“设计原则”?

在掌握了闭包这样强大的技术工具后,我们面临一个新的问题:“如何使用这些工具,构建出清晰、健壮、易于理解和维护的代码?” 这就像学会了各种木材的连接方式(榫卯、胶合、钉接)后,需要学习建筑学的原理,才能造出坚固、实用、美观的房子,而非一堆杂乱的结构。

闭包为实现某些经典的设计模式(如模块模式、策略模式)提供了底层支持。然而,若没有设计原则的指导,对闭包的滥用反而会导致代码难以跟踪和调试(如过度嵌套、作用域泄漏)。因此,本章旨在将你的编程视角,从“语法技巧”和“特性实现”提升到“结构设计”的层面。

二、什么是设计原则?

设计原则是软件设计领域提炼出的高级指导思想最佳实践共识。它们不关心具体的语法或API,而是关注:

  • 如何组织代码单元(类、函数、模块)?
  • 代码单元之间应如何交互?
  • 变化来临时,如何将影响局部化?

核心目标:创建高内聚、低耦合的系统。

  • 高内聚:一个模块(类、函数)内部的元素彼此紧密相关,共同完成一个明确的任务。
  • 低耦合:模块之间的依赖关系尽可能简单、明确,修改一个模块时,应尽量减少对其他模块的影响。

三、几个核心的设计原则与注释讲解

1. DRY (Don’t Repeat Yourself) – 不要重复你自己

  • 是什么:知识或逻辑在系统中应有单一、明确、权威的表示。
  • 不是什么:盲目地合并所有相似的代码。有时,两段代码看似相同,但代表不同的“知识”(未来可能独立变化),合并反而会增加耦合。
  • 注释与示例// 坏味道:重复的“知识” function calculateCircleArea(radius) { return 3.14159 * radius * radius; } function calculateCircleCircumference(radius) { return 2 * 3.14159 * radius; } // π的值和计算公式是重复的“知识”。 // 重构:提取单一权威表示 const PI = 3.14159; // 单一权威表示 function calculateCircleArea(radius) { return PI * radius * radius; } function calculateCircleCircumference(radius) { return 2 * PI * radius; } // 修改PI的值只需在一处进行。 注释:DRY原则不仅针对代码文本,更针对“业务规则”和“核心知识”。在上例中,PI的值就是一项关键知识。违反DRY不仅增加维护成本,还极易导致修改不同步,引发Bug。

2. KISS (Keep It Simple, Stupid) – 保持简单直白

  • 是什么:设计应尽可能简单,避免不必要的复杂性。
  • 实践:用最直接的方式解决问题。优先使用简单的语言特性、清晰的数据结构。不要为了“炫技”而使用晦涩难懂的代码。
  • 注释与示例
    javascript // 过于“聪明”且脆弱 function isEven(num) { return !!(num & 1) ? false : true; } // 简单直白,一目了然 function isEven(num) { return num % 2 === 0; }
    > 注释:简单的代码意味着更低的认知负荷、更少的错误和更高的可维护性。在团队协作中,这一点至关重要。通常,简单的解决方案来自于对问题的深刻理解,而非技巧的堆砌。

3. YAGNI (You Aren’t Gonna Need It) – 你不会需要它

  • 是什么:不要为未来可能的需求添加当前不需要的功能。
  • 为什么:预设计通常基于猜测,往往错误。多余的代码会增加复杂性、测试负担,并可能引入当前未考虑到的Bug。
  • 注释:这与“提前设计架构”并不矛盾。架构设计应对接当前明确的需求,并为已识别的变化点留出扩展空间(通过遵循其他原则,如开闭原则)。YAGNI反对的是为“万一以后……”这种模糊猜想而编写具体实现代码。

4. 关注点分离 (Separation of Concerns, SoC)

  • 是什么:将程序分解为不同的部分,每个部分解决一个独立的“关注点”(如数据存取、业务逻辑、用户界面)。
  • 例子:MVC (Model-View-Controller)、MVVM架构就是SoC的经典体现。
  • 注释与示例// 混杂交织的代码(关注点未分离) function handleUserSubmit() { const input = document.getElementById('userInput').value; // 1. 验证输入(业务逻辑) if (!input || input.length < 3) { alert('输入无效!'); // 2. 用户交互(UI) return; } // 3. 发送数据(数据通信) fetch('/api/save', { method: 'POST', body: JSON.stringify({ data: input }), }).then(() => { // 4. 更新界面(UI) document.getElementById('result').innerHTML = '保存成功!'; }); } // 重构后(关注点分离) // 业务逻辑层 function validateInput(input) { return input && input.length >= 3; } function createPayload(input) { return { data: input }; } // 数据通信层 async function saveData(payload) { /* fetch... */ } // UI/控制层 async function handleUserSubmit() { const input = document.getElementById('userInput').value; if (!validateInput(input)) { // 调用业务逻辑 showError('输入无效!'); // 调用UI函数 return; } const payload = createPayload(input); // 调用业务逻辑 await saveData(payload); // 调用数据层 showSuccess('保存成功!'); // 调用UI函数 } function showError(msg) { /* 更新UI显示错误 */ } function showSuccess(msg) { /* 更新UI显示成功 */ } 注释:SoC是降低复杂度的利器。分离后,每个模块职责单一,易于理解、测试和复用。当需要修改UI样式时,你无需担心会破坏业务验证逻辑。

四、向SOLID原则的延伸(高阶指引)

在面向对象编程中,有五个更具体、相互关联的设计原则,首字母缩写为SOLID。它们是指引我们实现“高内聚、低耦合”的强力工具集:

  • S – 单一职责原则:一个类只应有一个引起它变化的原因。
  • O – 开闭原则:软件实体应对扩展开放,对修改关闭。
  • L – 里氏替换原则:子类必须能够替换它们的父类而不影响程序的正确性。
  • I – 接口隔离原则:客户端不应被强迫依赖它不使用的方法。
  • D – 依赖倒置原则:高层模块不应依赖低层模块,两者都应依赖抽象。

注释:SOLID原则是设计原则的深化和具体化。虽然它们根植于OOP,但其思想(尤其是单一职责、开闭、依赖倒置)对函数式编程和模块化设计同样具有深刻的启发意义。理解它们是通往高级软件架构师的必经之路。

总结

第61课的设计原则,为你提供了在语法和特性之上的元认知框架。它们回答的不是“这个功能如何实现”,而是“这个功能应该放在哪里,以及如何与系统其他部分优雅地协作”。

当你再回头审视第60课的闭包时,你会发现,一个设计良好的模块(利用闭包实现私有变量),正是遵循了单一职责关注点分离原则。而一个接收函数作为参数的高阶函数(闭包的常见应用),则便于我们未来遵循开闭原则进行扩展。

掌握设计原则,能让你在编程时做出更明智的决策,写出不仅能让机器正确执行,更能让同行(以及未来的你)轻松理解的代码。这是从“程序员”走向“工程师”的关键一步。


延伸思考:请回顾你之前写过的项目代码,找出一处违反了DRY或KISS原则的地方,并思考如何重构。再思考一下,闭包在你的项目中,是帮助还是阻碍了“关注点分离”?

第62课:设计模式 —— 基于设计原则的可复用解决方案

一、为什么在“设计原则”之后讨论“设计模式”?

掌握了设计原则后,你拥有了评判代码设计的“标尺”,知道什么是不好的(违反DRY、KISS等),什么是好的(高内聚、低耦合)。但面对具体问题时,可能仍会困惑:“如何具体落地这些原则?

设计模式正是在此背景下诞生的答案。它们是经过验证的、针对特定问题的可复用解决方案模板。正如建筑大师克里斯托弗·亚历山大所说:“每个模式都描述了一个在我们环境中反复出现的问题,以及该问题解决方案的核心。”

关键理解:设计模式并非要你死记硬背的代码模板,而是解决问题的思路和经验的总结。它们是设计原则的具体应用和体现

二、设计模式分类概览

根据模式的目的和用途,经典的GoF设计模式可分为三类:

  1. 创建型模式:关注对象的创建机制,使对象创建更灵活、更符合系统需求。
  2. 结构型模式:关注如何组合类和对象以形成更大的结构,提升系统的灵活性和可复用性。
  3. 行为型模式:关注对象之间的职责分配和通信方式,使系统更动态、更易于协作。

这三类模式共同构成了面向对象设计的基石,它们相互关联、相互补充,许多复杂系统会同时应用多种模式。

三、创建型模式:将对象的创建与使用分离

核心思想:封装对象的创建过程,使系统不依赖于对象创建的具体细节。

1. 工厂方法模式(Factory Method)

  • 问题:当一个类无法预知需要创建哪种具体对象,或希望子类决定创建什么对象时。
  • 解决方案:定义一个创建对象的接口,但让子类决定实例化哪个类。
  • 设计原则体现
    • 依赖倒置原则:高层模块(客户端代码)不依赖具体产品类,而是依赖抽象。
    • 开闭原则:添加新产品时,只需新增工厂子类和产品子类,无需修改现有工厂和客户端代码。
  • 代码示例与注释// 抽象产品接口 class Button { render() { throw new Error('子类必须实现render方法'); } } // 具体产品 class WindowsButton extends Button { render() { console.log('渲染Windows风格按钮'); } } class MacOSButton extends Button { render() { console.log('渲染MacOS风格按钮'); } } // 抽象创建者(工厂) class Dialog { // 工厂方法(抽象或默认实现) createButton() { throw new Error('子类必须实现createButton方法'); }render() { // 使用工厂方法创建产品,但不关心具体类型 const button = this.createButton(); button.render(); }} // 具体创建者 class WindowsDialog extends Dialog { createButton() { return new WindowsButton(); } } class MacOSDialog extends Dialog { createButton() { return new MacOSButton(); } } // 客户端代码:根据配置或环境决定使用哪个具体工厂 const config = { os: 'mac' }; let dialog; if (config.os === 'windows') { dialog = new WindowsDialog(); } else { dialog = new MacOSDialog(); } dialog.render(); // 输出:渲染MacOS风格按钮 注释:工厂方法将对象创建延迟到子类,使代码更灵活。客户端只与抽象接口(Dialog、Button)交互,实现了依赖倒置。当需要新增Linux按钮时,只需添加LinuxButton和LinuxDialog,无需修改现有代码,符合开闭原则。

2. 单例模式(Singleton)

  • 问题:确保一个类只有一个实例,并提供一个全局访问点(如配置管理、数据库连接池)。
  • 解决方案:私有化构造函数,通过静态方法控制实例的创建和访问。
  • 设计原则体现
    • 单一职责原则:单例类只负责管理自己的唯一实例。
    • 关注点分离:将全局状态的管理集中在一个地方。
  • 代码示例与注释class ConfigManager { static #instance = null; // 私有静态字段,存储唯一实例 #config = {}; // 私有字段,存储配置数据// 私有构造函数,防止外部new constructor() { if (ConfigManager.#instance) { throw new Error('请使用getInstance方法获取实例'); } ConfigManager.#instance = this; } // 全局访问点 static getInstance() { if (!ConfigManager.#instance) { ConfigManager.#instance = new ConfigManager(); } return ConfigManager.#instance; } set(key, value) { this.#config[key] = value; } get(key) { return this.#config[key]; }} // 使用示例 const config1 = ConfigManager.getInstance(); config1.set('theme', 'dark'); const config2 = ConfigManager.getInstance(); console.log(config2.get('theme')); // 输出:dark console.log(config1 === config2); // 输出:true,确实是同一个实例 // const config3 = new ConfigManager(); // 报错:请使用getInstance方法 注释:单例模式需谨慎使用,因为它引入了全局状态,可能增加模块间的隐式耦合,不利于测试。在JavaScript中,模块系统(ES6 Module)本身就提供了单例特性,因此可能不需要额外的单例类。

四、结构型模式:组合对象以形成更大的结构

核心思想:通过组合(而非继承)来构建灵活、可复用的结构。

1. 适配器模式(Adapter)

  • 问题:两个已有接口不兼容,但又需要一起工作。
  • 解决方案:创建一个适配器类,包装不兼容的对象,提供统一的接口。
  • 设计原则体现
    • 单一职责原则:适配器只负责接口转换。
    • 开闭原则:客户端可以通过适配器使用新接口,而无需修改原有代码。
  • 代码示例与注释
    javascript // 旧系统:圆形钉,有特定的半径 class RoundPeg { constructor(radius) { this.radius = radius; } getRadius() { return this.radius; } } // 新系统:方孔,只能接受方形钉 class SquareHole { constructor(width) { this.width = width; } fits(peg) { // 期望peg有getWidth方法,但RoundPeg没有 return peg.getWidth ? peg.getWidth() <= this.width : false; } } // 适配器:让圆形钉能适配方孔 class SquarePegAdapter { constructor(roundPeg) { this.roundPeg = roundPeg; } // 提供方孔期望的getWidth方法 getWidth() { // 将圆形钉的直径作为方形钉的宽度 return this.roundPeg.getRadius() * 2; } } // 使用 const hole = new SquareHole(10); const roundPeg = new RoundPeg(5); // 半径5,直径10 // hole.fits(roundPeg); // 错误:roundPeg没有getWidth方法 const adapter = new SquarePegAdapter(roundPeg); console.log(hole.fits(adapter)); // 输出:true,适配成功
    > 注释:适配器模式常用于整合遗留代码、第三方库或不同API。它像一个“转接头”,让不兼容的接口能够协作,而不是修改已有代码,这符合开闭原则。

2. 装饰器模式(Decorator)

  • 问题:需要动态地为对象添加新功能,而不想通过继承增加子类。
  • 解决方案:创建一个装饰器类,包装原始对象,提供相同的接口并添加额外行为。
  • 设计原则体现
    • 开闭原则:可以动态添加功能,无需修改原有对象。
    • 单一职责原则:每个装饰器类只负责一个具体的功能增强。
  • 代码示例与注释// 基础组件接口 class Coffee { cost() { return 5; } description() { return '普通咖啡'; } } // 装饰器基类(继承自相同接口) class CoffeeDecorator { constructor(coffee) { this.coffee = coffee; } cost() { return this.coffee.cost(); } description() { return this.coffee.description(); } } // 具体装饰器:加牛奶 class MilkDecorator extends CoffeeDecorator { cost() { return this.coffee.cost() + 2; } description() { return this.coffee.description() + ',加牛奶'; } } // 具体装饰器:加糖 class SugarDecorator extends CoffeeDecorator { cost() { return this.coffee.cost() + 1; } description() { return this.coffee.description() + ',加糖'; } } // 使用:动态组合功能 let myCoffee = new Coffee(); console.log(myCoffee.description(), '价格:', myCoffee.cost()); // 普通咖啡 价格:5 myCoffee = new MilkDecorator(myCoffee); console.log(myCoffee.description(), '价格:', myCoffee.cost()); // 普通咖啡,加牛奶 价格:7 myCoffee = new SugarDecorator(myCoffee); console.log(myCoffee.description(), '价格:', myCoffee.cost()); // 普通咖啡,加牛奶,加糖 价格:8 // 可以任意组合,且顺序可调 let anotherCoffee = new SugarDecorator(new MilkDecorator(new Coffee())); console.log(anotherCoffee.description()); // 普通咖啡,加牛奶,加糖 注释:装饰器模式提供了比继承更灵活的功能扩展方式。每个装饰器都是一个独立的、可复用的组件。JavaScript中的高阶函数(函数装饰器)和ES7的装饰器语法(@decorator)都是这一思想的体现。这完美符合开闭原则——对扩展开放(添加新装饰器),对修改关闭(不修改原组件)。

五、行为型模式:优化对象间的通信与职责分配

核心思想:关注对象如何协作完成复杂任务,以及如何分配职责。

1. 观察者模式(Observer)

  • 问题:当一个对象(主题)的状态变化需要通知多个其他对象(观察者),且观察者数量和类型可能变化。
  • 解决方案:定义一种订阅机制,允许观察者订阅主题,当主题状态变化时自动通知所有观察者。
  • 设计原则体现
    • 开闭原则:可以轻松添加新的观察者,无需修改主题。
    • 松耦合:主题和观察者之间只有抽象的依赖关系。
  • 代码示例与注释
    javascript // 主题(被观察者) class WeatherStation { constructor() { this.temperature = 0; this.observers = []; // 观察者列表 } // 订阅方法 subscribe(observer) { this.observers.push(observer); } // 取消订阅 unsubscribe(observer) { this.observers = this.observers.filter(obs => obs !== observer); } // 通知所有观察者 notify() { this.observers.forEach(observer => observer.update(this.temperature)); } // 业务方法:温度变化时自动通知 setTemperature(temp) { this.temperature = temp; this.notify(); } } // 观察者接口 class Display { update(temperature) { throw new Error('子类必须实现update方法'); } } // 具体观察者 class PhoneDisplay extends Display { constructor(name) { super(); this.name = name; } update(temperature) { console.log(`[${this.name}] 当前温度:${temperature}°C`); } } // 使用 const station = new WeatherStation(); const phone1 = new PhoneDisplay('张三的手机'); const phone2 = new PhoneDisplay('李四的手机'); station.subscribe(phone1); station.subscribe(phone2); station.setTemperature(25); // 输出:[张三的手机] 当前温度:25°C // [李四的手机] 当前温度:25°C station.unsubscribe(phone1); station.setTemperature(30); // 只输出:[李四的手机] 当前温度:30°C
    > 注释:观察者模式是事件驱动系统的基石,实现了对象间的松耦合通信。主题不知道也不关心观察者的具体实现,只负责通知。这与第60课的闭包有联系:观察者的回调函数常常形成闭包,捕获了定义时的上下文。这种模式在前端框架(如React的状态管理、Vue的响应式系统)和后端的事件系统中广泛应用。

2. 策略模式(Strategy)

  • 问题:一个任务有多种算法实现,需要在运行时动态选择。
  • 解决方案:将每种算法封装成独立的策略类,并使它们可以互相替换。
  • 设计原则体现
    • 开闭原则:添加新策略时无需修改上下文。
    • 单一职责原则:每种策略只负责一种算法。
    • 依赖倒置原则:上下文依赖抽象的策略接口。
  • 代码示例与注释
    javascript // 策略接口 class PaymentStrategy { pay(amount) { throw new Error('子类必须实现pay方法'); } } // 具体策略 class CreditCardStrategy extends PaymentStrategy { constructor(cardNumber, cvv) { super(); this.cardNumber = cardNumber; this.cvv = cvv; } pay(amount) { console.log(`使用信用卡支付 ${amount} 元,卡号:${this.cardNumber}`); // 实际会调用银行API } } class AlipayStrategy extends PaymentStrategy { constructor(email) { super(); this.email = email; } pay(amount) { console.log(`使用支付宝支付 ${amount} 元,账户:${this.email}`); // 实际会调用支付宝API } } // 上下文(使用策略的类) class ShoppingCart { constructor() { this.items = []; this.paymentStrategy = null; } addItem(item, price) { this.items.push({ item, price }); } setPaymentStrategy(strategy) { this.paymentStrategy = strategy; } checkout() { const total = this.items.reduce((sum, item) => sum + item.price, 0); if (!this.paymentStrategy) { throw new Error('请先设置支付方式'); } this.paymentStrategy.pay(total); this.items = []; // 清空购物车 } } // 使用 const cart = new ShoppingCart(); cart.addItem('书籍', 50); cart.addItem('鼠标', 100); // 运行时动态选择策略 if (userPrefers === 'creditCard') { cart.setPaymentStrategy(new CreditCardStrategy('1234-5678', '123')); } else { cart.setPaymentStrategy(new AlipayStrategy('user@example.com')); } cart.checkout(); // 根据选择调用相应支付策略
    > 注释:策略模式将算法与使用它的客户端解耦,使得算法可以独立变化。这与闭包有有趣的联系:在JavaScript中,策略常常可以用简单的高阶函数(闭包)来实现,而非完整的类。例如,排序函数Array.prototype.sort接受的比较函数就是一种策略模式的轻量级实现。

六、模式间的联系与组合使用

在实际系统中,设计模式往往不是孤立存在的。例如:

  • MVC架构:结合了观察者(Model与View)、策略(Controller行为)和组合(View层次)等多种模式。
  • 一个电商系统:可能使用工厂方法创建订单、装饰器添加折扣、策略处理支付、观察者通知物流。

重要提醒

  1. 不要过度设计:模式为解决特定问题而生,如果问题不存在,强行应用模式只会增加复杂度(违反KISS原则)。
  2. 模式是指导思想,不是教条:可以根据具体语言特性简化实现(如JavaScript中常用函数和对象替代完整的类层次)。
  3. 理解比记忆更重要:掌握每个模式的意图适用场景优缺点,比死记UML图更有价值。

七、总结与展望

从设计原则到设计模式,你已建立起从宏观指导思想到微观解决方案的完整知识链:

  1. 设计原则(第61课)提供了评价设计的“价值观”和“质量标准”。
  2. 设计模式(本课)提供了实现这些原则的“工具箱”和“最佳实践”。

这三类模式共同解决了面向对象设计中的核心挑战:

  • 创建型模式让你更灵活地创建对象(解决“对象从哪里来”)
  • 结构型模式让你更优雅地组合对象(解决“对象如何组织”)
  • 行为型模式让你更智能地协调对象(解决“对象如何互动”)

进阶方向

  • 学习更多模式:如原型模式、代理模式、责任链模式、访问者模式等。
  • 研究架构模式:如MVC、MVVM、微服务、事件驱动架构等更高层次的模式。
  • 理解反模式:识别常见的不良设计模式,避免重蹈覆辙。

记住,设计模式的最终目标是写出可读、可维护、可扩展的代码。当你面临设计抉择时,问自己:这个方案是否遵循了设计原则?是否有合适的模式可以借鉴?

设计模式的学习是一个持续的过程,随着经验的积累,你会逐渐形成自己的“模式嗅觉”——能够自然地在合适的场景应用合适的模式,这才是真正的掌握。

第63课:编程范式与思维方式 —— 函数式编程与响应式编程

一、为什么在”设计模式”之后讨论”编程范式”?

经过前两课的学习,你掌握了如何应用设计原则来指导代码结构(第61课),以及如何使用具体的设计模式解决常见问题(第62课)。现在,让我们把视野提升到一个更高的层次:编程范式与思维方式

如果说设计模式是解决特定问题的”工具包”,那么编程范式就是指导我们如何思考问题、组织代码的”世界观”和”方法论”。不同的编程范式代表了不同的思维模型,它们从根本上影响我们如何设计软件。

关键洞察:编程范式不仅仅是语法特性,更是解决问题的不同思维方式。掌握多种范式能让你在面对问题时拥有更多视角,选择最适合的工具。

二、什么是编程范式?

编程范式是一套关于如何构建和理解程序的基本理念和原则。它定义了:

  • 如何看待数据和操作:是作为可变状态还是不可变值?
  • 如何组织计算:是作为一系列状态转换还是表达式求值?
  • 如何组合功能:是通过对象组合还是函数组合?

主要编程范式概览

  1. 命令式编程:描述”如何做”(How),通过一系列指令改变程序状态。
  • 过程式编程:基于过程/函数的指令序列(如C、Pascal)。
  • 面向对象编程:基于对象和类的指令序列(如Java、C++、Python)。
  1. 声明式编程:描述”做什么”(What),而非”如何做”。
  • 函数式编程:基于数学函数和表达式求值。
  • 逻辑编程:基于逻辑规则和推理(如Prolog)。
  • 响应式编程:基于数据流和变化传播。

重要关系:命令式编程关注控制流,声明式编程关注数据流。这种根本差异导致了完全不同的设计和思考方式。

三、函数式编程(Functional Programming,FP)

1. 核心思想:数学函数作为一等公民

函数式编程将计算视为数学函数的求值,避免状态改变和可变数据。它的核心原则来自于λ演算(Lambda Calculus)。

2. 函数式编程的核心概念

a. 纯函数(Pure Functions)

  • 是什么:给定相同输入,总是返回相同输出,且没有副作用。
  • 为什么重要:纯函数易于推理、测试、并行化,且具有引用透明性。
  • 示例与注释// 不纯的函数:有副作用,输出依赖外部状态 let taxRate = 0.1; function calculateTaxImpure(price) { console.log(`计算税款...`); // 副作用1:控制台输出 return price * taxRate; // 副作用2:依赖外部变量 } // 纯函数:无副作用,只依赖输入参数 function calculateTaxPure(price, rate) { return price * rate; } // 使用 console.log(calculateTaxPure(100, 0.1)); // 总是返回10 console.log(calculateTaxPure(100, 0.1)); // 总是返回10,可预测! 注释:纯函数是函数式编程的基石。它们像数学函数一样工作:f(x) = x²。无论何时调用f(5),结果总是25。这种可预测性使代码更可靠、更易于维护。

b. 不可变性(Immutability)

  • 是什么:数据一旦创建就不能被修改。任何”修改”操作都会创建新的数据。
  • 为什么重要:消除由共享可变状态引起的Bug,简化并发编程。
  • 示例与注释// 可变的方式(命令式思维) const mutableArray = [1, 2, 3]; mutableArray.push(4); // 直接修改原数组 console.log(mutableArray); // [1, 2, 3, 4] // 不可变的方式(函数式思维) const immutableArray = [1, 2, 3]; const newArray = [...immutableArray, 4]; // 创建新数组 console.log(immutableArray); // [1, 2, 3] - 原数组未改变 console.log(newArray); // [1, 2, 3, 4] - 新数组 // 对象也是如此 const person = { name: 'Alice', age: 30 }; const updatedPerson = { ...person, age: 31 }; // 创建新对象 注释:在JavaScript中,不可变性需要开发者自觉维护(使用展开运算符...Object.assign、数组的map/filter等方法)。在纯函数式语言(如Haskell、Clojure)中,不可变性是语言级别的强制约束。不可变数据天然线程安全,且易于实现时间旅行调试。

c. 高阶函数与函数组合

  • 是什么:函数可以作为参数传递、作为返回值返回,并且可以组合成新函数。
  • 为什么重要:实现抽象和代码复用,构建复杂行为。
  • 示例与注释// 高阶函数:接受函数作为参数或返回函数 function multiplyBy(factor) { // 返回一个新函数(闭包!) return function(number) { return number * factor; }; } const double = multiplyBy(2); const triple = multiplyBy(3); console.log(double(5)); // 10 console.log(triple(5)); // 15 // 函数组合:将多个函数组合成一个新函数 const add = (a, b) => a + b; const square = x => x * x; // 手动组合 const addThenSquare = (a, b) => square(add(a, b)); console.log(addThenSquare(2, 3)); // 25 // 通用组合函数 const compose = (f, g) => (x) => f(g(x)); const addThenSquareComposed = compose(square, add.bind(null, 2, 3)); console.log(addThenSquareComposed()); // 25 注释:高阶函数和函数组合是函数式编程的表达力源泉。它们允许我们创建抽象层,将简单函数组合成复杂行为。这与数学中的函数复合f(g(x))概念一致。在函数式语言中,通常有更优雅的组合方式(如Elixir的管道操作符|>)。

d. 声明式数据处理

  • 是什么:使用mapfilterreduce等操作描述数据转换,而非显式循环。
  • 为什么重要:更简洁、更表达意图、更易于并行化。
  • 示例与注释// 命令式方式:描述"如何做" const numbers = [1, 2, 3, 4, 5]; let doubledEvens = []; for (let i = 0; i < numbers.length; i++) { if (numbers[i] % 2 === 0) { doubledEvens.push(numbers[i] * 2); } } // 函数式方式:描述"做什么" const doubledEvensFP = numbers .filter(n => n % 2 === 0) // 过滤偶数 .map(n => n * 2); // 每个乘以2 console.log(doubledEvensFP); // [4, 8] // 使用reduce计算总和 const sum = numbers.reduce((acc, curr) => acc + curr, 0); console.log(sum); // 15 注释map/filter/reduce是函数式编程的三驾马车。它们将循环抽象为高阶操作,让开发者专注于转换逻辑而非迭代细节。这种风格更接近自然语言:”过滤出偶数,然后将它们加倍”。更重要的是,这些操作可以轻松并行执行(如使用Web Workers),因为它们不共享可变状态。

3. 函数式编程的优势与挑战

优势

  1. 更可预测的代码:纯函数和不可变性减少意外行为。
  2. 更易于测试和调试:没有副作用,单元测试更简单。
  3. 更好的并发支持:不可变数据天然线程安全。
  4. 更强的表达能力:高阶函数和组合允许创建强大的抽象。

挑战

  1. 学习曲线:需要思维方式的转变,尤其是对习惯了命令式编程的开发者。
  2. 性能考量:不可变性可能带来内存开销(需要通过结构共享等技术优化)。
  3. 与现实世界的交互:I/O、DOM操作本质上有副作用,需要特殊处理(如Monad)。

四、响应式编程(Reactive Programming,RP)

1. 核心思想:数据流和变化传播

响应式编程是关于数据流和变化传播的编程范式。它关注随着时间变化的数据序列(流),以及这些数据流之间的依赖关系。

关键洞察:响应式编程将一切都视为流:用户输入、网络响应、定时事件、变量值的变化等。通过声明式地组合和转换这些流,构建响应式系统。

2. 响应式编程的核心概念

a. 观察者模式与数据流

  • 是什么:响应式编程建立在观察者模式之上,但将其推广到连续的数据流。
  • 与函数式编程的关系:响应式编程常与函数式编程结合(称为函数响应式编程,FRP),使用纯函数转换数据流。
  • 示例与注释// 传统观察者模式(离散事件) class Observable { constructor() { this.subscribers = []; } subscribe(fn) { this.subscribers.push(fn); } next(value) { this.subscribers.forEach(fn => fn(value)); } } const clicks = new Observable(); clicks.subscribe(x => console.log(`点击了: ${x}`)); // 模拟点击事件 clicks.next('按钮A'); clicks.next('按钮B'); // 响应式思维:将事件视为流 // 假设我们有鼠标移动事件流 // mouseMoveStream: ---(x1,y1)---(x2,y2)---(x3,y3)---> // 我们可以转换这个流 // distancesStream: ---d1---d2---d3---> 注释:响应式编程扩展了观察者模式,不仅处理离散事件,还处理连续的数据序列(流)。流可以表示任何随时间变化的值:鼠标位置、股票价格、聊天消息等。这种统一的抽象非常强大。

b. 响应式扩展(RxJS)示例

  • 是什么:RxJS是JavaScript的响应式编程库,提供了丰富的操作符来处理异步数据流。
  • 为什么重要:统一了各种异步编程模式(回调、Promise、事件),提供声明式的流操作。
  • 示例与注释// 使用RxJS(需要先安装rxjs库) import { fromEvent, interval, combineLatest } from 'rxjs'; import { map, filter, throttleTime, take } from 'rxjs/operators'; // 创建DOM事件流 const button = document.getElementById('myButton'); const clickStream = fromEvent(button, 'click'); // 转换流:节流、计数、过滤 const processedStream = clickStream.pipe( throttleTime(1000), // 1秒内只允许一次点击 map((event, index) => `点击次数: ${index + 1}`), // 转换为消息 filter((message, index) => index < 5) // 只取前5次 ); // 订阅流 processedStream.subscribe({ next: message => console.log(message), complete: () => console.log('已完成5次点击') }); // 组合多个流 const timer1 = interval(1000).pipe(take(5)); // 每秒触发,共5次 const timer2 = interval(2000).pipe(take(3)); // 每2秒触发,共3次 const combined = combineLatest([timer1, timer2]).pipe( map(([val1, val2]) => `定时器1: ${val1}, 定时器2: ${val2}`) ); combined.subscribe(console.log); // 输出示例: // "定时器1: 0, 定时器2: 0" (2秒后) // "定时器1: 2, 定时器2: 0" (第3秒) // "定时器1: 2, 定时器2: 1" (第4秒) // ... 注释:RxJS的操作符(如mapfilterthrottleTime)类似于数组方法,但它们处理的是随时间推移到达的值流。这使得处理复杂的异步交互变得声明式和可组合。每个操作符返回一个新的可观察对象(流),符合不可变原则。

c. 响应式UI与状态管理

  • 是什么:UI作为数据流的函数,状态变化自动触发UI更新。
  • 为什么重要:简化UI编程,自动保持UI与状态同步。
  • 示例与注释// 简化的响应式状态管理示例 class ReactiveState { constructor(initialState) { this.state = initialState; this.subscribers = []; }// 更新状态(不可变方式) setState(updater) { const prevState = this.state; this.state = updater(prevState); // 通知所有订阅者 this.subscribers.forEach(sub =&gt; sub(this.state, prevState)); } // 订阅状态变化 subscribe(callback) { this.subscribers.push(callback); return () =&gt; { this.subscribers = this.subscribers.filter(sub =&gt; sub !== callback); }; }} // 创建响应式状态 const appState = new ReactiveState({ count: 0, user: null }); // UI组件订阅状态 const unsubscribe = appState.subscribe((newState, oldState) => { console.log('状态变化:', oldState, '->', newState); // 这里可以触发UI更新 document.getElementById('counter').textContent = newState.count; }); // 更新状态,UI自动响应 appState.setState(state => ({ ...state, count: state.count + 1 })); appState.setState(state => ({ ...state, user: { name: 'Alice' } })); // 现代前端框架(如React、Vue、Svelte)都内置了响应式状态管理 注释:现代前端框架的核心就是响应式UI。Vue 3的Composition API、React的Hooks + Context、Svelte的响应式声明,都是响应式编程思想的具体实现。它们让UI成为状态的声明式映射,当状态变化时,UI自动更新。

3. 响应式编程的优势与挑战

优势

  1. 统一的异步处理:用一致的模型处理事件、AJAX、定时器等。
  2. 声明式数据流组合:复杂异步逻辑变得可读和可维护。
  3. 自动依赖管理:数据流之间的依赖关系自动维护。
  4. 更好的错误处理:流操作符提供了统一的错误处理机制。

挑战

  1. 学习曲线:响应式思维和操作符需要时间掌握。
  2. 调试难度:异步数据流可能难以跟踪和调试。
  3. 内存泄漏风险:需要正确管理订阅,避免未取消订阅导致的内存泄漏。
  4. 概念抽象:理解”热流”、”冷流”、”背压”等高级概念需要实践。

五、范式融合与现代化应用

1. 多范式语言与混合编程

现代编程语言大多支持多范式,允许开发者根据问题选择最合适的思维方式:

// JavaScript中的多范式示例
// 1. 命令式/过程式
for (let i = 0; i < 10; i++) { console.log(i); }

// 2. 面向对象
class User {
    constructor(name) { this.name = name; }
    greet() { return `Hello, ${this.name}`; }
}

// 3. 函数式
const users = ['Alice', 'Bob', 'Charlie'];
const greetings = users.map(name => `Hello, ${name}`);

// 4. 响应式(使用RxJS)
import { from } from 'rxjs';
const userStream = from(users);
userStream.subscribe(console.log);

2. 现代框架中的范式融合

  • React + Redux:函数式(纯组件、不可变状态) + 响应式(状态变化触发UI更新)。
  • Vue 3:响应式(Composition API) + 函数式(计算属性、watchEffect)。
  • Angular:面向对象(类、装饰器) + 响应式(RxJS集成)。

3. 如何选择编程范式?

考虑以下因素:

  1. 问题领域:数据转换密集型适合函数式,事件驱动系统适合响应式。
  2. 团队经验:选择团队熟悉和能够维护的范式。
  3. 性能要求:实时系统可能更适合命令式优化。
  4. 可维护性:长期项目可能受益于函数式的可预测性。

实用建议:从单一范式开始深入学习,然后逐渐融合。许多实际项目会混合使用多种范式:

  • 使用函数式处理数据转换
  • 使用面向对象组织业务逻辑
  • 使用响应式处理异步和事件

六、总结与思考

从设计模式到编程范式,你的软件开发视角正在从微观战术提升到宏观战略

  1. 设计模式(第62课):解决特定问题的具体方案模板
  2. 编程范式(本课):指导软件设计的整体思维框架

范式思维的关键收获

  1. 函数式编程教导我们:
  • 通过纯函数和不可变性构建可预测的系统
  • 通过高阶函数和组合构建抽象
  • 通过声明式操作聚焦于”做什么”而非”如何做”
  1. 响应式编程教导我们:
  • 将一切视为随时间变化的数据流
  • 通过声明式组合处理复杂的异步交互
  • 构建自动响应变化的系统

未来的学习方向

  1. 深入学习特定范式
  • 函数式:学习Haskell、Clojure或Elixir以获得纯函数式体验
  • 响应式:深入掌握RxJS或学习ReactiveX其他语言实现
  1. 探索架构范式
  • 领域驱动设计(DDD)
  • 事件溯源(Event Sourcing)
  • CQRS(命令查询职责分离)
  1. 实践混合范式开发
  • 在现代框架中应用函数响应式编程
  • 构建基于流的微服务架构

最终思考:编程范式不是非此即彼的选择,而是解决问题的不同视角。真正优秀的开发者不是”面向对象程序员”或”函数式程序员”,而是能够根据问题选择合适的工具,融合多种范式优点的工程师

当你在设计下一个系统时,问自己:

  • 哪些部分可以从不可变数据和纯函数中受益?
  • 哪些异步交互可以用数据流来优雅表达?
  • 如何组合这些范式来构建更健壮、更易维护的系统?

掌握多种编程范式,就像掌握了多种语言:每种语言都能表达某些思想更优雅、更精确。这种多范式思维能力,将使你成为更全面、更有创造力的软件工程师。

文末附加内容
暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇