【Python】Python语言进阶

第一章:深入理解函数与装饰器

函数是Python中的一等公民,这意味着它们可以像其他数据类型(如整数、字符串)一样被传递、存储在变量中、作为参数传递给其他函数、从函数中返回,甚至存储在数据结构中。这种特性是Python中许多强大模式(如装饰器)的基础。

1.1 函数作为一等公民 (First-Class Functions)

让我们通过代码示例来理解函数作为一等公民的含义。

# 定义一个简单的函数
def greet(name):
    # 这是一个简单的问候函数
    return f"Hello, {
              name}"

# 将函数赋值给一个变量
my_function = greet
# 现在my_function指向了greet函数

# 通过变量调用函数
print(my_function("Alice"))
# 输出:Hello, Alice
# 这表明my_function变量现在可以像原始的greet函数一样被调用

# 定义一个接受函数作为参数的函数
def call_function(func, *args, **kwargs):
    # 这个函数接受一个函数对象和其他参数
    # 然后调用传入的函数并返回结果
    print(f"Calling function: {
              func.__name__}")
    # 打印正在调用的函数名称,__name__是函数对象的一个属性
    return func(*args, **kwargs)
    # 使用传入的参数调用传入的函数

# 将greet函数作为参数传递给call_function
result = call_function(greet, "Bob")
# 调用call_function,将greet函数对象和参数"Bob"传递进去
print(result)
# 输出:
# Calling function: greet
# Hello, Bob
# 这表明函数可以作为参数传递

# 定义一个函数,它返回另一个函数
def create_adder(x):
    # 这个函数接受一个参数x
    def adder(y):
        # 这是一个内部函数adder,它接受一个参数y
        # 它可以访问外部函数的变量x(闭包特性)
        return x + y
        # 返回x和y的和
    return adder
    # 返回内部函数adder对象本身

# 调用create_adder,创建一个加5的函数
add_five = create_adder(5)
# add_five现在是一个函数对象,这个函数内部记住了x=5

# 调用返回的函数
print(add_five(10))
# 输出:15
# 调用add_five(10)实际上是调用内部的adder函数,x是5,y是10,所以返回15
# 这表明函数可以作为另一个函数的返回值

# 将函数存储在列表中
function_list = [greet, my_function]
# 创建一个列表,包含函数对象greet和my_function

# 遍历列表并调用其中的函数
for func in function_list:
    # 遍历列表中的每一个函数对象
    print(func("Charlie"))
    # 调用当前函数对象并传入参数"Charlie"

# 输出:
# Hello, Charlie
# Hello, Charlie
# 这表明函数可以存储在数据结构中

这个例子展示了函数在Python中是如何被当作普通数据来处理的,这是理解装饰器和很多高级模式的关键。

1.2 闭包 (Closures)

闭包是函数作为一等公民的一个重要应用。当一个内部函数引用了其外部函数作用域中的变量,即使外部函数已经执行完毕,内部函数仍然可以访问和记住这些变量,这就是闭包。

def outer_function(x):
    # 这是外部函数,接受参数x
    def inner_function(y):
        # 这是内部函数,接受参数y
        # inner_function 引用了 outer_function 的变量 x
        return x + y
        # 返回外部变量x和内部变量y的和
    # outer_function 返回 inner_function 对象
    return inner_function

# 创建一个闭包,add_10 记住了 x=10
add_10 = outer_function(10)
# 调用 outer_function(10) 返回了 inner_function 对象,并且这个对象内部的 x 变量被“捕获”为 10

# 调用闭包函数
print(add_10(5))
# 输出:15
# 调用 add_10(5) 实际上是调用了那个记住了 x=10 的 inner_function,y=5,所以返回 10+5=15

print(add_10(20))
# 输出:30
# 再次调用 add_10,x 仍然是 10,y 现在是 20,所以返回 10+20=30

# 创建另一个闭包,add_5 记住了 x=5
add_5 = outer_function(5)
# 调用 outer_function(5) 返回了另一个 inner_function 对象,这次内部的 x 变量被捕获为 5

print(add_5(10))
# 输出:15
# 调用 add_5(10) 实际上是调用了那个记住了 x=5 的 inner_function,y=10,所以返回 5+10=15

# 检查闭包对象的属性
print(add_10.__closure__)
# 输出:(<cell at 0x...: int object at 0x...>,)
# __closure__ 属性包含了闭包引用的外部变量,这些变量存储在 cell 对象中

print(add_10.__closure__[0].cell_contents)
# 输出:10
# 访问 cell 对象的内容,可以看到捕获的外部变量 x 的值是 10

print(add_5.__closure__[0].cell_contents)
# 输出:5
# 访问另一个闭包的 cell 对象内容,捕获的外部变量 x 的值是 5

闭包在实际应用中非常有用,例如在事件处理、回调函数、工厂函数以及我们接下来要讨论的装饰器中。

1.3 装饰器 (Decorators)

装饰器是Python中一个非常强大和常用的特性,它允许你在不修改原函数代码的情况下,为其增加新的功能。本质上,装饰器就是一个接受一个函数作为参数,并返回一个新函数的函数(通常是利用闭包)。

Python提供了 @ 语法糖来简化装饰器的使用。

1.3.1 简单的函数装饰器

一个简单的装饰器函数通常长这样:

# 定义一个简单的装饰器函数
def my_decorator(func):
    # my_decorator 接受一个函数 func 作为参数
    def wrapper():
        # wrapper 是内部函数,这是装饰器返回的新函数
        print("Something is happening before the function is called.")
        # 在调用原函数之前执行一些操作
        func()
        # 调用原始函数 func
        print("Something is happening after the function is called.")
        # 在调用原函数之后执行一些操作
    # 装饰器返回 wrapper 函数对象
    return wrapper

# 使用装饰器语法糖
@my_decorator
# 相当于 say_hello = my_decorator(say_hello)
def say_hello():
    # 这是我们想要被装饰的原始函数
    print("Hello!")

# 调用被装饰后的函数
say_hello()
# 输出:
# Something is happening before the function is called.
# Hello!
# Something is happening after the function is called.
# 调用 say_hello() 实际上执行的是 my_decorator 返回的 wrapper 函数

上面的例子展示了最基本的装饰器功能。@my_decorator 放在 say_hello 函数定义之前,等同于执行 say_hello = my_decorator(say_hello)my_decorator 函数接收 say_hello 函数对象,内部定义了一个 wrapper 函数,并在 wrapper 中调用了原始的 say_hello 函数,并在其前后添加了额外的逻辑。最后,my_decorator 返回了这个 wrapper 函数,所以现在调用 say_hello() 实际上就是调用 wrapper()

1.3.2 带有参数的函数装饰器

如果被装饰的函数带有参数,我们的装饰器也需要能够正确地处理这些参数。这时,wrapper 函数就需要使用 *args**kwargs 来接收任意位置参数和关键字参数,并将它们传递给原始函数。

# 定义一个装饰器,用于测量函数执行时间
import time
# 导入 time 模块,用于计时

def timing_decorator(func):
    # timing_decorator 接受一个函数 func 作为参数
    def wrapper(*args, **kwargs):
        # wrapper 函数现在接受任意数量的位置参数 (*args)
        # 和任意数量的关键字参数 (**kwargs)
        start_time = time.time()
        # 记录函数开始执行的时间戳
        result = func(*args, **kwargs)
        # 调用原始函数 func,并将接收到的参数传递给它
        end_time = time.time()
        # 记录函数执行结束的时间戳
        print(f"Function {
              func.__name__} took {
              end_time - start_time:.4f} seconds to execute.")
        # 打印函数名称、执行时间
        # func.__name__ 获取原始函数的名称
        # :.4f 格式化浮点数,保留4位小数
        return result
        # 返回原始函数的执行结果
    # 装饰器返回 wrapper 函数
    return wrapper

# 使用计时装饰器
@timing_decorator
def complex_calculation(n):
    # 这是一个需要耗费一些时间的复杂计算函数
    total = 0
    # 初始化总和为0
    for i in range(n):
        # 循环 n 次
        total += i * i
        # 计算 i 的平方并加到总和
        time.sleep(0.0001) # Simulate some work
        # 模拟一些耗时操作,暂停一小段时间
    return total
    # 返回计算的总和

# 调用被装饰的函数,传入参数
result = complex_calculation(1000)
# 调用 complex_calculation,实际执行的是 timing_decorator 返回的 wrapper 函数
# wrapper 会接收参数 1000,并将其传递给原始的 complex_calculation 函数

print(f"Calculation result: {
              result}")
# 输出:
# Function complex_calculation took 0.1xxx seconds to execute.
# Calculation result: 332833500
# 输出表明装饰器成功测量了 complex_calculation 函数的执行时间,并输出了结果

在这个例子中,wrapper 函数中的 *args**kwargs 捕获了调用 complex_calculation(1000) 时传递的所有参数(这里是位置参数 1000),然后将这些参数原样传递给 func(*args, **kwargs),即原始的 complex_calculation(1000)。同时,wrapper 返回了 func 的结果,确保被装饰的函数能够正常返回其计算结果。

1.3.3 带有参数的装饰器本身

有时候,我们需要装饰器本身也能接收参数。这可以通过在装饰器函数外面再包裹一层函数来实现。最外层函数接收装饰器的参数,然后返回实际的装饰器函数。

# 定义一个带有参数的装饰器工厂函数
def repeat(num_times):
    # repeat 是最外层函数,接收装饰器的参数 num_times
    def decorator_repeat(func):
        # decorator_repeat 是实际的装饰器函数,接收被装饰的函数 func
        def wrapper_repeat(*args, **kwargs):
            # wrapper_repeat 是内部函数,是被装饰后返回的新函数
            for _ in range(num_times):
                # 根据 num_times 参数,重复执行原始函数指定的次数
                result = func(*args, **kwargs)
                # 调用原始函数,并传递参数
            return result
            # 返回最后一次执行原始函数的结果
        # decorator_repeat 返回 wrapper_repeat 函数
        return wrapper_repeat
    # repeat 返回 decorator_repeat 函数
    return decorator_repeat

# 使用带有参数的装饰器
@repeat(num_times=3)
# 相当于 greet = repeat(num_times=3)(greet)
# 先调用 repeat(num_times=3) 返回 decorator_repeat 函数
# 然后再调用 decorator_repeat(greet) 返回 wrapper_repeat 函数
def greet(name):
    # 这是我们想要被重复执行的函数
    print(f"Hello, {
              name}")

# 调用被装饰的函数
greet("World")
# 输出:
# Hello, World
# Hello, World
# Hello, World
# 调用 greet("World") 实际上执行的是 wrapper_repeat 函数,它会调用原始的 greet 函数 3 次

这里的 repeat 函数是一个“装饰器工厂”,它接收 num_times 参数,并返回一个真正的装饰器函数 decorator_repeatdecorator_repeat 接收被装饰的函数 func,然后返回一个 wrapper_repeat 函数,这个 wrapper_repeat 函数才是最终取代原始函数的新函数。@repeat(num_times=3) 的语法实际上是先调用 repeat(num_times=3) 得到 decorator_repeat 函数,然后再用 @decorator_repeat 的方式来装饰 greet 函数。

1.3.4 保留原函数元信息 (functools.wraps)

使用装饰器会改变被装饰函数的 __name____doc__ 等属性,这在调试和introspection时可能会造成困扰。为了解决这个问题,可以使用 functools.wraps 装饰器来装饰内部的 wrapper 函数。

import functools
# 导入 functools 模块

def my_decorator(func):
    # my_decorator 接受一个函数 func 作为参数
    @functools.wraps(func)
    # 使用 functools.wraps(func) 装饰 wrapper 函数
    # 这会将原始函数 func 的 __name__, __doc__ 等属性复制到 wrapper 函数上
    def wrapper(*args, **kwargs):
        # wrapper 是内部函数
        print("Something is happening before the function is called.")
        # 在调用原函数之前执行一些操作
        result = func(*args, **kwargs)
        # 调用原始函数 func,并传递参数
        print("Something is happening after the function is called.")
        # 在调用原函数之后执行一些操作
        return result
        # 返回原始函数的执行结果
    # 装饰器返回 wrapper 函数
    return wrapper

@my_decorator
def say_hello(name):
    """This function says hello to the given name."""
    # 这是带有文档字符串的原始函数
    print(f"Hello, {
              name}")

# 调用被装饰后的函数
say_hello("Alice")
# 输出:
# Something is happening before the function is called.
# Hello, Alice
# Something is happening after the function is called.

# 检查被装饰函数的元信息
print(say_hello.__name__)
# 输出:say_hello
# 使用 functools.wraps 后,__name__ 属性被正确保留

print(say_hello.__doc__)
# 输出:This function says hello to the given name.
# 使用 functools.wraps 后,__doc__ 属性被正确保留

# 如果没有使用 functools.wraps,__name__ 会是 'wrapper',__doc__ 会是 None

@functools.wraps(func) 装饰器会将原始函数 func 的一些重要属性(如 __name__, __doc__, __module__, __annotations__, __dict__)复制到 wrapper 函数上。这使得被装饰后的函数看起来更像原始函数,提高了代码的可读性和可维护性。在编写任何需要保留原函数元信息的装饰器时,都应该使用 functools.wraps

1.3.5 装饰器的应用场景

装饰器在Python中有广泛的应用,常见的场景包括:

日志记录 (Logging): 记录函数的调用、参数、返回值和执行时间。
性能分析 (Profiling): 测量函数的执行时间或内存使用。
访问控制/权限检查 (Access Control/Permission Checks): 在调用函数前检查用户是否具有执行权限。
缓存 (Caching): 缓存函数的结果,避免重复计算。
输入验证 (Input Validation): 在函数执行前验证输入参数的有效性。
重试机制 (Retry Logic): 当函数执行失败时,自动重试指定的次数。
注册函数 (Registering Functions): 例如,在web框架中注册URL路由对应的视图函数。
事务管理 (Transaction Management): 在数据库操作函数周围包裹事务逻辑。

企业级代码示例:带参数的日志记录装饰器

import logging
import functools

# 配置日志输出
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# 定义一个带有日志级别的日志装饰器工厂
def log_execution(level):
    # log_execution 是装饰器工厂,接收日志级别参数 level

    def decorator_log(func):
        # decorator_log 是实际的装饰器,接收被装饰函数 func

        @functools.wraps(func)
        # 使用 functools.wraps 保留原始函数信息
        def wrapper_log(*args, **kwargs):
            # wrapper_log 是最终的包装函数

            # 构建调用信息字符串,包括函数名、参数
            args_repr = [repr(a) for a in args]
            # 将位置参数转换为它们的字符串表示
            kwargs_repr = [f"{
              k}={
              repr(v)}" for k for k, v in kwargs.items()]
            # 将关键字参数转换为 'key=value' 格式的字符串表示
            signature = ", ".join(args_repr + kwargs_repr)
            # 将位置参数和关键字参数的字符串表示合并,用逗号和空格分隔

            log_message_before = f"Calling {
              func.__name__}({
              signature})"
            # 构建函数调用前的日志消息

            # 根据传入的日志级别记录函数调用前的日志
            if level == 'info':
                logging.info(log_message_before)
            elif level == 'warning':
                logging.warning(log_message_before)
            elif level == 'error':
                logging.error(log_message_before)
            # ... 可以扩展其他日志级别

            try:
                # 尝试执行原始函数
                result = func(*args, **kwargs)
                # 调用原始函数,并传递参数

                log_message_after = f"{
              func.__name__} returned {
              repr(result)}"
                # 构建函数调用后的日志消息,包括返回值

                # 根据传入的日志级别记录函数调用后的日志
                if level == 'info':
                    logging.info(log_message_after)
                elif level == 'warning':
                    logging.warning(log_message_after)
                elif level == 'error':
                    logging.error(log_message_after)
                # ...

                return result
                # 返回原始函数的执行结果

            except Exception as e:
                # 捕获函数执行过程中可能发生的任何异常
                log_message_exception = f"{
              func.__name__} raised exception: {
              e}"
                # 构建异常日志消息

                # 记录异常日志,通常异常日志级别为 error
                logging.error(log_message_exception, exc_info=True)
                # exc_info=True 会在日志中包含异常的 traceback 信息,对于调试非常重要

                raise e
                # 重新抛出异常,确保调用者能够感知到异常
                # 否则异常会被装饰器“吞掉”

        # decorator_log 返回包装函数 wrapper_log
        return wrapper_log

    # log_execution 返回装饰器函数 decorator_log
    return decorator_log

# 使用日志装饰器,指定日志级别为 'info'
@log_execution(level='info')
def add(a, b):
    # 这是一个简单的加法函数
    return a + b

# 使用日志装饰器,指定日志级别为 'warning'
@log_execution(level='warning')
def divide(a, b):
    # 这是一个简单的除法函数,可能会发生 ZeroDivisionError
    return a / b

# 调用被装饰的函数
print(f"Sum: {
              add(5, 3)}")
# 调用 add 函数,日志装饰器会记录调用前、后的信息,级别为 info

try:
    print(f"Division: {
              divide(10, 0)}")
    # 调用 divide 函数,日志装饰器会记录调用前信息,级别为 warning
    # 当发生 ZeroDivisionError 时,日志装饰器会捕获异常并记录 error 级别日志(包含 traceback),然后重新抛出异常
except ZeroDivisionError:
    print("Caught ZeroDivisionError as expected.")
    # 捕获并处理由 divide 函数抛出的 ZeroDivisionError

这个企业级日志装饰器示例展示了如何创建一个带有参数(日志级别)的装饰器,并在其中实现详细的日志记录逻辑,包括函数调用前、后以及发生异常时的日志,并使用了 functools.wraps*args, **kwargs 来确保通用性和正确性。它捕获并记录异常,同时确保异常被重新抛出,这在企业应用中非常重要。

1.4 函数注解 (Function Annotations)

函数注解是在函数定义中添加的关于参数和返回值的元信息。它们不强制类型检查(除非你使用第三方库如 mypy),主要用于提高代码可读性、文档生成和第三方工具的类型提示。

# 定义一个带有函数注解的函数
def add(a: int, b: int) -> int:
    # 参数 a 后面跟 : int 表示 a 的类型提示是整数
    # 参数 b 后面跟 : int 表示 b 的类型提示是整数
    # 函数定义后面跟 -> int 表示函数的返回值类型提示是整数
    """This function adds two integers."""
    # 函数的文档字符串
    return a + b
    # 返回 a 和 b 的和

# 调用函数
result = add(5, 3)
# 调用函数,传入整数参数
print(result)
# 输出:8

# 函数注解可以通过 __annotations__ 属性访问
print(add.__annotations__)
# 输出:{'a': <class 'int'>, 'b': <class 'int'>, 'return': <class 'int'>}
# __annotations__ 是一个字典,存储了函数的所有注解信息

# 注意:注解只是提示,不强制类型
result_str = add("hello", "world")
# 尽管注解提示参数是 int,但 Python 运行时不会阻止你传入字符串
# 这里的操作会进行字符串拼接,而不是整数相加
print(result_str)
# 输出:helloworld
# 这再次强调了函数注解是类型提示,而不是强制类型检查

# 使用 mypy 进行静态类型检查(在命令行运行 mypy 你的文件.py)
# mypy 会发现 add("hello", "world") 不符合类型注解,并报告错误
# mypy 的输出会类似:
# my_module.py:xx: error: Argument 1 to "add" has incompatible type "str"; expected "int"  [arg-type]
# my_module.py:xx: error: Argument 2 to "add" has incompatible type "str"; expected "int"  [arg-type]
# Found 2 errors in 1 file (checked 1 source file)

函数注解本身对Python代码的执行没有直接影响,但它们极大地提高了代码的可读性和可维护性。通过注解,开发者可以清楚地了解函数期望的输入类型和返回类型。结合静态类型检查工具(如 mypy),函数注解可以在代码运行前捕获潜在的类型错误,提高代码的健壮性。在大型项目中,使用类型注解已经成为一种最佳实践。

1.5 偏函数 (Partial Functions)

偏函数允许你固定函数的一些参数,生成一个新的函数,这个新函数只需要提供剩余的参数。这在需要多次调用同一个函数,但其中一些参数值是固定的情况下非常有用。functools 模块提供了 partial 函数来实现这一功能。

import functools
# 导入 functools 模块

# 定义一个接受多个参数的函数
def multiply(a, b, c):
    # 这是一个接受三个参数的乘法函数
    return a * b * c

# 创建一个偏函数,固定第一个参数 a 为 2
multiply_by_two = functools.partial(multiply, 2)
# functools.partial(函数对象, 固定参数1, 固定参数2, ...)
# 创建了一个新的函数 multiply_by_two,它基于 multiply 函数,并固定了第一个位置参数为 2

# 调用偏函数,只需要提供剩余的参数
result1 = multiply_by_two(3, 4)
# 调用 multiply_by_two(3, 4) 实际上等同于调用 multiply(2, 3, 4)
print(result1)
# 输出:24

# 创建另一个偏函数,固定第一个参数 a 为 3,第二个参数 b 为 5
multiply_by_fifteen = functools.partial(multiply, 3, 5)
# 创建一个新的函数 multiply_by_fifteen,固定了第一个位置参数为 3,第二个位置参数为 5

# 调用偏函数,只需要提供剩余的参数 c
result2 = multiply_by_fifteen(10)
# 调用 multiply_by_fifteen(10) 实际上等同于调用 multiply(3, 5, 10)
print(result2)
# 输出:150

# 偏函数也可以固定关键字参数
def greet(name, greeting="Hello"):
    # 一个带有默认关键字参数的问候函数
    return f"{
              greeting}, {
              name}"

# 创建一个偏函数,固定 greeting 为 "Hi"
say_hi = functools.partial(greet, greeting="Hi")
# 固定了关键字参数 greeting 的值为 "Hi"

# 调用偏函数,只需要提供 name 参数
print(say_hi("Alice"))
# 输出:Hi, Alice
# 相当于调用 greet(name="Alice", greeting="Hi")

# 偏函数可以用于简化回调函数或配置函数
def send_email(recipient, subject, body, sender="noreply@example.com"):
    # 模拟发送邮件的函数
    print(f"Sending email from {
              sender} to {
              recipient} with subject '{
              subject}' and body '{
              body}'")
    # 打印邮件发送信息,模拟实际发送过程

# 创建一个偏函数,用于从特定发件人发送邮件
send_support_email = functools.partial(send_email, sender="support@example.com")
# 固定了发件人 sender 的值为 "support@example.com"

# 调用偏函数发送邮件
send_support_email("user@example.com", "Your support request", "We are looking into your issue.")
# 输出:Sending email from support@example.com to user@example.com with subject 'Your support request' and body 'We are looking into your issue.'
# 相当于调用 send_email(recipient="user@example.com", subject="Your support request", body="We are looking into your issue.", sender="support@example.com")

偏函数使得你可以基于现有函数创建更具特定用途的新函数,提高了代码的灵活性和复用性。它常用于回调函数、事件处理或需要配置函数的场景。


第二章:深入理解类与面向对象编程 (OOP)

Python是一种多范式编程语言,同时支持面向对象编程。深入理解Python的类和OOP机制对于编写复杂、可维护的代码至关重要。本章将探讨Python类的高级特性,包括继承、多态、抽象基类、特殊方法(魔术方法)、类变量与实例变量、属性(property)等。

2.1 类、对象与实例

类 (Class): 是创建对象的蓝图或模板,定义了对象的属性(数据)和方法(行为)。
对象 (Object) / 实例 (Instance): 是类的具体化,是根据类创建出来的实体。每个实例都有自己的状态(属性值)和行为(调用类定义的方法)。

# 定义一个简单的类
class Dog:
    # 这是一个类的定义

    # 类变量:所有 Dog 实例共享的属性
    species = "Canis familiaris"
    # 定义一个类变量 species,所有 Dog 对象都属于这个物种

    def __init__(self, name, age):
        # 构造函数或初始化方法,在使用 Dog() 创建实例时被调用
        # self 代表正在创建的实例对象本身
        # name 和 age 是传递给构造函数的参数
        self.name = name
        # 实例变量:每个 Dog 实例独有的属性
        # self.name 将传入的 name 参数赋值给当前实例的 name 属性
        self.age = age
        # self.age 将传入的 age 参数赋值给当前实例的 age 属性

    def bark(self):
        # 实例方法:定义对象的行为
        # 第一个参数通常是 self,代表调用该方法的实例对象
        print(f"{
              self.name} says Woof!")
        # 访问当前实例的 name 属性,并打印叫声

    def description(self):
        # 另一个实例方法
        return f"{
              self.name} is {
              self.age} years old."
        # 访问当前实例的 name 和 age 属性,并返回描述字符串

# 创建类的实例 (对象)
my_dog = Dog("Buddy", 5)
# 调用 Dog() 创建一个 Dog 类的实例,并传入 name="Buddy", age=5
# __init__ 方法被自动调用,my_dog 就是创建的实例对象

your_dog = Dog("Lucy", 3)
# 创建另一个 Dog 类的实例

# 访问实例的属性
print(my_dog.name)
# 输出:Buddy
print(my_dog.age)
# 输出:5

print(your_dog.name)
# 输出:Lucy
print(your_dog.age)
# 输出:3

# 访问类变量
print(Dog.species)
# 输出:Canis familiaris
print(my_dog.species)
# 输出:Canis familiaris
# 通过实例也可以访问类变量

# 调用实例的方法
my_dog.bark()
# 输出:Buddy says Woof!
# 调用 my_dog 实例的 bark 方法

print(my_dog.description())
# 输出:Buddy is 5 years old.
# 调用 my_dog 实例的 description 方法并打印返回值

your_dog.bark()
# 输出:Lucy says Woof!
# 调用 your_dog 实例的 bark 方法

这个基础示例回顾了类、实例、__init__ 构造函数、实例变量、类变量和实例方法的基本概念。self 参数是Python中实例方法的惯例,它指向调用方法的实例对象。

2.2 继承 (Inheritance)

继承允许一个类(子类或派生类)继承另一个类(父类、基类或超类)的属性和方法。这有助于实现代码的复用和构建层次结构。

# 定义一个父类
class Animal:
    # Animal 是父类

    def __init__(self, name):
        # 构造函数
        self.name = name
        # 实例变量 name

    def eat(self):
        # 父类方法 eat
        print(f"{
              self.name} is eating.")
        # 打印 eating 信息

    def sleep(self):
        # 父类方法 sleep
        print(f"{
              self.name} is sleeping.")
        # 打印 sleeping 信息

# 定义一个子类,继承自 Animal
class Dog(Animal):
    # Dog 是子类,括号中的 Animal 表示 Dog 继承 Animal 类

    def __init__(self, name, breed):
        # 子类的构造函数
        # 调用父类的构造函数来初始化父类的属性
        super().__init__(name)
        # 使用 super().方法名() 调用父类的方法
        # super() 返回一个代理对象,代表父类
        self.breed = breed
        # 子类自己的实例变量 breed

    def bark(self):
        # 子类可以有自己的方法
        print(f"{
              self.name} says Woof!")
        # 访问继承自父类的 name 属性

    # 子类可以重写父类的方法
    def eat(self):
        # 重写了父类的 eat 方法
        print(f"{
              self.name} is eating dog food.")
        # 打印不同的 eating 信息

# 创建子类实例
my_dog = Dog("Buddy", "Golden Retriever")
# 创建 Dog 类的实例

# 调用继承自父类的方法
my_dog.sleep()
# 输出:Buddy is sleeping.
# 调用继承的 sleep 方法

# 调用子类自己的方法
my_dog.bark()
# 输出:Buddy says Woof!

# 调用重写后的父类方法
my_dog.eat()
# 输出:Buddy is eating dog food.

# 访问父类和子类的属性
print(my_dog.name)
# 输出:Buddy
print(my_dog.breed)
# 输出:Golden Retriever

# 检查实例类型
print(isinstance(my_dog, Dog))
# 输出:True
# my_dog 是 Dog 的实例

print(isinstance(my_dog, Animal))
# 输出:True
# my_dog 也是 Animal 的实例,因为它继承自 Animal

# 检查类之间的继承关系
print(issubclass(Dog, Animal))
# 输出:True
# Dog 是 Animal 的子类

print(issubclass(Animal, Dog))
# 输出:False
# Animal 不是 Dog 的子类

继承是实现多态(Polymorphism)和代码复用的重要机制。super() 函数在子类中用于调用父类的方法,特别是在构造函数 __init__ 中,以确保父类的属性被正确初始化。

2.3 多态 (Polymorphism)

多态是指允许不同类的对象对同一消息(方法调用)作出不同的响应。在Python中,多态体现在不同类的对象可以调用同名的方法,并且根据对象的实际类型执行不同的代码。这通常是通过方法重写(Overriding)实现的。

# 定义一个通用的函数,接受 Animal 对象
def perform_eating(animal):
    # 这个函数接受一个 animal 对象
    print(f"--- Performing eating action for {
              animal.name} ---")
    # 打印分隔线和动物名称
    animal.eat()
    # 调用传入的 animal 对象的 eat 方法
    print("--- Eating action finished ---")
    # 打印分隔线

# 创建 Animal 类的实例
generic_animal = Animal("Generic Animal")
# 创建父类 Animal 的实例

# 创建 Dog 类的实例
my_dog = Dog("Buddy", "Golden Retriever")
# 创建子类 Dog 的实例

# 对不同类型的对象调用 perform_eating 函数
perform_eating(generic_animal)
# 输出:
# --- Performing eating action for Generic Animal ---
# Generic Animal is eating.
# --- Eating action finished ---
# 当传入 generic_animal 时,调用的是 Animal 类中的 eat 方法

perform_eating(my_dog)
# 输出:
# --- Performing eating action for Buddy ---
# Buddy is eating dog food.
# --- Eating action finished ---
# 当传入 my_dog 时,调用的是 Dog 类中重写后的 eat 方法
# 同一个 perform_eating 函数,对于不同类型的对象,调用同名的 eat() 方法产生了不同的行为,这就是多态

多态使得代码更加灵活和通用。你可以编写处理基类对象(如 Animal)的函数,而这个函数可以无缝地处理任何派生类(如 Dog)的对象,只要这些派生类实现了基类定义的(或者期望有的)方法。这通常被称为“鸭子类型”(Duck Typing):如果它走起来像鸭子,叫起来像鸭子,那么它就是一只鸭子——即只要对象拥有所需的方法,就可以像处理特定类型一样处理它,而无需关心它的确切类型。

2.4 抽象基类 (Abstract Base Classes – ABCs)

抽象基类用于定义接口,规定了子类必须实现的方法。它们不能被实例化。在Python中,可以使用 abc 模块来创建抽象基类。

import abc
# 导入 abc 模块

# 定义一个抽象基类
class Shape(abc.ABC):
    # Shape 继承自 abc.ABC,表明这是一个抽象基类

    @abc.abstractmethod
    # 使用 @abc.abstractmethod 装饰器标记抽象方法
    def area(self):
        # 抽象方法 area,没有具体的实现
        # 子类必须实现这个方法
        pass
        # pass 表示方法体为空,因为是抽象方法

    @abc.abstractmethod
    def perimeter(self):
        # 抽象方法 perimeter,子类必须实现
        pass

    def description(self):
        # 抽象基类也可以有具体实现的方法
        return "This is a geometric shape."

# 尝试实例化抽象基类会报错
# try:
#     s = Shape()
# except TypeError as e:
#     print(f"Error trying to instantiate abstract class: {e}")
# # 输出:Error trying to instantiate abstract class: Can't instantiate abstract class Shape with abstract methods area, perimeter

# 定义一个具体类 Rectangle,继承自 Shape
class Rectangle(Shape):
    # Rectangle 继承 Shape 抽象基类

    def __init__(self, width, height):
        # 构造函数
        self.width = width
        self.height = height

    # 子类必须实现抽象基类中的所有抽象方法
    def area(self):
        # 实现 area 抽象方法
        return self.width * self.height

    def perimeter(self):
        # 实现 perimeter 抽象方法
        return 2 * (self.width + self.height)

    # 子类也可以有自己的方法
    def rectangle_info(self):
        return f"Rectangle with width {
              self.width} and height {
              self.height}"

# 定义另一个具体类 Circle,继承自 Shape
import math # 导入 math 模块用于计算圆的面积和周长
class Circle(Shape):
    # Circle 继承 Shape 抽象基类

    def __init__(self, radius):
        # 构造函数
        self.radius = radius

    # 子类必须实现抽象基类中的所有抽象方法
    def area(self):
        # 实现 area 抽象方法
        return math.pi * self.radius**2

    def perimeter(self):
        # 实现 perimeter 抽象方法 (圆周长)
        return 2 * math.pi * self.radius

# 创建具体类的实例
rectangle = Rectangle(4, 5)
# 创建 Rectangle 实例
circle = Circle(10)
# 创建 Circle 实例

# 调用实现的抽象方法和基类的具体方法
print(f"Rectangle Area: {
              rectangle.area()}")
# 输出:Rectangle Area: 20
print(f"Rectangle Perimeter: {
              rectangle.perimeter()}")
# 输出:Rectangle Perimeter: 18
print(f"Rectangle Description: {
              rectangle.description()}")
# 输出:Rectangle Description: This is a geometric shape.

print(f"Circle Area: {
              circle.area():.2f}")
# 输出:Circle Area: 314.16 (保留两位小数)
print(f"Circle Perimeter: {
              circle.perimeter():.2f}")
# 输出:Circle Perimeter: 62.83 (保留两位小数)
print(f"Circle Description: {
              circle.description()}")
# 输出:Circle Description: This is a geometric shape.

# 定义一个函数,接受 Shape 类型的对象
def print_shape_info(shape: Shape):
    # 使用类型注解提示 shape 应该是 Shape 或其子类的实例
    # 这个函数可以处理任何继承自 Shape 的对象
    print(f"
Shape Info:")
    print(f"  Area: {
              shape.area():.2f}")
    # 调用 shape 对象的 area 方法
    print(f"  Perimeter: {
              shape.perimeter():.2f}")
    # 调用 shape 对象的 perimeter 方法
    print(f"  Description: {
              shape.description()}")
    # 调用 shape 对象的 description 方法

# 使用不同的 Shape 子类实例调用函数
print_shape_info(rectangle)
# 调用 print_shape_info,传入 Rectangle 实例

print_shape_info(circle)
# 调用 print_shape_info,传入 Circle 实例

# 如果尝试定义一个没有实现所有抽象方法的子类,也会报错
# class Triangle(Shape):
#     def __init__(self, base, height, side1, side2, side3):
#         self.base = base
#         self.height = height
#         self.side1 = side1
#         self.side2 = side2
#         self.side3 = side3
#
#     # Missing perimeter method implementation!
#     def area(self):
#         return 0.5 * self.base * self.height
#
# # try:
# #     t = Triangle(3, 4, 3, 4, 5)
# # except TypeError as e:
# #     print(f"Error trying to instantiate incomplete class: {e}")
# # # 输出:Error trying to instantiate incomplete class: Can't instantiate abstract class Triangle with abstract methods perimeter

抽象基类提供了一种定义接口的正式方式,强制子类实现特定的方法。这在设计大型系统或库时非常有用,可以确保遵循特定的契约。它也结合了多态性,允许你编写处理抽象基类对象的通用代码,而这些代码可以操作任何实现了该接口的具体类实例。

2.5 特殊方法 (Special Methods / Magic Methods / Dunder Methods)

特殊方法是Python中以双下划线开头和结尾的方法(例如 __init__, __str__, __len__ 等)。它们不是直接调用,而是在特定的情况下由Python解释器自动调用。通过实现这些特殊方法,可以使自定义类的对象支持各种Python内置操作,例如:

对象的创建和销毁 (__new__, __init__, __del__)
对象的字符串表示 (__str__, __repr__)
对象的大小/长度 (__len__)
对象的属性访问 (__getattr__, __setattr__, __delattr__, __getattribute__)
对象的算术运算 (__add__, __sub__, __mul__, etc.)
对象的比较运算 (__eq__, __ne__, __lt__, etc.)
对象的迭代 (__iter__, __next__)
对象的上下文管理 (__enter__, __exit__)
对象的调用 (__call__)

掌握特殊方法是深入理解Python对象模型和实现Pythonic代码的关键。

企业级代码示例:实现一个自定义容器类

# 实现一个表示二维向量的类,支持向量加法、长度计算、字符串表示等
import math

class Vector:
    # 定义一个表示二维向量的类

    def __init__(self, x, y):
        # 构造函数,初始化向量的 x 和 y 分量
        self.x = x
        self.y = y

    def __repr__(self):
        # __repr__ 特殊方法:返回对象的“官方”字符串表示
        # 目标是 unambiguous(无歧义),通常用于调试和开发
        return f"Vector({
              self.x}, {
              self.y})"
        # 返回一个能重建该对象的字符串表示

    def __str__(self):
        # __str__ 特殊方法:返回对象的“非正式”或“用户友好”字符串表示
        # 当使用 print() 或 str() 函数时调用
        return f"({
              self.x}, {
              self.y})"
        # 返回一个用户更容易理解的字符串表示

    def __add__(self, other):
        # __add__ 特殊方法:实现向量加法 (self + other)
        # other 是另一个 Vector 对象
        if not isinstance(other, Vector):
            # 检查 other 是否是 Vector 类的实例
            raise TypeError("Operand must be a Vector instance")
            # 如果不是,抛出 TypeError

        # 返回一个新的 Vector 对象,其分量是对应分量之和
        return Vector(self.x + other.x, self.y + other.y)

    def __mul__(self, scalar):
        # __mul__ 特殊方法:实现向量与标量的乘法 (self * scalar)
        # scalar 是一个数字
        if not isinstance(scalar, (int, float)):
            # 检查 scalar 是否是整数或浮点数
            raise TypeError("Operand must be a number")

        # 返回一个新的 Vector 对象,其分量是原分量与标量相乘
        return Vector(self.x * scalar, self.y * scalar)

    def __rmul__(self, scalar):
        # __rmul__ 特殊方法:实现标量与向量的乘法 (scalar * self)
        # 当 Python 解释器执行 scalar * self 时,如果 scalar 不知道如何与 Vector 相乘,
        # 会尝试调用 self 的 __rmul__ 方法
        # 向量乘法满足交换律,所以 __rmul__ 的实现与 __mul__ 相同
        return self * scalar
        # 直接调用 __mul__ 方法

    def __len__(self):
        # __len__ 特殊方法:定义对象的长度
        # 当使用 len() 函数时调用
        # 对于向量,我们可以定义其“长度”为其维度,这里是 2
        return 2

    def __getitem__(self, index):
        # __getitem__ 特殊方法:实现按索引访问 (vector[index])
        # 允许像列表或元组一样通过索引访问向量分量
        if index == 0:
            return self.x
        elif index == 1:
            return self.y
        else:
            raise IndexError("Vector index out of range")
            # 如果索引无效,抛出 IndexError

    def __eq__(self, other):
        # __eq__ 特殊方法:实现相等比较 (self == other)
        if not isinstance(other, Vector):
            return False
            # 如果不是 Vector 实例,则不相等
        return self.x == other.x and self.y == other.y
        # 只有当 x 和 y 分量都相等时,两个向量才相等

    def __abs__(self):
        # __abs__ 特殊方法:实现向量的模长 (abs(self))
        # 使用 math.sqrt 计算欧几里得距离
        return math.sqrt(self.x**2 + self.y**2)

# 创建 Vector 实例
v1 = Vector(2, 3)
# 创建向量 v1 (2, 3)
v2 = Vector(5, 1)
# 创建向量 v2 (5, 1)

# 使用特殊方法 __repr__ 和 __str__
print(repr(v1))
# 输出:Vector(2, 3)
# 调用 repr() 函数会调用 __repr__ 方法
print(str(v1))
# 输出:(2, 3)
# 调用 str() 函数或 print() 函数会调用 __str__ 方法

# 使用特殊方法 __add__
v3 = v1 + v2
# 相当于 v3 = v1.__add__(v2)
print(v3)
# 输出:(7, 4)
# v3 是一个新的 Vector 对象,其 x=2+5=7, y=3+1=4

# 使用特殊方法 __mul__ 和 __rmul__
v4 = v1 * 2
# 相当于 v4 = v1.__mul__(2)
print(v4)
# 输出:(4, 6)
# v4 是一个新的 Vector 对象,其 x=2*2=4, y=3*2=6

v5 = 3 * v1
# 相当于 v5 = v1.__rmul__(3)
print(v5)
# 输出:(6, 9)
# v5 是一个新的 Vector 对象,其 x=2*3=6, y=3*3=9

# 使用特殊方法 __len__
print(len(v1))
# 输出:2
# 调用 len() 函数会调用 __len__ 方法

# 使用特殊方法 __getitem__
print(v1[0])
# 输出:2
# 访问向量的 x 分量,相当于调用 v1.__getitem__(0)
print(v1[1])
# 输出:3
# 访问向量的 y 分量,相当于调用 v1.__getitem__(1)

# try:
#     print(v1[2])
# except IndexError as e:
#     print(f"Caught expected error: {e}")
# # 输出:Caught expected error: Vector index out of range
# 尝试访问无效索引会触发 __getitem__ 中的 IndexError

# 使用特殊方法 __eq__
print(v1 == Vector(2, 3))
# 输出:True
# 调用 v1.__eq__(Vector(2, 3))
print(v1 == Vector(1, 1))
# 输出:False
# 调用 v1.__eq__(Vector(1, 1))
print(v1 == "not a vector")
# 输出:False
# 调用 v1.__eq__("not a vector")

# 使用特殊方法 __abs__
print(abs(v1))
# 输出:3.605551275463989
# 调用 abs(v1) 会调用 v1.__abs__(),计算向量 (2, 3) 的模长 sqrt(2^2 + 3^2) = sqrt(13)

通过实现特殊方法,我们可以让自定义类的对象行为与内置类型类似,使得代码更加直观和易于使用。这是Python面向对象编程中非常强大的一部分。理解这些特殊方法及其触发时机是编写高质量Python代码的基础。

2.6 类变量与实例变量的深入

前面我们简单介绍了类变量和实例变量的区别。这里我们将更深入地探讨它们的存储方式和潜在的陷阱。

类变量 (Class Variables): 定义在类内部、方法之外的变量。它们属于类本身,所有类的实例共享同一个类变量的副本。通常用于存储与类相关联的属性,如常量、统计数据或类级别的配置。
实例变量 (Instance Variables): 定义在方法内部,通常在 __init__ 方法中,以 self.variable_name 的形式创建。它们属于类的特定实例,每个实例都有自己独立的实例变量副本。用于存储每个对象独有的状态。

class MyClass:
    # 定义一个类 MyClass

    class_variable = "I am a class variable"
    # 定义一个类变量 class_variable

    def __init__(self, instance_value):
        # 构造函数
        self.instance_variable = instance_value
        # 定义一个实例变量 instance_variable

# 创建类的实例
obj1 = MyClass("instance 1 value")
# 创建 obj1 实例
obj2 = MyClass("instance 2 value")
# 创建 obj2 实例

# 访问类变量
print(MyClass.class_variable)
# 输出:I am a class variable
# 直接通过类名访问类变量

print(obj1.class_variable)
# 输出:I am a class variable
# 通过实例访问类变量,如果实例没有同名属性,会查找类属性

print(obj2.class_variable)
# 输出:I am a class variable
# 通过另一个实例访问类变量,也是相同的

# 访问实例变量
print(obj1.instance_variable)
# 输出:instance 1 value
# 访问 obj1 的实例变量

print(obj2.instance_variable)
# 输出:instance 2 value
# 访问 obj2 的实例变量,这是独立于 obj1 的值

# 修改类变量
MyClass.class_variable = "Class variable modified"
# 通过类名修改类变量

print(MyClass.class_variable)
# 输出:Class variable modified
print(obj1.class_variable)
# 输出:Class variable modified
print(obj2.class_variable)
# 输出:Class variable modified
# 所有实例访问的类变量都变了

# 修改实例变量
obj1.instance_variable = "obj1 instance variable modified"
# 修改 obj1 的实例变量

print(obj1.instance_variable)
# 输出:obj1 instance variable modified
print(obj2.instance_variable)
# 输出:instance 2 value
# 修改 obj1 的实例变量不会影响 obj2 的实例变量

# 潜在陷阱:通过实例“修改”类变量
obj1.class_variable = "Modified through obj1 instance"
# 看起来像是在修改类变量,但实际上是在 obj1 实例中创建了一个同名的实例变量

print(MyClass.class_variable)
# 输出:Class variable modified
# 类变量并没有被修改

print(obj1.class_variable)
# 输出:Modified through obj1 instance
# obj1 现在访问的是它自己的实例变量 class_variable

print(obj2.class_variable)
# 输出:Class variable modified
# obj2 仍然访问的是原始的类变量

# 检查实例的字典属性
print(obj1.__dict__)
# 输出:{'instance_variable': 'obj1 instance variable modified', 'class_variable': 'Modified through obj1 instance'}
# obj1 的 __dict__ 包含了它的实例变量,包括新创建的 class_variable

print(obj2.__dict__)
# 输出:{'instance_variable': 'instance 2 value'}
# obj2 的 __dict__ 只包含它自己的实例变量 instance_variable

print(MyClass.__dict__)
# 输出:<mappingproxy object at 0x...>
# MyClass 的 __dict__ 包含了类变量和方法等,其中 class_variable 仍然是 'Class variable modified'

通过实例访问类变量时,如果实例本身没有该名称的属性,Python会在类的MRO(Method Resolution Order,方法解析顺序)中查找。如果通过实例给一个类变量赋值,Python会在该实例的 __dict__ 中创建一个同名的实例变量,这会“遮蔽”同名的类变量。后续通过该实例访问这个名称时,会优先访问实例变量。通过类名修改类变量会影响所有实例,而通过实例“修改”类变量只会影响该实例。理解这一点对于避免意外行为非常重要。

2.7 属性 (Properties)

在Python中,可以通过 @property 装饰器将类的方法变成可以像属性一样访问的特殊方法。这允许你在访问属性时执行一些逻辑,例如计算、验证或延迟加载数据。

class Circle:
    # 定义一个表示圆的类

    def __init__(self, radius):
        # 构造函数,初始化半径
        self._radius = radius
        # 使用 _radius 是一个约定,表示这是一个“受保护的”内部属性,不应该直接从外部访问

    @property
    # 使用 @property 装饰器,将 radius 方法变成一个可读属性
    def radius(self):
        # 这个方法在访问 obj.radius 时被调用
        print("Getting radius...")
        # 打印访问提示
        return self._radius
        # 返回内部存储的半径值

    @radius.setter
    # 使用 @属性名.setter 装饰器,定义 radius 属性的设置方法
    def radius(self, value):
        # 这个方法在给 obj.radius 赋值时被调用 (obj.radius = value)
        print(f"Setting radius to {
              value}...")
        # 打印设置提示
        if value < 0:
            # 验证输入值
            raise ValueError("Radius cannot be negative")
            # 如果半径小于 0,抛出 ValueError
        self._radius = value
        # 将验证后的值赋给内部存储的半径属性

    @radius.deleter
    # 使用 @属性名.deleter 装饰器,定义 radius 属性的删除方法
    def radius(self):
        # 这个方法在删除 obj.radius 时被调用 (del obj.radius)
        print("Deleting radius...")
        # 打印删除提示
        del self._radius
        # 删除内部存储的半径属性

    @property
    # 定义一个计算周长的属性 (只读)
    def circumference(self):
        # 这个方法在访问 obj.circumference 时被调用
        print("Calculating circumference...")
        return 2 * math.pi * self._radius
        # 计算并返回周长

    # 注意:circumference 属性没有 @circumference.setter 或 @circumference.deleter,所以它是只读的

# 创建 Circle 实例
c = Circle(5)
# 创建半径为 5 的圆实例

# 访问 property 属性 (调用 @radius.getter)
print(f"Initial radius: {
              c.radius}")
# 输出:
# Getting radius...
# Initial radius: 5

# 设置 property 属性 (调用 @radius.setter)
c.radius = 10
# 输出:Setting radius to 10...
print(f"New radius: {
              c.radius}")
# 输出:
# Getting radius...
# New radius: 10

# 尝试设置无效值
try:
    c.radius = -2
except ValueError as e:
    print(f"Caught expected error: {
              e}")
# 输出:
# Setting radius to -2...
# Caught expected error: Radius cannot be negative

# 访问计算属性 (调用 @circumference.getter)
print(f"Circumference: {
              c.circumference:.2f}")
# 输出:
# Calculating circumference...
# Circumference: 62.83

# 尝试设置只读属性会报错
# try:
#     c.circumference = 100
# except AttributeError as e:
#     print(f"Caught expected error: {e}")
# # 输出:Caught expected error: can't set attribute

# 删除 property 属性 (调用 @radius.deleter)
del c.radius
# 输出:Deleting radius...

# 尝试访问已删除的属性会报错
# try:
#     print(c.radius)
# except AttributeError as e:
#     print(f"Caught expected error: {e}")
# # 输出:Caught expected error: 'Circle' object has no attribute '_radius'
# 删除 radius 属性实际上删除了内部的 _radius 属性

@property 装饰器使得我们可以用访问属性的简单语法来实现更复杂的存取逻辑。它提高了代码的可读性和封装性,将内部实现细节隐藏起来,只暴露属性接口。通过 @属性名.setter@属性名.deleter,可以分别定义属性的设置和删除行为。

2.8 类方法、静态方法与实例方法的区别

Python中的方法有三种类型:实例方法、类方法和静态方法,它们通过第一个参数和装饰器来区分。

实例方法 (Instance Methods): 最常见的方法,第一个参数是 self,指向调用该方法的实例对象。可以通过 self 访问和修改实例的属性。
类方法 (Class Methods): 使用 @classmethod 装饰器标记。第一个参数是 cls(惯例),指向调用该方法的类本身。可以通过 cls 访问和修改类变量,或者创建类的实例(例如用于替代构造函数的“工厂方法”)。
静态方法 (Static Methods): 使用 @staticmethod 装饰器标记。没有特殊的第一个参数(既没有 self 也没有 cls)。它们不访问类或实例的任何属性,就像普通的函数一样,只是为了组织代码方便而放在类内部。

class MyClass:
    # 定义一个类 MyClass

    class_variable = 0
    # 定义一个类变量

    def __init__(self, instance_value):
        # 构造函数
        self.instance_variable = instance_value
        # 定义一个实例变量

    # 实例方法
    def instance_method(self):
        # 第一个参数是 self,指向实例
        print(f"Calling instance_method on {
              self!r}")
        # {self!r} 会调用 self 的 __repr__ 方法,打印实例的官方表示
        print(f"Accessing instance_variable: {
              self.instance_variable}")
        # 可以访问实例变量
        print(f"Accessing class_variable from instance: {
              self.class_variable}")
        # 也可以通过实例访问类变量 (如果实例没有同名属性)

    # 类方法
    @classmethod
    def class_method(cls):
        # 使用 @classmethod 装饰器
        # 第一个参数是 cls,指向类本身
        print(f"Calling class_method on {
              cls.__name__}")
        # 可以访问类名 cls.__name__
        print(f"Accessing class_variable from class method: {
              cls.class_variable}")
        # 可以访问类变量 cls.class_variable
        # cls.class_variable += 1 # 可以修改类变量

    # 静态方法
    @staticmethod
    def static_method(arg1, arg2):
        # 使用 @staticmethod 装饰器
        # 没有特殊的第一个参数 (self 或 cls)
        print(f"Calling static_method with args: {
              arg1}, {
              arg2}")
        # 无法访问实例变量或类变量 (除非通过显式传入实例或类)
        # print(self.instance_variable) # 会报错
        # print(cls.class_variable) # 会报错

# 创建类的实例
obj = MyClass("instance value")

# 调用实例方法
obj.instance_method()
# 输出:
# Calling instance_method on <__main__.MyClass object at 0x...>
# Accessing instance_variable: instance value
# Accessing class_variable from instance: 0

# 调用类方法
MyClass.class_method()
# 输出:
# Calling class_method on MyClass
# Accessing class_variable from class method: 0
# 通过类名调用类方法

obj.class_method()
# 输出:
# Calling class_method on MyClass
# Accessing class_variable from class method: 0
# 也可以通过实例调用类方法,此时 cls 仍然指向 MyClass 类

# 调用静态方法
MyClass.static_method(10, 20)
# 输出:Calling static_method with args: 10, 20
# 通过类名调用静态方法

obj.static_method(30, 40)
# 输出:Calling static_method with args: 30, 40
# 也可以通过实例调用静态方法,但静态方法内部无法访问实例或类属性

# 类方法常用于替代构造函数创建实例 (工厂方法)
class ConfigParser:
    # 一个简单的配置解析器类
    def __init__(self, config_dict):
        self.config = config_dict

    def get_value(self, key):
        return self.config.get(key)

    @classmethod
    def from_file(cls, filepath):
        # 类方法 from_file,作为工厂方法从文件加载配置
        # cls 代表 ConfigParser 类本身
        print(f"Loading config from {
              filepath} using class method...")
        config_dict = {
            }
        try:
            with open(filepath, 'r') as f:
                for line in f:
                    line = line.strip()
                    if not line or line.startswith('#'):
                        continue # Skip empty lines and comments
                    key, value = line.split('=', 1)
                    config_dict[key.strip()] = value.strip()
        except FileNotFoundError:
            print(f"Config file not found: {
              filepath}")
            return None # Return None if file not found
        # 使用 cls() 创建类的实例,而不是 ConfigParser()
        return cls(config_dict)

# 模拟创建一个配置文件
with open("config.txt", "w") as f:
    f.write("database_url=mysql://user:pass@host/db
")
    f.write("timeout=30
")
    f.write("# This is a comment
")
    f.write("log_level=INFO
")

# 使用类方法 from_file 创建 ConfigParser 实例
config = ConfigParser.from_file("config.txt")

if config:
    # 如果成功加载配置
    print(f"Database URL: {
              config.get_value('database_url')}")
    # 输出:Database URL: mysql://user:pass@host/db
    print(f"Timeout: {
              config.get_value('timeout')}")
    # 输出:Timeout: 30
    print(f"Log Level: {
              config.get_value('log_level')}")
    # 输出:Log Level: INFO

# 清理模拟文件
import os
os.remove("config.txt")

理解这三种方法类型的区别以及何时使用它们,对于设计清晰、灵活的类结构非常重要。实例方法处理实例特有的数据,类方法处理类特有的数据或用于创建实例,静态方法则只是类内部的普通函数。

2.9 多重继承与方法解析顺序 (MRO)

Python支持多重继承,即一个类可以继承自多个父类。这带来了更大的灵活性,但也可能引入复杂性,特别是当不同父类有同名方法时。Python使用C3线性化算法来确定方法解析顺序(MRO),即在查找方法或属性时搜索基类的顺序。

# 定义几个基类
class A:
    def greet(self):
        print("Hello from A")

class B:
    def greet(self):
        print("Hello from B")

class C(A):
    def greet(self):
        print("Hello from C")

class D(B, C):
    # D 继承自 B 和 C
    # B 在前,C 在后
    pass

# 创建 D 的实例
d = D()

# 调用 greet 方法
d.greet()
# 输出:Hello from B
# Python 按照 MRO 查找 greet 方法,首先在 D 中查找,没有找到
# 然后按照 MRO 顺序在基类中查找:B -> C -> A -> object
# 在 B 中找到了 greet 方法,所以调用的是 B 的 greet

# 查看类的 MRO
print(D.mro())
# 输出:[<class '__main__.D'>, <class '__main__.B'>, <class '__main__.C'>, <class '__main__.A'>, <class 'object'>]
# 或者使用类属性 __mro__
print(D.__mro__)
# 输出:(<class '__main__.D'>, <class '__main__.B'>, <class '__main__.C'>, <class '__main__.A'>, <class 'object'>)
# MRO 清晰地显示了 Python 查找方法时的顺序:D -> B -> C -> A -> object

# 如果 D 自己实现了 greet 方法
class E(B, C):
    def greet(self):
        print("Hello from E")

e = E()
e.greet()
# 输出:Hello from E
# 如果子类本身有同名方法,会优先调用子类的方法

# 复杂的多重继承示例
class Base1:
    def method(self):
        print("Method from Base1")

class Base2:
    def method(self):
        print("Method from Base2")

class Derived1(Base1):
    pass

class Derived2(Base2):
    def method(self):
        print("Method from Derived2")

class FinalDerived(Derived1, Derived2):
    pass

# 查看 FinalDerived 的 MRO
print(FinalDerived.mro())
# 输出:[<class '__main__.FinalDerived'>, <class '__main__.Derived1'>, <class '__main__.Derived2'>, <class '__main__.Base1'>, <class '__main__.Base2'>, <class 'object'>]

# 创建 FinalDerived 实例并调用 method
f = FinalDerived()
f.method()
# 输出:Method from Derived2
# 根据 MRO (FinalDerived -> Derived1 -> Derived2 -> Base1 -> Base2),
# 在 Derived2 中找到了 method 方法,所以调用的是 Derived2 的方法。

MRO 是理解多重继承的关键。C3线性化算法确保了基类的单调性(子类的MRO是基类MRO的超集)和局部优先顺序(同一父类中的方法优先级高于更远的父类)。在处理复杂的多重继承结构时,检查 ClassName.mro()ClassName.__mro__ 是非常有帮助的。尽量避免使用多重继承来处理不相关的类,更推荐使用组合(Composition)来构建复杂对象。当使用多重继承时,通常用于混入(Mixin)类,这些类提供特定的功能,但不代表对象的“is-a”关系。

2.10 组合 (Composition)

相比于继承(“is-a”关系),组合(“has-a”关系)是另一种实现代码复用的方式。通过将一个类的实例作为另一个类的属性,可以构建更灵活、更松耦合的系统。

# 定义一个负责发送通知的类
class Notifier:
    # 通知器类

    def send_email(self, recipient, message):
        # 发送邮件的方法
        print(f"Sending email to {
              recipient}: {
              message}")

    def send_sms(self, phone_number, message):
        # 发送短信的方法
        print(f"Sending SMS to {
              phone_number}: {
              message}")

# 定义一个用户类,包含一个 Notifier 实例
class User:
    # 用户类

    def __init__(self, name, email, phone):
        # 构造函数
        self.name = name
        self.email = email
        self.phone = phone
        # 用户拥有一个 Notifier 实例
        self.notifier = Notifier()
        # 在 User 类的实例中创建 Notifier 的实例
        # 这就是组合,User “has a” Notifier

    def notify_user_email(self, message):
        # 用户通过其拥有的 Notifier 发送邮件通知自己
        print(f"User {
              self.name} is sending email notification...")
        self.notifier.send_email(self.email, message)
        # 调用 notifier 实例的 send_email 方法

    def notify_user_sms(self, message):
        # 用户通过其拥有的 Notifier 发送短信通知自己
        print(f"User {
              self.name} is sending SMS notification...")
        self.notifier.send_sms(self.phone, message)
        # 调用 notifier 实例的 send_sms 方法

# 创建 User 实例
user = User("Alice", "alice@example.com", "123-456-7890")
# 创建 Alice 用户实例

# 调用 User 的方法,这些方法内部使用了 Notifier 实例
user.notify_user_email("Welcome to our service!")
# 输出:
# User Alice is sending email notification...
# Sending email to alice@example.com: Welcome to our service!

user.notify_user_sms("Your account is updated.")
# 输出:
# User Alice is sending SMS notification...
# Sending SMS to 123-456-7890: Your account is updated.

# 如果 Notifier 的实现需要改变 (例如,换一个邮件发送库),
# 只需要修改 Notifier 类内部,而 User 类只需要依赖于 Notifier 的接口 (即有哪些方法),
# 这降低了耦合度。

组合通过将一个类的实例作为另一个类的属性来实现。这使得一个对象可以利用另一个对象的功能。组合的优点包括更低的耦合度(User 类只依赖于 Notifier 的行为,而不是它的实现细节),更高的灵活性(可以轻松地替换 Notifier 的实现),以及避免了多重继承可能带来的复杂性。在设计类结构时,应优先考虑使用组合而不是继承,除非存在明确的“is-a”关系。

2.11 企业级案例:使用类和特殊方法构建数据处理管道

在一个企业级数据处理应用中,我们可能需要构建一个管道来处理数据流。这个管道由一系列处理步骤组成,每个步骤接收输入数据并产生输出数据。我们可以使用类来表示每个处理步骤,并利用特殊方法来构建管道结构。

# 定义一个抽象基类,表示管道中的一个处理步骤
import abc

class DataProcessor(abc.ABC):
    # 抽象基类 DataProcessor

    @abc.abstractmethod
    def process(self, data):
        # 抽象方法 process,所有具体处理器必须实现
        # 接收输入数据 data,返回处理后的数据
        pass

    def __call__(self, data):
        # 实现 __call__ 特殊方法
        # 使得 DataProcessor 的实例可以像函数一样被调用 (processor(data))
        print(f"Processing data using {
              self.__class__.__name__}...")
        # 打印当前处理器的类名
        processed_data = self.process(data)
        # 调用具体的 process 方法处理数据
        print(f"{
              self.__class__.__name__} finished processing.")
        return processed_data
        # 返回处理后的数据

    def __or__(self, other):
        # 实现 __or__ 特殊方法,允许使用 | 操作符连接处理器
        # self 是当前的 DataProcessor 实例
        # other 是另一个 DataProcessor 实例 (或其子类实例)
        # 这使得我们可以写 processor1 | processor2 来构建管道
        if not isinstance(other, DataProcessor):
            # 检查 other 是否是 DataProcessor 的实例
            raise TypeError("Can only chain with another DataProcessor")
            # 如果不是,抛出 TypeError

        # 返回一个新的 Pipeline 对象,表示将 self 和 other 连接起来
        return Pipeline(self, other)

# 定义一些具体的处理器类
class DataCleaner(DataProcessor):
    # 数据清洗处理器,继承自 DataProcessor
    def process(self, data):
        # 实现 process 方法
        print("Cleaning data...")
        # 模拟数据清洗:移除空白和转换为小写
        return data.strip().lower()

class DataTransformer(DataProcessor):
    # 数据转换处理器,继承自 DataProcessor
    def process(self, data):
        # 实现 process 方法
        print("Transforming data...")
        # 模拟数据转换:在数据前后加上标记
        return f"[TRANSFORMED]{
              data}[/TRANSFORMED]"

class DataValidator(DataProcessor):
    # 数据验证处理器,继承自 DataProcessor
    def process(self, data):
        # 实现 process 方法
        print("Validating data...")
        # 模拟数据验证:检查数据是否包含 "transformed"
        if "transformed" in data:
            print("Validation successful!")
            return data
        else:
            raise ValueError("Validation failed: data not transformed")

class Pipeline:
    # 定义一个表示管道的类
    def __init__(self, *processors):
        # 构造函数,接收一个或多个 DataProcessor 实例作为管道步骤
        self.processors = processors
        # 将传入的处理器存储在列表中

    def process(self, initial_data):
        # 管道的 process 方法,按顺序执行所有处理器
        current_data = initial_data
        # 初始化当前数据为输入数据
        for processor in self.processors:
            # 遍历管道中的每一个处理器
            current_data = processor(current_data)
            # 调用当前处理器的 __call__ 方法处理数据,并更新当前数据
            # processor(current_data) 等同于 processor.__call__(current_data)
        return current_data
        # 返回经过所有处理器处理后的最终数据

    def __or__(self, other):
        # 实现 Pipeline 类自身的 __or__ 特殊方法
        # 允许将 Pipeline 对象与 DataProcessor 或另一个 Pipeline 连接
        if isinstance(other, DataProcessor):
            # 如果 other 是一个 DataProcessor
            # 返回一个新的 Pipeline,包含当前 Pipeline 的所有处理器和 other
            return Pipeline(*self.processors, other)
        elif isinstance(other, Pipeline):
            # 如果 other 是另一个 Pipeline
            # 返回一个新的 Pipeline,包含当前 Pipeline 和 other 的所有处理器
            return Pipeline(*self.processors, *other.processors)
        else:
            raise TypeError("Can only chain with a DataProcessor or Pipeline")

# 创建处理器实例
cleaner = DataCleaner()
transformer = DataTransformer()
validator = DataValidator()

# 使用 | 操作符构建数据处理管道
# pipeline = cleaner | transformer | validator
# 这通过链式调用 __or__ 特殊方法来构建 Pipeline 对象
# cleaner.__or__(transformer) 返回一个 Pipeline(cleaner, transformer) 对象
# 然后这个 Pipeline 对象调用其自身的 __or__(validator) 方法,返回 Pipeline(cleaner, transformer, validator)

# 或者手动创建 Pipeline 对象
pipeline = Pipeline(cleaner, transformer, validator)


# 定义输入数据
input_data = "  Some RAW DaTA with trailing spaces  "

# 执行管道处理
try:
    final_data = pipeline.process(input_data)
    # 调用 pipeline 对象的 process 方法处理输入数据
    print(f"
Final processed data: {
              final_data}")
    # 输出最终处理结果
    # 输出:
    # Processing data using DataCleaner...
    # Cleaning data...
    # DataCleaner finished processing.
    # Processing data using DataTransformer...
    # Transforming data...
    # DataTransformer finished processing.
    # Processing data using DataValidator...
    # Validating data...
    # Validation successful!
    # DataValidator finished processing.
    #
    # Final processed data: [TRANSFORMED]some raw data with trailing spaces[/TRANSFORMED]

    # 示例:处理会引起验证失败的数据
    print("
--- Processing data that will fail validation ---")
    invalid_input_data = "  Data without transformation  "
    invalid_pipeline = Pipeline(cleaner, validator) # Skipping transformer
    # 构建一个不包含 transformer 的管道,验证会失败

    invalid_pipeline.process(invalid_input_data)
    # 尝试处理数据,会抛出 ValueError
except ValueError as e:
    print(f"Caught expected error during processing: {
              e}")
    # 输出:
    # Processing data using DataCleaner...
    # Cleaning data...
    # DataCleaner finished processing.
    # Processing data using DataValidator...
    # Validating data...
    # Caught expected error during processing: Validation failed: data not transformed

这个企业级案例展示了如何结合抽象基类、特殊方法 (__call__, __or__) 和组合来构建一个灵活且可扩展的数据处理管道。每个处理步骤是一个实现了 DataProcessor 接口的类。通过实现 __call__,使得处理器的实例可以像函数一样被调用,简化了管道的执行逻辑。通过实现 __or__,使得可以使用简洁的 | 操作符来链式地连接处理器,提高了管道定义的表达力。这是一个典型的使用面向对象和Python高级特性解决实际问题的例子。

第三章:迭代器与生成器:高效处理序列数据

在处理大量数据时,如何高效地访问和处理序列中的元素是一个关键问题。Python提供了迭代器(Iterators)和生成器(Generators)这两种强大的机制来解决这个问题,它们允许你按需生成序列中的元素,而不是一次性加载整个序列到内存中,从而节省内存并提高性能。

3.1 迭代器协议 (Iterator Protocol)

迭代器是实现了迭代器协议的对象。迭代器协议包含两个核心方法:

__iter__(): 返回迭代器对象本身。这使得迭代器对象本身也是一个可迭代对象。
__next__(): 返回序列中的下一个元素。如果序列中没有更多元素,应该抛出 StopIteration 异常。

任何实现了这两个方法的对象都可以作为迭代器使用。

# 定义一个简单的计数器迭代器
class CounterIterator:
    # 这是一个自定义的计数器迭代器类

    def __init__(self, max_count):
        # 构造函数,初始化计数器和最大值
        self.max_count = max_count
        # 设置计数器的最大值
        self.current_count = 0
        # 初始化当前计数为 0

    def __iter__(self):
        # 实现迭代器协议的 __iter__ 方法
        # 返回迭代器对象本身
        print("Inside __iter__")
        return self

    def __next__(self):
        # 实现迭代器协议的 __next__ 方法
        print(f"Inside __next__, current_count: {
              self.current_count}")
        # 打印当前计数器的值

        if self.current_count < self.max_count:
            # 检查当前计数是否小于最大值
            value = self.current_count
            # 获取当前计数的值
            self.current_count += 1
            # 将计数器加一
            return value
            # 返回当前计数的值

        else:
            # 如果当前计数达到或超过最大值
            print("Inside __next__, raising StopIteration")
            # 打印提示信息
            raise StopIteration
            # 抛出 StopIteration 异常,表示迭代结束

# 使用自定义迭代器
my_iterator = CounterIterator(5)
# 创建一个 CounterIterator 实例,最大计数为 5

# 1. 直接使用 __next__ 方法(通常不这样做,但为了演示原理)
# print(my_iterator.__next__()) # 输出:Inside __next__, current_count: 0 
 0
# print(my_iterator.__next__()) # 输出:Inside __next__, current_count: 1 
 1
# print(my_iterator.__next__()) # 输出:Inside __next__, current_count: 2 
 2
# print(my_iterator.__next__()) # 输出:Inside __next__, current_count: 3 
 3
# print(my_iterator.__next__()) # 输出:Inside __next__, current_count: 4 
 4
# print(my_iterator.__next__()) # 输出:Inside __next__, current_count: 5 
 Inside __next__, raising StopIteration 
 Traceback (most recent call last): ... StopIteration

# 2. 使用内置的 next() 函数(更常见)
# next() 函数会自动调用迭代器的 __next__() 方法
print(next(my_iterator))
# 输出:Inside __next__, current_count: 0 
 0
print(next(my_iterator))
# 输出:Inside __next__, current_count: 1 
 1
print(next(my_iterator))
# 输出:Inside __next__, current_count: 2 
 2
print(next(my_iterator))
# 输出:Inside __next__, current_count: 3 
 3
print(next(my_iterator))
# 输出:Inside __next__, current_count: 4 
 4

# 当没有更多元素时,next() 会捕获 StopIteration 并停止
# try:
#     next(my_iterator)
# except StopIteration:
#     print("Iteration finished.")
# # 输出:Inside __next__, current_count: 5 
 Inside __next__, raising StopIteration 
 Iteration finished.

# 3. 使用 for 循环(最常见)
# for 循环会自动调用可迭代对象的 __iter__() 方法获取迭代器,
# 然后不断调用迭代器的 __next__() 方法,直到捕获 StopIteration 异常
print("
Using for loop:")
# 再次创建一个新的迭代器实例,因为上面的已经耗尽
my_iterator_for = CounterIterator(3)
# 创建一个最大计数为 3 的新迭代器

for num in my_iterator_for:
    # for 循环开始,首先调用 my_iterator_for.__iter__() 获取迭代器(就是 my_iterator_for 自己)
    # 然后不断调用 my_iterator_for.__next__()
    print(num)
    # 打印从 __next__ 返回的每一个元素

# 输出:
# Inside __iter__
# Inside __next__, current_count: 0
# 0
# Inside __next__, current_count: 1
# 1
# Inside __next__, current_count: 2
# 2
# Inside __next__, current_count: 3
# Inside __next__, raising StopIteration
# for 循环捕获 StopIteration 异常并退出

这个例子展示了如何手动实现一个遵循迭代器协议的类。理解 __iter____next__ 是理解 Python 中迭代工作原理的基础。所有内置的可迭代对象(如列表、元组、字符串、字典、集合)都实现了 __iter__ 方法,该方法返回一个对应的迭代器对象。

3.2 可迭代对象 vs 迭代器

区分可迭代对象(Iterable)和迭代器(Iterator)非常重要:

可迭代对象 (Iterable): 实现了 __iter__() 方法的对象,该方法返回一个迭代器。列表、元组、字符串、字典等都是可迭代对象。你可以多次对同一个可迭代对象调用 iter() 或在多个 for 循环中使用它,每次都会得到一个新的迭代器实例。
迭代器 (Iterator): 实现了 __iter__()__next__() 方法的对象。迭代器维护着当前遍历的状态。一旦迭代器被耗尽(即 __next__ 抛出 StopIteration),它通常不能被重置或再次使用。迭代器本身也是可迭代对象,因为它们实现了 __iter__ 方法(返回自身)。

# 演示可迭代对象和迭代器
my_list = [1, 2, 3, 4]
# 这是一个列表,是可迭代对象

# 列表是可迭代对象,因为它有 __iter__ 方法
print(hasattr(my_list, '__iter__'))
# 输出:True

# 调用 iter() 函数获取列表的迭代器
list_iterator1 = iter(my_list)
# 获取列表 my_list 的第一个迭代器

# 迭代器是迭代器,因为它有 __next__ 方法
print(hasattr(list_iterator1, '__next__'))
# 输出:True

# 使用第一个迭代器
print(next(list_iterator1))
# 输出:1
print(next(list_iterator1))
# 输出:2

# 再次对同一个列表调用 iter(),获取一个新的迭代器
list_iterator2 = iter(my_list)
# 获取列表 my_list 的第二个(全新的)迭代器

# 使用第二个迭代器
print(next(list_iterator2))
# 输出:1
# 注意,第二个迭代器从头开始

# 继续使用第一个迭代器
print(next(list_iterator1))
# 输出:3
# 第一个迭代器保持了它自己的状态

# 迭代器本身也是可迭代对象(因为实现了 __iter__ 方法,返回自身)
print(hasattr(list_iterator1, '__iter__'))
# 输出:True
print(iter(list_iterator1) is list_iterator1)
# 输出:True
# 对迭代器调用 iter() 返回它自己

# for 循环的内部机制:
# for element in my_list:
#   # 1. 调用 iter(my_list) 获取一个迭代器 (list_iterator = iter(my_list))
#   # 2. 进入一个循环
#   # 3. 调用 next(list_iterator) 获取下一个元素
#   # 4. 如果 next() 返回一个值,将其赋给 element,并执行循环体
#   # 5. 如果 next() 抛出 StopIteration,退出循环

理解可迭代对象和迭代器的区别以及 for 循环的内部工作原理,有助于我们更有效地使用迭代器,并理解生成器的原理。

3.3 生成器 (Generators)

生成器是一种特殊类型的迭代器。创建生成器的最简单方法是使用函数和 yield 关键字。当函数中包含 yield 语句时,这个函数就变成了一个生成器函数。调用生成器函数不会立即执行函数体,而是返回一个生成器对象。

每次在生成器函数中遇到 yield 语句时,函数会暂停执行,并返回 yield 后面的值。下一次调用生成器的 __next__ 方法时,函数会从上次暂停的地方继续执行,直到遇到下一个 yield 或函数结束。

# 定义一个简单的生成器函数
def my_generator():
    # 这是一个生成器函数,包含 yield 语句
    print("Generator started...")
    # 这行代码在第一次调用 next() 时执行
    yield 1
    # yield 关键字暂停函数执行,并返回 1
    print("After first yield...")
    # 这行代码在第二次调用 next() 时执行
    yield 2
    # 再次暂停并返回 2
    print("After second yield...")
    # 这行代码在第三次调用 next() 时执行
    yield 3
    # 第三次暂停并返回 3
    print("Generator finished.")
    # 当函数执行到这里结束时,会自动抛出 StopIteration

# 调用生成器函数,获取生成器对象
gen = my_generator()
# 调用 my_generator() 不会打印 "Generator started...",而是返回一个生成器对象

# 生成器对象是迭代器,实现了 __iter__ 和 __next__
print(hasattr(gen, '__iter__'))
# 输出:True
print(hasattr(gen, '__next__'))
# 输出:True

# 使用 next() 函数驱动生成器执行
print(next(gen))
# 输出:
# Generator started...
# 1
# 第一次调用 next(),函数从开始执行到第一个 yield,返回 1

print(next(gen))
# 输出:
# After first yield...
# 2
# 第二次调用 next(),函数从第一个 yield 之后继续执行到第二个 yield,返回 2

print(next(gen))
# 输出:
# After second yield...
# 3
# 第三次调用 next(),函数从第二个 yield 之后继续执行到第三个 yield,返回 3

# 当再次调用 next() 时,函数会执行到结束,并抛出 StopIteration
# try:
#     next(gen)
# except StopIteration:
#     print("Generator is exhausted.")
# # 输出:
# # Generator finished.
# # Generator is exhausted.

# 通常使用 for 循环来消费生成器
print("
Using for loop with generator:")
# 再次调用生成器函数获取一个新的生成器对象
gen_for = my_generator()
# 获取一个新的生成器对象

for value in gen_for:
    # for 循环会自动处理 StopIteration
    print(value)
    # 打印 yield 返回的值

# 输出:
# Using for loop with generator:
# Generator started...
# 1
# After first yield...
# 2
# After second yield...
# 3
# Generator finished.

生成器函数通过 yield 关键字提供了一种更简洁、更易读的方式来实现迭代器。它们的优势在于按需生成数据,特别适用于处理大型或无限序列。

3.4 生成器表达式 (Generator Expressions)

类似于列表推导式,Python也提供了生成器表达式,用于创建简单的生成器。生成器表达式使用圆括号 () 而不是方括号 []

# 列表推导式:一次性生成所有元素并存储在列表中
list_comp = [x * x for x in range(5)]
# 生成一个列表 [0, 1, 4, 9, 16],占用内存

print(list_comp)
# 输出:[0, 1, 4, 9, 16]

# 生成器表达式:按需生成元素,不一次性生成所有
gen_exp = (x * x for x in range(5))
# 创建一个生成器对象,不会立即计算所有平方值

print(gen_exp)
# 输出:<generator object <genexpr> at 0x...>
# 打印的是生成器对象本身

# 使用 next() 消费生成器表达式
print(next(gen_exp))
# 输出:0
# 第一次调用 next() 时计算 0*0 并返回

print(next(gen_exp))
# 输出:1
# 第二次调用 next() 时计算 1*1 并返回

# 使用 for 循环消费生成器表达式(更常见)
print("
Using for loop with generator expression:")
# 再次创建一个生成器表达式,因为上面已经消费了部分
gen_exp_for = (x * x for x in range(5))
# 创建一个新的生成器表达式对象

for value in gen_exp_for:
    # for 循环消费生成器,按需获取元素
    print(value)

# 输出:
# Using for loop with generator expression:
# 0
# 1
# 4
# 9
# 16

# 生成器表达式特别适用于需要迭代但不需要将所有结果存储在内存中的场景
# 例如,处理大文件或网络流
# sum(x*x for x in range(1000000))
# 这会创建一个生成器表达式,按需生成 0 到 999999 的平方并求和,
# 而不会像列表推导式 [x*x for x in range(1000000)] 那样一次性创建包含一百万个元素的列表

生成器表达式是创建简单生成器的一种简洁语法,尤其适用于需要惰性计算(Lazy Evaluation)的场景。

3.5 生成器的高级用法:send(), throw(), close()

生成器不仅仅可以按需产生值,还可以与调用者进行双向通信。除了 next() 方法(相当于 send(None)),生成器对象还提供了 send(), throw(), 和 close() 方法。

send(value): 向生成器发送一个值。yield 表达式本身可以接收值。当调用 send(value) 时,yield 表达式会评估为 value,然后函数从 yield 后继续执行。
throw(type[, value[, traceback]]): 在生成器内部抛出一个异常。在生成器内部的 yieldnext() 调用处,会抛出指定的异常。
close(): 关闭生成器。在生成器内部,会在当前 yieldnext() 调用处抛出一个 GeneratorExit 异常。如果生成器捕获了这个异常(通常通过 try...finally 块),并且没有再次 yield 任何值,它就会正常退出。如果它 yield 了值或抛出了其他异常,会引发 RuntimeError

# 演示生成器的高级用法:send(), throw(), close()
def interactive_generator():
    # 这是一个交互式生成器,可以通过 send() 接收外部值
    print("Generator: Started...")
    # 生成器开始

    try:
        while True:
            # 进入一个循环,持续接收和处理值
            received_value = yield "Ready to receive"
            # yield 返回一个值给调用者 ("Ready to receive")
            # 并暂停执行,等待下一次 send() 或 next() 调用
            # 当 send() 调用发生时,send() 传入的值会被赋给 received_value
            print(f"Generator: Received {
              received_value}")
            # 打印接收到的值

            if received_value == "stop":
                # 如果接收到 "stop"
                print("Generator: Stopping...")
                break
                # 退出循环,生成器将结束

    except GeneratorExit:
        # 捕获 GeneratorExit 异常,当调用 close() 时会触发
        print("Generator: Caught GeneratorExit, cleaning up...")
        # 打印清理信息
    except ValueError as e:
        # 捕获 ValueError 异常,可以通过 throw() 触发
        print(f"Generator: Caught ValueError: {
              e}")
        # 打印捕获的异常信息
    finally:
        # finally 块确保在生成器退出前执行清理代码
        print("Generator: Exiting.")
        # 打印退出信息

# 获取生成器对象
gen = interactive_generator()

# 第一次调用 next() 启动生成器,执行到第一个 yield
print(next(gen))
# 输出:
# Generator: Started...
# Ready to receive
# next() 相当于 send(None),所以 received_value 在第一次 yield 后是 None

# 使用 send() 向生成器发送值
print(gen.send("Hello"))
# 输出:
# Generator: Received Hello
# Ready to receive
# send("Hello") 会将 "Hello" 发送给生成器,赋给 received_value,
# 然后生成器从 yield 处继续执行,再次遇到 yield "Ready to receive",暂停并返回 "Ready to receive"

print(gen.send("World"))
# 输出:
# Generator: Received World
# Ready to receive
# 再次发送 "World"

# 使用 throw() 在生成器内部抛出异常
try:
    gen.throw(ValueError, "Something went wrong")
    # 在生成器内部当前暂停处抛出 ValueError 异常
except StopIteration:
    # 如果 throw() 导致生成器退出(未被捕获或正常退出),会最终抛出 StopIteration
    print("Iteration stopped after exception.")
# 输出:
# Generator: Caught ValueError: Something went wrong
# Ready to receive
# 异常被生成器内部的 except ValueError 块捕获,然后生成器继续执行到下一个 yield

# 使用 send() 停止生成器
try:
    print(gen.send("stop"))
    # 发送 "stop" 信号
except StopIteration:
    print("Generator stopped successfully.")
# 输出:
# Generator: Received stop
# Generator: Stopping...
# Generator: Exiting.
# Generator stopped successfully.
# 接收到 "stop" 后,生成器退出循环,执行 finally 块,然后结束,自动抛出 StopIteration

# 再次获取一个新的生成器对象
gen2 = interactive_generator()
print(next(gen2))
# 输出:
# Generator: Started...
# Ready to receive

# 使用 close() 关闭生成器
gen2.close()
# 输出:
# Generator: Caught GeneratorExit, cleaning up...
# Generator: Exiting.
# 调用 close() 会在生成器内部抛出 GeneratorExit 异常,
# 如果生成器内部捕获并处理了这个异常并正常退出,就不会再抛出其他异常。
# 如果生成器在 GeneratorExit 异常处理中又 yield 或抛出其他异常,会引发 RuntimeError。

send(), throw(), 和 close() 方法使得生成器可以实现更复杂的协程(Coroutine)模式,允许生成器与调用者进行双向通信和控制。这在构建异步编程、管道处理或实现状态机时非常有用。

3.6 企业级应用:使用生成器处理大型日志文件

考虑一个企业场景:你需要分析一个巨大的服务器日志文件,文件大小可能超过你的可用内存。一次性读取整个文件到内存是不可行的。这时,生成器是理想的解决方案,它可以按行读取文件,按需处理每一行。

import os
import time

# 模拟一个大型日志文件
large_log_file_path = "large_server.log"

def create_large_log_file(filepath, num_lines=1000000):
    # 创建一个包含指定行数的大型日志文件
    print(f"Creating large log file: {
              filepath} with {
              num_lines} lines...")
    start_time = time.time()
    with open(filepath, "w") as f:
        # 以写入模式打开文件
        for i in range(num_lines):
            # 循环写入指定行数
            f.write(f"{
              time.strftime('%Y-%m-%d %H:%M:%S')} - INFO - Log entry {
              i+1}
")
            # 写入带时间戳、级别和序号的日志行
    end_time = time.time()
    print(f"Finished creating log file in {
              end_time - start_time:.2f} seconds.")
    # 打印文件创建耗时

# 创建模拟日志文件
create_large_log_file(large_log_file_path, num_lines=5000000) # 创建包含 500 万行的文件

# 使用生成器逐行读取大型文件
def read_large_file_by_line(filepath):
    # 一个生成器函数,用于逐行读取大型文件
    print(f"Reading file {
              filepath} line by line using generator...")
    try:
        with open(filepath, 'r') as f:
            # 以读取模式打开文件,with 语句确保文件被正确关闭
            for line in f:
                # for 循环在文件对象上迭代,文件对象本身就是一个行生成器
                yield line.rstrip('
') # 产生每一行,并移除行末的换行符
                # yield 暂停函数,返回一行数据,并等待下一次 next() 调用
    except FileNotFoundError:
        # 捕获文件未找到错误
        print(f"Error: File not found at {
              filepath}")
        # 打印错误信息
        return # 生成器结束,不会产生任何值
    # 文件会在 with 语句块结束时自动关闭

# 使用生成器处理日志文件
def process_log_entries(filepath, keyword=None):
    # 处理日志条目的函数,接受文件路径和可选的关键词
    processed_count = 0
    # 初始化处理计数器
    start_time = time.time()
    # 记录开始时间

    # 使用生成器获取日志行
    for log_entry in read_large_file_by_line(filepath):
        # for 循环消费 read_large_file_by_line 生成器,逐行获取日志
        # 每获取一行,生成器函数会暂停,直到下一次 for 循环迭代需要下一行
        # 内存中只保留当前处理的这一行数据
        if keyword is None or keyword in log_entry.lower():
            # 如果没有指定关键词,或者日志行包含关键词(不区分大小写)
            # 模拟处理日志条目 (这里只是打印)
            # print(f"Processing: {log_entry[:80]}...") # 只打印前80个字符避免输出过多
            processed_count += 1
            # 计数器加一

            # 在实际企业应用中,这里会进行复杂的日志解析、过滤、聚合或存储到数据库
            # 例如:parse_log_line(log_entry), analyze_data(parsed_entry), save_to_db(analyzed_data)

    end_time = time.time()
    # 记录结束时间
    print(f"
Finished processing {
              processed_count} log entries in {
              end_time - start_time:.2f} seconds.")
    # 打印处理完成信息和耗时

# 调用函数处理日志文件
process_log_entries(large_log_file_path)
# 处理所有日志条目

print("
--- Processing logs with keyword 'info' ---")
process_log_entries(large_log_file_path, keyword='info')
# 处理包含关键词 'info' 的日志条目

print("
--- Processing logs with non-existent keyword 'error' ---")
process_log_entries(large_log_file_path, keyword='error')
# 处理包含关键词 'error' 的日志条目 (模拟日志只包含 INFO 级别)

# 清理模拟文件
os.remove(large_log_file_path)
print(f"
Cleaned up {
              large_log_file_path}")
# 删除创建的日志文件

这个企业级示例展示了生成器在处理大型文件时的巨大优势。read_large_file_by_line 函数是一个生成器,它在每次 yield line 时暂停,将读取到的一行返回给调用者(process_log_entries 中的 for 循环)。当 for 循环需要下一行时,生成器从上次暂停的地方继续执行。这样,我们不需要一次性将整个大型文件读入内存,极大地降低了内存消耗,使得处理 TB 甚至 PB 级别的文件成为可能。


第四章:上下文管理器与 with 语句:优雅地管理资源

在编程中,管理资源(如文件、网络连接、锁、数据库连接等)是一个常见的任务。资源通常需要在使用后被正确地释放,以避免内存泄漏、死锁或其他问题。Python的上下文管理器(Context Managers)和 with 语句提供了一种简洁、安全的方式来自动管理资源的分配和释放。

4.1 with 语句和上下文管理器协议

with 语句用于包装代码块的执行,确保在代码块执行完毕(无论正常结束还是发生异常)后执行特定的清理操作。with 语句的工作原理依赖于上下文管理器协议。

上下文管理器协议包含两个核心方法:

__enter__(self): 进入 with 语句块时调用。可以在这里进行资源的分配。方法应该返回在 as 关键字后绑定的对象(通常是被管理的资源本身)。
__exit__(self, exc_type, exc_value, traceback): 退出 with 语句块时调用。可以在这里进行资源的清理和释放。如果 with 块中发生了异常,异常的类型、值和 traceback 会作为参数传递给 __exit__ 方法;如果没有发生异常,这三个参数都将是 None。如果 __exit__ 方法返回 True,则异常会被抑制(不会向上层传播);如果返回 FalseNone,异常会继续传播。

# 定义一个简单的自定义上下文管理器
class MyContextManager:
    # 这是一个自定义的上下文管理器类

    def __init__(self, resource_name):
        # 构造函数,初始化资源名称
        self.resource_name = resource_name
        # 存储资源的名称

    def __enter__(self):
        # 实现上下文管理器协议的 __enter__ 方法
        print(f"Entering context for {
              self.resource_name}")
        # 打印进入上下文的提示
        # 模拟资源分配
        self.resource = f"Allocated {
              self.resource_name} resource"
        # 分配一个模拟资源
        return self.resource
        # 返回分配的资源,这个资源会被绑定到 with 语句的 as 变量上

    def __exit__(self, exc_type, exc_value, traceback):
        # 实现上下文管理器协议的 __exit__ 方法
        # exc_type: 异常类型 (e.g., ValueError, TypeError),如果没有异常则为 None
        # exc_value: 异常实例,如果没有异常则为 None
        # traceback: traceback 对象,如果没有异常则为 None
        print(f"Exiting context for {
              self.resource_name}")
        # 打印退出上下文的提示
        # 模拟资源释放
        self.resource = None
        # 释放模拟资源
        print(f"Resource {
              self.resource_name} released.")
        # 打印资源释放提示

        if exc_type:
            # 检查 exc_type 是否不是 None,即是否发生了异常
            print(f"An exception occurred in the with block:")
            print(f"  Type: {
              exc_type.__name__}")
            # 打印异常类型名称
            print(f"  Value: {
              exc_value}")
            # 打印异常值
            # traceback 对象包含调用栈信息,通常在实际错误处理中会记录或分析
            # print(f"  Traceback: {traceback}")

            # 返回 False 或 None (默认行为) 会让异常继续传播
            # 返回 True 会抑制异常
            return False # 让异常继续传播
        else:
            # 如果没有发生异常
            print("No exception occurred in the with block.")
            # 打印没有异常的提示
            return False # 默认返回 False 或 None

# 使用 with 语句和自定义上下文管理器 (无异常)
print("--- Using with statement (no exception) ---")
with MyContextManager("database_connection") as db_conn:
    # 进入 with 语句,MyContextManager("database_connection").__enter__() 被调用
    # __enter__ 返回的值 "Allocated database_connection resource" 被赋给 db_conn
    print(f"Inside with block. Using resource: {
              db_conn}")
    # 打印在 with 块内部使用资源的情况
# with 块结束,MyContextManager("database_connection").__exit__(None, None, None) 被调用

# 输出:
# --- Using with statement (no exception) ---
# Entering context for database_connection
# Inside with block. Using resource: Allocated database_connection resource
# Exiting context for database_connection
# Resource database_connection released.
# No exception occurred in the with block.

print("
--- Using with statement (with exception) ---")
# 使用 with 语句和自定义上下文管理器 (有异常)
try:
    with MyContextManager("file_handle") as file:
        # 进入 with 语句,__enter__() 被调用
        print(f"Inside with block. Using resource: {
              file}")
        # 打印资源信息
        # 模拟发生异常
        raise ValueError("Something went wrong while processing file")
        # 在 with 块内部抛出一个 ValueError
        print("This line will not be executed.") # 异常后的代码不会执行
except ValueError as e:
    # 外部 try...except 块捕获 ValueError
    print(f"Caught exception outside with block: {
              e}")
    # 打印捕获的异常信息

# 输出:
# --- Using with statement (with exception) ---
# Entering context for file_handle
# Inside with block. Using resource: Allocated file_handle resource
# Exiting context for file_handle
# An exception occurred in the with block:
#   Type: ValueError
#   Value: Something went wrong while processing file
# Resource file_handle released.
# Caught exception outside with block: Something went wrong while processing file
# 注意:__exit__ 方法在异常发生后仍然被调用,并且异常继续向外传播(因为 __exit__ 返回 False)

print("
--- Using with statement (with exception, suppressed) ---")
# 定义一个会抑制异常的上下文管理器
class SuppressingContextManager:
    def __enter__(self):
        print("Entering suppressing context.")
        return "Suppressed Resource"

    def __exit__(self, exc_type, exc_value, traceback):
        print("Exiting suppressing context.")
        if exc_type:
            print(f"Exception caught and suppressed: {
              exc_type.__name__}: {
              exc_value}")
            return True # 返回 True 抑制异常
        print("No exception in suppressing context.")
        return False

try:
    with SuppressingContextManager() as res:
        print(f"Inside suppressing with block. Resource: {
              res}")
        raise RuntimeError("This exception will be suppressed")
        print("This line is also not executed.")
    print("This line after the suppressing with block is executed.") # 异常被抑制,with 块后的代码会执行
except RuntimeError:
    print("Caught RuntimeError outside the suppressing with block.") # 这个 except 块不会被触发

# 输出:
# --- Using with statement (with exception, suppressed) ---
# Entering suppressing context.
# Inside suppressing with block. Resource: Suppressed Resource
# Exiting suppressing context.
# Exception caught and suppressed: RuntimeError: This exception will be suppressed
# This line after the suppressing with block is executed.
# 注意:异常在 __exit__ 中被捕获并处理(打印信息),然后因为返回 True 而被抑制,不会继续向外传播。
# with 语句后的代码正常执行。

通过实现 __enter____exit__ 方法,任何对象都可以成为上下文管理器。with 语句确保 __exit__ 方法总会被调用,无论 with 块是否发生异常,这为资源管理提供了一个非常健壮的机制。

4.2 使用 contextlib 模块

Python的 contextlib 模块提供了更方便的方式来创建上下文管理器,尤其是当你不想定义一个完整的类时。常用的工具包括 @contextmanager 装饰器和 closing 函数。

4.2.1 @contextmanager 装饰器

@contextmanager 装饰器允许你使用一个生成器函数来创建上下文管理器。生成器函数在 yield 语句之前执行 __enter__ 的逻辑(资源分配),yield 后面的值作为 __enter__ 的返回值绑定到 as 变量,而 yield 语句之后(直到函数结束或再次 yield)的代码则作为 __exit__ 的逻辑(资源清理)。

import contextlib
import time

# 使用 @contextmanager 装饰器创建上下文管理器
@contextlib.contextmanager
def timer(name):
    # 这是一个使用生成器创建的计时器上下文管理器
    # contextlib.contextmanager 装饰器将这个生成器函数转换为上下文管理器

    start_time = time.time()
    # 在 yield 之前执行的代码,相当于 __enter__ 的逻辑
    # 记录开始时间
    print(f"[{
              name}] Starting timer...")
    # 打印启动信息

    try:
        yield # yield 处是 with 块的开始,没有返回资源 (可以 yield 一个值作为返回资源)
        # yield 暂停函数执行,并将控制权交给 with 语句块
        # 如果 with 块有 as 变量, yield 后面可以跟一个值,那个值会被赋给 as 变量
        # 在这里,yield 没有返回值,所以 as variable will be None if used

    except Exception as e:
        # 捕获在 with 块中发生的任何异常
        print(f"[{
              name}] Timer caught exception: {
              e}")
        # 打印捕获到的异常信息
        # 如果在这里不 re-raise 异常,异常会被抑制 (相当于 __exit__ 返回 True)
        raise # 重新抛出异常,让异常继续传播 (相当于 __exit__ 返回 False)
        # 如果不加 raise,异常会被装饰器自动抑制

    finally:
        # yield 之后的代码,包括 try...except...finally 块,相当于 __exit__ 的逻辑
        # 无论 with 块是否发生异常,finally 块总会被执行
        end_time = time.time()
        # 记录结束时间
        print(f"[{
              name}] Timer stopped. Duration: {
              end_time - start_time:.4f} seconds.")
        # 打印计时结束信息和持续时间

# 使用 @contextmanager 创建的上下文管理器
print("--- Using @contextmanager (no exception) ---")
with timer("Task A"):
    # 进入 timer("Task A") 上下文,生成器执行到 yield
    time.sleep(0.5) # 模拟任务执行 0.5 秒
    print("Inside Task A")
# with 块结束,生成器从 yield 后继续执行 finally 块

# 输出:
# --- Using @contextmanager (no exception) ---
# [Task A] Starting timer...
# Inside Task A
# [Task A] Timer stopped. Duration: 0.5xxx seconds.

print("
--- Using @contextmanager (with exception) ---")
try:
    with timer("Task B"):
        # 进入 timer("Task B") 上下文
        time.sleep(0.2) # 模拟任务执行 0.2 秒
        print("Inside Task B, about to raise error")
        raise TypeError("Simulated error in Task B")
        # 抛出一个异常
        print("This line in Task B will not be executed")
except TypeError as e:
    print(f"Caught exception outside timer: {
              e}")
# with 块发生异常,生成器从 yield 处恢复,异常会在 yield 表达式处抛出,
# 被 try...except 块捕获,然后执行 finally 块,最后异常被 re-raise 并被外部 except 捕获

# 输出:
# --- Using @contextmanager (with exception) ---
# [Task B] Starting timer...
# Inside Task B, about to raise error
# [Task B] Timer caught exception: Simulated error in Task B
# [Task B] Timer stopped. Duration: 0.2xxx seconds.
# Caught exception outside timer: Simulated error in Task B

@contextmanager 装饰器极大地简化了上下文管理器的创建,特别是对于那些资源分配和释放逻辑比较简单的场景。try...except...finally 块在 yield 之后是可选的,但通常用于确保资源在发生异常时也能被正确清理。

4.2.2 closing 函数

contextlib.closing(obj) 函数用于将一个具有 close() 方法的对象(例如文件对象、网络套接字)转换为一个上下文管理器。在 with 块退出时,它会自动调用对象的 close() 方法。

import contextlib
import urllib.request
# 导入 urllib.request 模块,用于打开 URL

# 使用 contextlib.closing 打开一个网络资源 (模拟)
# urllib.request.urlopen() 返回一个类似文件的对象,有 close() 方法
# 在企业级应用中,可能是数据库连接、消息队列连接等
class MockNetworkConnection:
    # 模拟一个具有 close() 方法的网络连接对象
    def __init__(self, url):
        self.url = url
        print(f"Opening connection to {
              self.url}...")
        self.is_closed = False

    def read(self):
        if self.is_closed:
            raise ValueError("Connection is closed")
        print("Reading from connection...")
        return f"Data from {
              self.url}"

    def close(self):
        if not self.is_closed:
            print(f"Closing connection to {
              self.url}.")
            self.is_closed = True

# 使用 closing 管理模拟的网络连接
print("--- Using closing to manage connection ---")
try:
    # 创建模拟连接对象
    connection = MockNetworkConnection("http://example.com")
    # 使用 closing 将其转换为上下文管理器
    with contextlib.closing(connection) as conn:
        # 进入 with 块,__enter__ 返回 connection 对象
        print("Inside with block, connection is open.")
        data = conn.read() # 使用连接读取数据
        print(f"Read data: {
              data}")
        # 模拟一个操作,不发生异常
        # if True: # Change to False to simulate no exception
        #     raise ConnectionAbortedError("Simulating connection error")
            # 模拟发生异常

    # with 块退出,closing 的 __exit__ 会调用 connection.close()
    print("Outside with block.")
    # connection.read() # 尝试再次读取会因为连接已关闭而报错
except ConnectionAbortedError as e:
    print(f"Caught expected error: {
              e}")
    # 外部捕获异常
finally:
    # 即使发生了异常,closing 也保证 close() 被调用
    # 可以在这里再次检查 connection 是否已关闭 (如果 MockNetworkConnection 支持)
    print("Finally block executed.")


# 实际应用示例:使用 closing 管理文件对象 (虽然 open() 本身就是上下文管理器,但 closing 也可以用)
print("
--- Using closing with a file object ---")
file_path = "my_temp_file.txt"
with open(file_path, "w") as f:
    f.write("Hello, world!")

# 现在以读取模式打开,并使用 closing
try:
    f_obj = open(file_path, "r") # 获取文件对象
    with contextlib.closing(f_obj) as file:
        # 使用 closing 管理文件对象
        content = file.read()
        print(f"File content: {
              content}")
        # 模拟异常
        # raise IOError("Simulating file error")
    # with 块退出,closing 确保 f_obj.close() 被调用
    print("File is closed by closing().")
except IOError as e:
    print(f"Caught expected file error: {
              e}")
finally:
    if os.path.exists(file_path):
        os.remove(file_path)
        print(f"Cleaned up {
              file_path}")

# 输出:
# --- Using closing to manage connection ---
# Opening connection to http://example.com...
# Inside with block, connection is open.
# Reading from connection...
# Read data: Data from http://example.com
# Closing connection to http://example.com.
# Outside with block.
# Finally block executed.
#
# --- Using closing with a file object ---
# File content: Hello, world!
# File is closed by closing().
# Cleaned up my_temp_file.txt

contextlib.closing 是一个便捷的工具,适用于任何具有 close() 方法的对象,可以确保在 with 块结束时自动调用 close() 进行资源清理。

4.3 企业级应用:数据库连接池的上下文管理

在企业级应用中,频繁地打开和关闭数据库连接效率低下。通常会使用连接池。连接池管理连接的生命周期,当需要使用连接时从池中获取,使用完毕后归还连接到池中(而不是关闭)。上下文管理器非常适合管理从连接池获取的连接。

import threading
import time
import queue # 导入 queue 模块用于实现连接池

# 模拟一个数据库连接对象
class DatabaseConnection:
    # 模拟数据库连接类
    def __init__(self, conn_id):
        # 构造函数,初始化连接 ID
        self.conn_id = conn_id
        self.is_closed = False
        self.is_in_use = False
        print(f"Connection {
              self.conn_id} created.")
        # 打印连接创建信息

    def execute_query(self, query):
        # 模拟执行查询的方法
        if self.is_closed or not self.is_in_use:
            raise ValueError("Connection is not available")
        print(f"Connection {
              self.conn_id}: Executing query '{
              query}'...")
        time.sleep(0.1) # 模拟查询耗时
        return f"Result for '{
              query}' from conn {
              self.conn_id}"

    def close(self):
        # 模拟关闭连接的方法 (在连接池场景下通常是归还到池中)
        if not self.is_closed:
            print(f"Connection {
              self.conn_id} truly closed (e.g., pool shutdown).")
            self.is_closed = True
            self.is_in_use = False

    def release(self):
        # 在连接池场景下,这是将连接归还到池中的方法
        if self.is_in_use and not self.is_closed:
            print(f"Connection {
              self.conn_id} released back to pool.")
            self.is_in_use = False
            # 在实际池中,会将 self 放回 queue 中
        elif self.is_closed:
             print(f"Attempted to release closed connection {
              self.conn_id}.")


# 模拟一个简单的数据库连接池
class ConnectionPool:
    # 模拟连接池类
    def __init__(self, max_connections):
        # 构造函数,初始化最大连接数和连接队列
        self.max_connections = max_connections
        # 设置连接池的最大连接数
        self._connections = queue.Queue(maxsize=max_connections)
        # 使用 queue.Queue 作为连接池的存储,设定最大容量
        for i in range(max_connections):
            # 初始化连接池,创建指定数量的连接并放入队列
            conn = DatabaseConnection(i + 1)
            self._connections.put(conn)
            # 将创建的连接放入队列

    def get_connection(self):
        # 从连接池获取一个连接的方法
        print("Getting connection from pool...")
        try:
            # 从队列中获取一个连接,会阻塞直到有可用连接
            conn = self._connections.get(timeout=2) # 设置超时,避免无限等待
            # 获取到一个连接对象
            if conn.is_closed:
                 # 检查获取到的连接是否已经关闭 (理论上不应该发生,除非池管理有问题)
                 print(f"Warning: Got a closed connection {
              conn.conn_id}. Trying again.")
                 return self.get_connection() # 尝试再次获取
            conn.is_in_use = True
            # 标记连接为使用中
            print(f"Connection {
              conn.conn_id} acquired.")
            # 打印获取到的连接 ID
            return ConnectionContext(conn, self)
            # 返回一个 ConnectionContext 对象,它是上下文管理器
        except queue.Empty:
            # 如果超时仍然没有可用连接
            raise RuntimeError("Connection pool exhausted or timed out.")
            # 抛出连接池耗尽错误

    def return_connection(self, conn):
        # 将连接归还到连接池的方法
        if conn and not conn.is_closed:
            # 只有连接有效且未关闭时才归还
            conn.is_in_use = False
            # 标记连接为非使用中
            self._connections.put(conn)
            # 将连接放回队列

    def shutdown(self):
        # 关闭连接池,真正关闭所有连接
        print("Shutting down connection pool...")
        while not self._connections.empty():
            conn = self._connections.get_nowait()
            conn.close()
        print("Connection pool shutdown complete.")

# 定义一个上下文管理器类,用于管理从连接池获取的连接
class ConnectionContext:
    # 连接上下文管理器类
    def __init__(self, connection, pool):
        # 构造函数,接收具体的连接对象和连接池对象
        self._connection = connection
        self._pool = pool

    def __enter__(self):
        # 进入 with 块时调用
        # get_connection 方法已经标记连接为使用中并返回了自身(ConnectionContext 实例)
        # __enter__ 方法需要返回实际的连接对象,以便在 with 块中使用它执行查询等操作
        return self._connection
        # 返回具体的数据库连接对象

    def __exit__(self, exc_type, exc_value, traceback):
        # 退出 with 块时调用
        if self._connection:
            # 确保连接对象存在
            self._pool.return_connection(self._connection)
            # 将连接归还给连接池
            self._connection = None # 清除引用,防止误用

        # __exit__ 可以选择是否抑制异常,这里我们不抑制,让异常继续传播
        return False # 让异常继续传播

# 创建一个连接池,最大连接数为 3
db_pool = ConnectionPool(max_connections=3)

# 模拟在多个地方使用连接池获取连接执行查询
def worker_task(task_id):
    # 模拟一个工作线程或任务
    print(f"Task {
              task_id}: Requesting connection...")
    try:
        # 使用 with 语句从连接池获取连接,并确保使用后归还
        with db_pool.get_connection() as conn:
            # get_connection() 返回 ConnectionContext 实例,其 __enter__ 返回 DatabaseConnection 实例并赋给 conn
            print(f"Task {
              task_id}: Got connection {
              conn.conn_id}.")
            # 打印获取到的连接 ID

            # 模拟执行一些查询
            result1 = conn.execute_query(f"SELECT data for task {
              task_id}-1")
            print(f"Task {
              task_id}: Result 1: {
              result1}")

            # 模拟一个可能出错的操作
            if task_id == 2:
                 print(f"Task {
              task_id}: Simulating error...")
                 raise RuntimeError(f"Error processing task {
              task_id}")

            result2 = conn.execute_query(f"UPDATE status for task {
              task_id}-2")
            print(f"Task {
              task_id}: Result 2: {
              result2}")

        # with 块结束,ConnectionContext 的 __exit__ 方法被调用,将连接归还到池中
        print(f"Task {
              task_id}: Connection released.")

    except Exception as e:
        # 捕获任务执行过程中的异常
        print(f"Task {
              task_id}: Caught exception: {
              e}")
    finally:
        # 确保任务结束
        print(f"Task {
              task_id}: Finished.")


# 使用线程模拟并发访问连接池
threads = []
for i in range(5): # 启动 5 个任务,超过连接池容量 (3)
    t = threading.Thread(target=worker_task, args=(i + 1,))
    threads.append(t)
    t.start()
    time.sleep(0.05) # 稍微错开启动时间,更容易看到连接获取和等待的情况

# 等待所有线程完成
for t in threads:
    t.join()

# 关闭连接池 (在应用关闭时调用)
db_pool.shutdown()

# 输出示例 (顺序可能因线程调度而异):
# Connection 1 created.
# Connection 2 created.
# Connection 3 created.
# Task 1: Requesting connection...
# Getting connection from pool...
# Connection 1 acquired.
# Task 1: Got connection 1.
# Task 1: Connection 1: Executing query 'SELECT data for task 1-1'...
# Task 2: Requesting connection...
# Getting connection from pool...
# Connection 2 acquired.
# Task 2: Got connection 2.
# Task 2: Connection 2: Executing query 'SELECT data for task 2-1'...
# Task 3: Requesting connection...
# Getting connection from pool...
# Connection 3 acquired.
# Task 3: Got connection 3.
# Task 3: Connection 3: Executing query 'SELECT data for task 3-1'...
# Task 4: Requesting connection...
# Getting connection from pool... (可能会在这里等待)
# Task 5: Requesting connection...
# Getting connection from pool... (可能会在这里等待)
# ... 任务执行和归还连接 ...
# Task 1: Result 1: Result for 'SELECT data for task 1-1' from conn 1
# Task 1: Connection 1: Executing query 'UPDATE status for task 1-2'...
# Task 2: Result 1: Result for 'SELECT data for task 2-1' from conn 2
# Task 2: Got connection 2.
# Task 2: Simulating error...
# Task 2: Caught exception: Error processing task 2
# Task 2: Finished.
# Connection 2 released back to pool.
# Task 4: Got connection 2. (Task 4 获取到 Task 2 归还的连接)
# Task 4: Connection 2: Executing query 'SELECT data for task 4-1'...
# Task 1: Result 2: Result for 'UPDATE status for task 1-2' from conn 1
# Task 1: Connection released.
# Task 1: Finished.
# Connection 1 released back to pool.
# Task 5: Got connection 1. (Task 5 获取到 Task 1 归还的连接)
# Task 5: Connection 1: Executing query 'SELECT data for task 5-1'...
# ...
# Shutting down connection pool...
# Connection 1 truly closed (e.g., pool shutdown).
# Connection 2 truly closed (e.g., pool shutdown).
# Connection 3 truly closed (e.g., pool shutdown).
# Connection pool shutdown complete.

这个企业级示例展示了如何将上下文管理器应用于数据库连接池的管理。ConnectionContext 类作为上下文管理器,在 __enter__ 中返回实际的数据库连接对象供 with 块使用,并在 __exit__ 中确保连接被正确地归还到连接池中,无论 with 块是正常退出还是发生异常。这极大地简化了连接管理的代码,避免了手动获取和释放连接时容易出错的问题,是企业级应用中管理共享资源(如数据库连接、线程锁等)的标准模式。

第五章:并发与并行:充分利用计算资源

在现代计算环境中,程序常常需要同时处理多个任务。这可以是等待网络响应、读写文件等 I/O 密集型任务,也可以是复杂的计算、数据处理等 CPU 密集型任务。为了提高程序的效率和响应性,我们可以采用并发(Concurrency)和并行(Parallelism)技术。

并发 (Concurrency): 指的是同时处理多个任务的能力。在单核CPU上,并发是通过快速切换任务来实现的(时间片轮转),看起来像是同时进行,但实际上在任何一个瞬间只有一个任务在执行。Python的线程(Threading)通常用于实现并发,尤其适合I/O密集型任务。
并行 (Parallelism): 指的是同时执行多个任务的能力。这通常需要多核CPU,不同的任务在不同的CPU核心上真正地同时执行。Python的进程(Multiprocessing)通常用于实现并行,适合CPU密集型任务。

5.1 线程 (Threading):I/O 密集型任务的利器

线程是进程内的执行单元。一个进程可以包含多个线程,这些线程共享进程的内存空间。在Python中,可以使用 threading 模块来创建和管理线程。线程适用于 I/O 密集型任务,因为当一个线程在等待 I/O 完成时,其他线程可以在同一个 CPU 核心上继续执行,从而提高整体效率。

5.1.1 创建和运行线程

可以使用 threading.Thread 类来创建线程。创建线程时,需要指定线程要执行的目标函数以及传递给函数的参数。

import threading
import time
# 导入 threading 模块用于多线程
# 导入 time 模块用于模拟耗时操作

# 定义一个线程要执行的函数
def worker_function(name, duration):
    # 这是线程将要执行的任务函数
    # name: 线程的名称
    # duration: 模拟任务执行的时间长度(秒)
    print(f"Thread {
              name}: Starting task for {
              duration} seconds.")
    # 打印线程开始执行的提示信息
    time.sleep(duration)
    # 模拟耗时的任务执行,暂停指定的 duration 秒
    print(f"Thread {
              name}: Task finished.")
    # 打印线程任务完成的提示信息

# --- 创建和运行线程示例 ---

# 创建第一个线程
thread1 = threading.Thread(target=worker_function, args=("Thread 1", 2))
# 使用 threading.Thread 类创建一个线程对象
# target 参数指定线程要执行的函数 worker_function
# args 参数是一个元组,包含传递给 target 函数的位置参数 ("Thread 1", 2)

# 创建第二个线程
thread2 = threading.Thread(target=worker_function, args=("Thread 2", 3))
# 创建第二个线程对象

# 启动线程
# 启动线程会调用 target 函数在一个新的线程中执行
thread1.start()
# 启动第一个线程
thread2.start()
# 启动第二个线程

print("Main thread: Started threads.")
# 主线程在启动子线程后会继续执行

# 等待所有线程完成
# join() 方法会阻塞当前线程 (主线程) 直到调用的线程 (thread1 或 thread2) 执行完毕
thread1.join()
# 等待 thread1 执行完毕
thread2.join()
# 等待 thread2 执行完毕

print("Main thread: All threads finished.")
# 所有子线程都执行完毕后,主线程才会执行到这里

# 输出示例:
# Main thread: Started threads.
# Thread Thread 1: Starting task for 2 seconds.
# Thread Thread 2: Starting task for 3 seconds.
# Thread Thread 1: Task finished.
# Thread Thread 2: Task finished.
# Main thread: All threads finished.
# 注意:由于是并发执行,"Thread 1: Task finished." 会在 "Thread 2: Task finished." 之前打印,因为它只需要等待 2 秒。
# 主线程的输出 "Main thread: Started threads." 会在子线程的 "Starting task..." 之前打印,因为它只负责启动,不等待。

这个例子演示了如何创建和启动两个独立的线程来执行 worker_function。主线程在启动子线程后立即继续执行,通过 join() 方法等待子线程完成。

5.1.2 线程同步:避免竞态条件 (Race Conditions)

当多个线程访问和修改共享资源时,可能会发生竞态条件(Race Conditions),导致不可预测的结果。为了避免这种情况,需要使用线程同步机制来控制对共享资源的访问。

Python threading 模块提供了多种同步工具:

锁 (Lock): 最基本的同步原语。一个锁有两种状态:锁定 (locked) 和 未锁定 (unlocked)。一个线程在访问共享资源前尝试获取锁。如果锁是未锁定的,线程获取锁并将其状态设置为锁定。如果锁已经被锁定,线程会阻塞,直到锁被释放。使用完共享资源后,线程必须释放锁。
可重入锁 (RLock): 允许同一个线程多次获取同一个锁,而不会死锁。只有当拥有锁的线程释放了相同次数后,锁才会被完全释放供其他线程获取。RLock 适用于递归函数或在同一个线程中多次需要获取同一个锁的场景。
信号量 (Semaphore): 用于控制对有限资源的访问。信号量维护一个内部计数器,表示可用资源的数量。线程在访问资源前尝试获取信号量,如果计数器大于零,则计数器减一并允许线程访问;如果计数器为零,则线程阻塞。使用完资源后,线程释放信号量,计数器加一。
事件 (Event): 用于线程间的简单通信。一个事件对象维护一个内部标志,可以设置为 True 或 False。线程可以等待事件标志变为 True,或者设置/清除标志来通知其他线程。
条件变量 (Condition): 在锁的基础上,允许线程在某个条件不满足时等待,并在条件满足时被其他线程通知唤醒。通常用于更复杂的线程间协作场景。

企业级代码示例:使用锁保护共享资源

考虑一个企业应用场景:多个线程需要更新一个共享的计数器或数据库记录。不加锁会导致更新丢失或数据不一致。

import threading
import time

# 定义共享资源
shared_counter = 0
# 全局共享计数器

# 创建一个 Lock 对象
counter_lock = threading.Lock()
# 创建一个标准锁,用于保护 shared_counter

# 定义一个线程函数,用于增加计数器
def increment_counter(num_increments):
    # 线程函数,用于多次增加共享计数器
    global shared_counter # 声明使用全局变量 shared_counter
    thread_name = threading.current_thread().name # 获取当前线程的名称

    print(f"Thread {
              thread_name}: Starting to increment counter {
              num_increments} times.")
    # 打印线程开始信息

    for i in range(num_increments):
        # 循环指定的次数

        # 在访问共享资源前获取锁
        # counter_lock.acquire()
        # acquire() 方法会阻塞直到成功获取锁
        # 使用 with 语句管理锁更推荐,可以确保锁在退出 with 块时自动释放
        with counter_lock:
            # 使用 with counter_lock: 语法获取锁
            # 当进入 with 块时,锁被获取
            # 无论 with 块内部发生什么 (正常结束或异常),退出时锁都会被释放 (通过 __exit__ 方法)

            # 访问和修改共享资源
            current_value = shared_counter
            # 读取当前计数器的值
            time.sleep(0.0001) # 模拟在读取和写入之间发生上下文切换
            # 短暂暂停,增加发生竞态条件的机会(如果没有锁)
            shared_counter = current_value + 1
            # 将计数器加一

            # print(f"Thread {thread_name}: Increment {i+1}, counter is now {shared_counter}")
            # 可以打印每次增量后的值(如果需要详细调试)

        # 退出 with 块,锁自动释放
        # counter_lock.release() # 如果使用 acquire(),需要在 finally 块中调用 release() 以确保释放

    print(f"Thread {
              thread_name}: Finished incrementing.")
    # 打印线程完成信息

# --- 模拟竞态条件 (不使用锁) ---
print("--- Simulating Race Condition (without lock) ---")
shared_counter = 0 # 重置计数器
# 定义一个不使用锁的增量函数,用于对比
def increment_counter_no_lock(num_increments):
    global shared_counter
    thread_name = threading.current_thread().name
    # print(f"Thread {thread_name}: Starting without lock.")
    for i in range(num_increments):
        current_value = shared_counter
        time.sleep(0.0001) # 增加竞态条件机会
        shared_counter = current_value + 1
    # print(f"Thread {thread_name}: Finished without lock.")

num_threads = 10
increments_per_thread = 10000
total_expected = num_threads * increments_per_thread

threads_no_lock = []
for i in range(num_threads):
    t = threading.Thread(target=increment_counter_no_lock, args=(increments_per_thread,))
    threads_no_lock.append(t)
    t.start()

for t in threads_no_lock:
    t.join()

print(f"Final counter value (without lock): {
              shared_counter}")
print(f"Expected counter value: {
              total_expected}")
print(f"Is counter value correct? {
              shared_counter == total_expected}")
# 输出通常会小于 total_expected,表明发生了竞态条件,部分更新丢失

# --- 使用锁解决竞态条件 ---
print("
--- Resolving Race Condition (with lock) ---")
shared_counter = 0 # 重置计数器

threads_with_lock = []
for i in range(num_threads):
    # 注意:这里的 target 是使用了锁的 increment_counter 函数
    t = threading.Thread(target=increment_counter, args=(increments_per_thread,))
    threads_with_lock.append(t)
    t.start()

for t in threads_with_lock:
    t.join()

print(f"Final counter value (with lock): {
              shared_counter}")
print(f"Expected counter value: {
              total_expected}")
print(f"Is counter value correct? {
              shared_counter == total_expected}")
# 输出应该等于 total_expected,表明锁成功保护了共享资源

这个示例清晰地展示了竞态条件的存在以及如何使用 threading.Lock(通过 with 语句)来保护共享的全局计数器,确保每次对计数器的修改都是原子性的(不可中断),从而得到正确的结果。with counter_lock: 语法是管理锁的推荐方式,因为它简洁且能保证无论是否发生异常,锁都能被正确释放。

5.1.3 死锁 (Deadlocks)

死锁是多线程编程中一个常见的问题。当两个或多个线程在等待彼此释放锁时,就会发生死锁,导致所有相关线程都无法继续执行。

一个典型的死锁场景:

线程 A 获取了锁 L1。
线程 B 获取了锁 L2。
线程 A 尝试获取锁 L2(L2 目前被 B 锁定)。
线程 B 尝试获取锁 L1(L1 目前被 A 锁定)。

结果是:A 等待 B 释放 L2,B 等待 A 释放 L1,它们互相等待,永远无法继续。

代码示例:演示死锁

import threading
import time

# 创建两个锁
lock1 = threading.Lock()
lock2 = threading.Lock()

# 定义两个线程函数,它们试图以不同的顺序获取锁
def thread1_task():
    # 线程 1 的任务
    print("Thread 1: Attempting to acquire Lock 1...")
    lock1.acquire() # 获取 Lock 1
    print("Thread 1: Acquired Lock 1.")
    time.sleep(0.1) # 模拟一些工作,同时持有 Lock 1

    print("Thread 1: Attempting to acquire Lock 2...")
    lock2.acquire() # 尝试获取 Lock 2
    print("Thread 1: Acquired Lock 2.")

    print("Thread 1: Both locks acquired, doing work...")
    # 在获取两个锁后执行任务
    time.sleep(0.5)

    print("Thread 1: Releasing Lock 2.")
    lock2.release() # 释放 Lock 2
    print("Thread 1: Releasing Lock 1.")
    lock1.release() # 释放 Lock 1
    print("Thread 1: Finished.")

def thread2_task():
    # 线程 2 的任务
    print("Thread 2: Attempting to acquire Lock 2...")
    lock2.acquire() # 获取 Lock 2
    print("Thread 2: Acquired Lock 2.")
    time.sleep(0.1) # 模拟一些工作,同时持有 Lock 2

    print("Thread 2: Attempting to acquire Lock 1...")
    lock1.acquire() # 尝试获取 Lock 1
    print("Thread 2: Acquired Lock 1.")

    print("Thread 2: Both locks acquired, doing work...")
    # 在获取两个锁后执行任务
    time.sleep(0.5)

    print("Thread 2: Releasing Lock 1.")
    lock1.release() # 释放 Lock 1
    print("Thread 2: Releasing Lock 2.")
    lock2.release() # 释放 Lock 2
    print("Thread 2: Finished.")

# 创建并启动两个线程
thread1 = threading.Thread(target=thread1_task)
thread2 = threading.Thread(target=thread2_task)

print("Main thread: Starting threads that might deadlock.")
thread1.start()
thread2.start()

# 等待线程完成
# 在发生死锁的情况下,join() 将会永远阻塞
# 为了演示死锁,我们通常不会让主线程无限等待,而是设置一个超时或手动中断
thread1.join() # 这行可能会因为死锁而阻塞
thread2.join() # 这行也可能会因为死锁而阻塞

print("Main thread: Threads finished (or are deadlocked).")

# 输出示例 (可能会出现死锁):
# Main thread: Starting threads that might deadlock.
# Thread 1: Attempting to acquire Lock 1...
# Thread 1: Acquired Lock 1.
# Thread 2: Attempting to acquire Lock 2...
# Thread 2: Acquired Lock 2.
# Thread 1: Attempting to acquire Lock 2... (阻塞,等待 Thread 2 释放 Lock 2)
# Thread 2: Attempting to acquire Lock 1... (阻塞,等待 Thread 1 释放 Lock 1)
# (程序在此处死锁,无法继续)

这个例子展示了当两个线程试图以不同顺序获取多个锁时,很容易导致死锁。线程 1 持有 Lock 1 并等待 Lock 2,而线程 2 持有 Lock 2 并等待 Lock 1。

避免死锁的策略:

规定锁的获取顺序: 确保所有线程以相同的固定顺序获取多个锁。如果线程 A 需要 L1 和 L2,线程 B 也需要 L1 和 L2,那么约定它们总是先获取 L1 再获取 L2。
使用可重入锁 (RLock): 如果同一个线程需要多次获取同一个锁,使用 RLock 可以避免自己与自己死锁。
设置锁的超时时间: 在获取锁时使用带有 timeout 参数的 acquire() 方法。如果获取锁超时,线程可以选择放弃或采取其他策略。
避免持有锁时执行耗时或可能阻塞的操作: 尽量缩小临界区(持有锁的代码块)的范围。
检测和恢复: 在复杂系统中,可能需要实现死锁检测和恢复机制(虽然这在Python中不常见)。

企业级应用:使用 RLock 处理嵌套锁场景

假设在一个复杂的对象模型中,一个方法可能需要获取对象的一个锁,而在该方法内部又调用了另一个方法,而这个内部方法也需要获取同一个对象的锁。如果使用标准的 Lock,就会发生死锁。RLock 正是为解决这种可重入场景而设计的。

import threading
import time

# 定义一个具有可重入锁的类
class SafeCounter:
    # 一个线程安全的计数器类,使用 RLock

    def __init__(self):
        # 构造函数
        self._value = 0
        # 计数器的内部值
        self._lock = threading.RLock()
        # 创建一个可重入锁 RLock

    def increment(self):
        # 外部调用方法,用于增加计数器
        print(f"Thread {
              threading.current_thread().name}: Attempting to acquire RLock in increment...")
        with self._lock:
            # 获取 RLock
            print(f"Thread {
              threading.current_thread().name}: Acquired RLock in increment.")
            self._value += 1
            # 增加计数器值
            print(f"Thread {
              threading.current_thread().name}: Counter value is now {
              self._value}")
            # 打印当前值
            self._internal_operation() # 调用内部方法,该方法也需要锁
        # 退出 with 块,第一次释放 RLock
        print(f"Thread {
              threading.current_thread().name}: Released RLock in increment.")

    def _internal_operation(self):
        # 内部方法,也需要获取同一个 RLock
        print(f"Thread {
              threading.current_thread().name}: Attempting to acquire RLock in _internal_operation...")
        with self._lock:
             # 再次获取 RLock
            print(f"Thread {
              threading.current_thread().name}: Acquired RLock in _internal_operation.")
            print(f"Thread {
              threading.current_thread().name}: Performing internal operation.")
            time.sleep(0.01) # 模拟内部操作
        # 退出 with 块,第二次释放 RLock
        print(f"Thread {
              threading.current_thread().name}: Released RLock in _internal_operation.")

    def get_value(self):
        # 获取计数器值的方法 (也需要锁以确保读取一致性)
        print(f"Thread {
              threading.current_thread().name}: Attempting to acquire RLock in get_value...")
        with self._lock:
            # 获取 RLock
            print(f"Thread {
              threading.current_thread().name}: Acquired RLock in get_value.")
            value = self._value
        # 退出 with 块,释放 RLock
        print(f"Thread {
              threading.current_thread().name}: Released RLock in get_value.")
        return value

# 定义一个线程任务,调用 SafeCounter 的方法
def counter_worker(counter_obj, num_calls):
    # 线程函数,调用 SafeCounter 的 increment 方法多次
    thread_name = threading.current_thread().name
    print(f"Thread {
              thread_name}: Starting.")
    for _ in range(num_calls):
        counter_obj.increment()
    print(f"Thread {
              thread_name}: Finished.")

# 创建 SafeCounter 实例
safe_counter = SafeCounter()

# 创建并启动多个线程
num_threads = 5
num_calls_per_thread = 2
threads = []
for i in range(num_threads):
    t = threading.Thread(target=counter_worker, args=(safe_counter, num_calls_per_thread), name=f"Worker-{
              i+1}")
    threads.append(t)
    t.start()

# 等待所有线程完成
for t in threads:
    t.join()

print(f"
Final counter value: {
              safe_counter.get_value()}")
# 输出应该等于 num_threads * num_calls_per_thread

# 输出示例 (由于 RLock 的可重入性,不会发生死锁):
# Thread Worker-1: Starting.
# Thread Worker-1: Attempting to acquire RLock in increment...
# Thread Worker-1: Acquired RLock in increment.
# Thread Worker-1: Counter value is now 1
# Thread Worker-1: Attempting to acquire RLock in _internal_operation...
# Thread Worker-1: Acquired RLock in _internal_operation.
# Thread Worker-1: Performing internal operation.
# Thread Worker-1: Released RLock in _internal_operation.
# Thread Worker-1: Released RLock in increment.
# ... 其他线程交错执行 ...
# Thread Worker-5: Finished.
#
# Final counter value:
# Thread MainThread: Attempting to acquire RLock in get_value...
# Thread MainThread: Acquired RLock in get_value.
# Thread MainThread: Released RLock in get_value.
# 10

这个例子展示了 RLock 的作用。当 increment 方法已经持有了锁,然后调用 _internal_operation 方法,而 _internal_operation 也尝试获取同一个锁时,如果使用的是标准 Lock,线程会自己阻塞自己,导致死锁。使用 RLock 则允许同一个线程多次获取同一个锁,只要获取和释放的次数匹配即可。

5.1.4 线程的局限性:GIL (Global Interpreter Lock)

虽然 Python 提供了 threading 模块来实现并发,但由于 CPython 解释器中存在一个叫做 全局解释器锁 (GIL) 的机制,Python 线程在执行 CPU 密集型任务时并不能真正地并行。

全局解释器锁 (GIL):

GIL 是 CPython 解释器(最常见的Python实现)中的一个互斥锁,它限制了任何时候只有一个本地线程可以执行 Python 字节码。这意味着,即使在多核 CPU 上,多个 Python 线程也无法同时执行 CPU 密集型计算。当一个线程在执行 Python 代码时,它会持有 GIL,其他线程必须等待它释放 GIL 才能执行。

GIL 对线程的影响:

I/O 密集型任务: 当一个线程执行 I/O 操作(如读写文件、网络通信)时,它会释放 GIL,允许其他线程运行。因此,对于包含大量 I/O 等待时间的任务,多线程可以有效地提高并发性能。
CPU 密集型任务: 当一个线程执行纯粹的 CPU 计算时,它会一直持有 GIL,直到遇到一个 I/O 操作、显式释放 GIL 或达到解释器的内部“检查点”(例如,每隔100个字节码指令)。这导致其他线程无法在其他核心上同时执行,多线程的 CPU 密集型任务并不能实现并行加速,甚至可能因为线程切换的开销而比单线程更慢。

代码示例:GIL 对 CPU 密集型任务的影响

import threading
import time
import os # 导入 os 模块获取 CPU 核心数

# 定义一个 CPU 密集型任务函数
def cpu_bound_task(n):
    # 模拟一个 CPU 密集型计算
    # n: 计算的迭代次数
    print(f"Thread {
              threading.current_thread().name}: Starting CPU bound task...")
    result = 0
    # 模拟大量的数学计算
    for i in range(n):
        result += i * i
    print(f"Thread {
              threading.current_thread().name}: Finished CPU bound task.")
    return result

# 模拟 I/O 密集型任务函数
def io_bound_task(duration):
    # 模拟一个 I/O 密集型任务
    # duration: 模拟等待的时间
    print(f"Thread {
              threading.current_thread().name}: Starting IO bound task...")
    time.sleep(duration) # 模拟 I/O 等待
    print(f"Thread {
              threading.current_thread().name}: Finished IO bound task.")

# --- 演示 GIL 对 CPU 密集型任务的影响 ---
print("--- Demonstrating GIL impact on CPU bound tasks ---")
cpu_iterations = 10_000_000 # 设置 CPU 任务的迭代次数

# 单线程执行 CPU 密集型任务
print("Running single thread CPU bound task...")
start_time = time.time()
cpu_bound_task(cpu_iterations)
end_time = time.time()
print(f"Single thread CPU bound task took: {
              end_time - start_time:.4f} seconds.")

# 多线程执行 CPU 密集型任务
print("
Running multi-thread CPU bound tasks...")
num_threads = os.cpu_count() if os.cpu_count() else 4 # 获取 CPU 核心数,或默认 4 个线程
threads = []
start_time = time.time()
for i in range(num_threads):
    t = threading.Thread(target=cpu_bound_task, args=(cpu_iterations // num_threads,), name=f"CPU-Thread-{
              i+1}")
    # 将总迭代次数分摊到各个线程
    threads.append(t)
    t.start()

for t in threads:
    t.join()
end_time = time.time()
print(f"Multi-thread ({
              num_threads}) CPU bound tasks took: {
              end_time - start_time:.4f} seconds.")
# 比较多线程和单线程的执行时间,多线程版本可能并不会快很多,甚至可能慢一些

# --- 演示多线程在 I/O 密集型任务中的效果 ---
print("
--- Demonstrating multi-thread effect on IO bound tasks ---")
num_io_threads = 5
io_duration = 1 # 每个 I/O 任务模拟等待 1 秒
threads_io = []
start_time = time.time()
for i in range(num_io_threads):
    t = threading.Thread(target=io_bound_task, args=(io_duration,), name=f"IO-Thread-{
              i+1}")
    threads_io.append(t)
    t.start()

for t in threads_io:
    t.join()
end_time = time.time()
print(f"Multi-thread ({
              num_io_threads}) IO bound tasks took: {
              end_time - start_time:.4f} seconds.")
# 比较总耗时和单个任务的耗时 (5个任务每个1秒,理论上并发可以在接近1秒的时间完成,但受线程启动和切换开销影响会稍长)
# 输出通常会远小于 num_io_threads * io_duration (5秒),表明实现了并发

这个示例直观地对比了多线程在 CPU 密集型和 I/O 密集型任务中的表现。对于 CPU 密集型任务,多线程版本并没有显示出明显的加速,这是由于 GIL 限制了它们无法在多个核心上并行执行 Python 字节码。而对于 I/O 密集型任务,由于线程在等待 I/O 时会释放 GIL,多线程版本能够有效地实现并发,显著缩短了总执行时间。

如何绕过 GIL 处理 CPU 密集型任务的并行?

对于需要并行执行 CPU 密集型任务的场景,Python提供了 multiprocessing 模块。每个进程有自己独立的 Python 解释器和 GIL,因此不同的进程可以在不同的 CPU 核心上真正地并行执行 Python 代码。

5.2 进程 (Multiprocessing):实现真正的并行

multiprocessing 模块提供了类似于 threading 模块的 API,但它使用进程而不是线程。进程是操作系统分配资源(内存空间、文件句柄等)的基本单位。每个进程有自己的独立内存空间,这避免了线程间共享内存带来的复杂性(如竞态条件),但也意味着进程间通信需要额外的机制。

5.2.1 创建和运行进程

使用 multiprocessing.Process 类来创建进程,其用法与 threading.Thread 非常相似。

import multiprocessing
import time
import os # 导入 os 模块用于获取进程 ID

# 定义一个进程要执行的函数
def process_worker(name, duration):
    # 这是进程将要执行的任务函数
    # name: 进程的名称
    # duration: 模拟任务执行的时间长度(秒)
    process_id = os.getpid() # 获取当前进程的 ID
    print(f"Process {
              name} (PID: {
              process_id}): Starting task for {
              duration} seconds.")
    # 打印进程开始执行的提示信息,包括进程 ID
    time.sleep(duration)
    # 模拟耗时的任务执行
    print(f"Process {
              name} (PID: {
              process_id}): Task finished.")
    # 打印进程任务完成的提示信息

# --- 创建和运行进程示例 ---

# 创建第一个进程
process1 = multiprocessing.Process(target=process_worker, args=("Process 1", 2))
# 使用 multiprocessing.Process 类创建一个进程对象
# target 参数指定进程要执行的函数 process_worker
# args 参数是一个元组,包含传递给 target 函数的位置参数 ("Process 1", 2)

# 创建第二个进程
process2 = multiprocessing.Process(target=process_worker, args=("Process 2", 3))
# 创建第二个进程对象

# 启动进程
# 启动进程会在操作系统中创建一个新的进程,并在其中执行 target 函数
process1.start()
# 启动第一个进程
process2.start()
# 启动第二个进程

print(f"Main process (PID: {
              os.getpid()}): Started child processes.")
# 主进程在启动子进程后会继续执行

# 等待所有进程完成
# join() 方法会阻塞当前进程 (主进程) 直到调用的进程执行完毕
process1.join()
# 等待 process1 执行完毕
process2.join()
# 等待 process2 执行完毕

print(f"Main process (PID: {
              os.getpid()}): All child processes finished.")
# 所有子进程都执行完毕后,主进程才会执行到这里

# 输出示例:
# Main process (PID: xxxx): Started child processes.
# Process Process 1 (PID: yyyy): Starting task for 2 seconds.
# Process Process 2 (PID: zzzz): Starting task for 3 seconds.
# Process Process 1 (PID: yyyy): Task finished.
# Process Process 2 (PID: zzzz): Task finished.
# Main process (PID: xxxx): All child processes finished.
# 注意:每个进程都有不同的 PID,它们在不同的操作系统进程中运行。
# 由于是并行执行,如果有多核 CPU,Process 1 和 Process 2 可能会在不同的核心上同时运行。

这个例子展示了如何创建和启动两个独立的进程。与线程类似,主进程通过 join() 方法等待子进程完成。关键区别在于,每个子进程都在自己的独立进程中运行,拥有独立的内存空间和 GIL。

5.2.2 进程间通信 (Inter-Process Communication – IPC)

由于进程有独立的内存空间,它们不能像线程那样直接访问和修改共享变量。进程间通信 (IPC) 需要使用专门的机制。multiprocessing 模块提供了多种 IPC 机制:

队列 (Queue): 安全地在多个进程之间传递消息。multiprocessing.Queue 是线程和进程安全的。
管道 (Pipe): 用于在两个进程之间创建一对连接。管道是双向的(默认)或单向的。
共享内存 (Shared Memory): 允许进程访问同一块内存区域,但需要额外的同步机制(如锁)来避免竞态条件。
同步原语: multiprocessing 模块也提供了 Lock, Semaphore, Event 等同步原语,它们是进程安全的。

企业级代码示例:使用 Queue 进行进程间通信

考虑一个企业应用场景:一个进程负责读取大量数据,另一个或多个进程负责处理这些数据。可以使用队列将数据从读取进程传递给处理进程。

import multiprocessing
import time
import os
import random

# 定义数据生产者进程函数
def data_producer(queue, num_items):
    # 数据生产者进程函数
    # queue: 用于传递数据的 Queue 对象
    # num_items: 要生产的数据项数量
    process_id = os.getpid()
    print(f"Producer (PID: {
              process_id}): Starting to produce {
              num_items} items.")

    for i in range(num_items):
        # 生产数据 (这里模拟生成一些数字)
        item = f"item_{
              i+1}_from_pid_{
              process_id}"
        print(f"Producer (PID: {
              process_id}): Putting '{
              item}' into queue.")
        queue.put(item) # 将数据项放入队列
        time.sleep(random.uniform(0.01, 0.05)) # 模拟生产数据所需时间

    print(f"Producer (PID: {
              process_id}): Finished producing.")
    queue.put(None) # 生产完成后,放入一个 None 表示结束信号 (消费者根据 None 判断结束)
    # 在更复杂的场景中,可以使用更明确的结束标志对象或消息

# 定义数据消费者进程函数
def data_consumer(queue):
    # 数据消费者进程函数
    # queue: 用于获取数据的 Queue 对象
    process_id = os.getpid()
    print(f"Consumer (PID: {
              process_id}): Starting to consume data.")

    while True:
        # 循环从队列中获取数据
        print(f"Consumer (PID: {
              process_id}): Getting item from queue...")
        item = queue.get() # 从队列中获取数据项,会阻塞直到队列中有数据
        # queue.get(block=True, timeout=...) 也可以设置超时或非阻塞模式

        if item is None:
            # 检查是否收到结束信号
            print(f"Consumer (PID: {
              process_id}): Received end signal.")
            break # 收到结束信号,退出循环

        # 处理获取到的数据项
        print(f"Consumer (PID: {
              process_id}): Consuming '{
              item}'.")
        time.sleep(random.uniform(0.05, 0.1)) # 模拟处理数据所需时间
        # 在实际应用中,这里会进行数据清洗、计算、存储等操作

    print(f"Consumer (PID: {
              process_id}): Finished consuming.")

# --- 使用 Queue 进行进程间通信示例 ---
print("--- Using Queue for Inter-Process Communication ---")

# 创建一个 Queue 对象,用于在生产者和消费者进程之间传递数据
data_queue = multiprocessing.Queue()
# multiprocessing.Queue() 是线程和进程安全的队列

num_items_to_produce = 10 # 生产 10 个数据项

# 创建生产者进程
producer_process = multiprocessing.Process(target=data_producer, args=(data_queue, num_items_to_produce))
# target 是生产者函数,args 是传递给函数的参数 (队列对象和生产数量)

# 创建消费者进程
consumer_process = multiprocessing.Process(target=data_consumer, args=(data_queue,))
# target 是消费者函数,args 是传递给函数的参数 (队列对象)

# 启动进程
producer_process.start()
consumer_process.start()

print(f"Main process (PID: {
              os.getpid()}): Producer and consumer processes started.")

# 等待生产者和消费者进程完成
producer_process.join()
print("Main process: Producer finished.")

consumer_process.join()
print("Main process: Consumer finished.")

print("--- Inter-Process Communication example finished ---")

# 输出示例 (进程执行顺序和 Queue 操作取决于操作系统调度):
# --- Using Queue for Inter-Process Communication ---
# Main process (PID: xxxx): Producer and consumer processes started.
# Producer (PID: yyyy): Starting to produce 10 items.
# Consumer (PID: zzzz): Starting to consume data.
# Consumer (PID: zzzz): Getting item from queue...
# Producer (PID: yyyy): Putting 'item_1_from_pid_yyyy' into queue.
# Consumer (PID: zzzz): Consuming 'item_1_from_pid_yyyy'.
# Producer (PID: yyyy): Putting 'item_2_from_pid_yyyy' into queue.
# Consumer (PID: zzzz): Getting item from queue...
# Consumer (PID: zzzz): Consuming 'item_2_from_pid_yyyy'.
# ... 生产和消费交替进行 ...
# Producer (PID: yyyy): Finished producing.
# Producer (PID: yyyy): Putting 'None' into queue.
# Consumer (PID: zzzz): Getting item from queue...
# Consumer (PID: zzzz): Received end signal.
# Consumer (PID: zzzz): Finished consuming.
# Main process: Producer finished.
# Main process: Consumer finished.
# --- Inter-Process Communication example finished ---

这个例子展示了如何使用 multiprocessing.Queue 在独立的生产者和消费者进程之间安全地传递数据。生产者将数据放入队列,消费者从队列中获取数据。队列的使用简化了进程间的数据同步和通信,是实现并行数据处理的常用模式。

5.2.3 进程池 (Pool)

当需要并行执行大量相似任务时,手动创建和管理进程会变得繁琐。multiprocessing.Pool 提供了一个方便的方式来管理一个进程池,将任务提交给池中的进程并行执行。

import multiprocessing
import time
import os

# 定义一个简单的任务函数,用于在进程池中执行
def process_task(x):
    # 进程池中的一个任务函数
    # x: 任务的输入参数
    process_id = os.getpid()
    # print(f"Process (PID: {process_id}): Processing {x}...")
    # 模拟一些计算密集型工作
    time.sleep(0.1) # 短暂暂停
    result = x * x
    # print(f"Process (PID: {process_id}): Finished processing {x}, result: {result}")
    return result
    # 返回任务结果

# --- 使用进程池示例 ---
print("--- Using Multiprocessing Pool ---")

# 获取可用的 CPU 核心数,通常用于确定进程池的大小
num_cores = os.cpu_count() if os.cpu_count() else 4
print(f"Number of available CPU cores: {
              num_cores}")

# 创建一个进程池
# with multiprocessing.Pool(processes=num_cores) as pool:
# 推荐使用 with 语句管理进程池,确保进程池在退出时被关闭

# 或者手动创建和关闭进程池
pool = multiprocessing.Pool(processes=num_cores)
# 创建一个进程池,指定进程数量为 CPU 核心数

print(f"Created a process pool with {
              num_cores} processes.")

# 准备要处理的数据列表
data_to_process = range(10) # 0 到 9 的数字列表

# 使用 pool.map() 将任务分发到进程池中并行执行
# pool.map(func, iterable) 将 iterable 中的每个元素作为参数调用 func 函数,并在不同的进程中并行执行
# map 方法会阻塞,直到所有任务完成,并按照输入顺序返回结果列表
print("Submitting tasks to pool using map()...")
start_time = time.time()
results = pool.map(process_task, data_to_process)
end_time = time.time()
print("All tasks finished.")

print(f"Results from pool.map(): {
              results}")
print(f"Processing time using pool.map(): {
              end_time - start_time:.4f} seconds.")
# map 方法适用于所有任务都完成后才需要处理结果的场景

# 使用 pool.apply_async() 异步提交任务,并获取 AsyncResult 对象
# apply_async(func, args=(), kwds={}) 提交单个任务,立即返回 AsyncResult 对象
print("
Submitting tasks to pool using apply_async()...")
async_results = []
start_time = time.time()
for x in data_to_process:
    # 异步提交任务
    async_result = pool.apply_async(process_task, args=(x,))
    # apply_async 立即返回一个 AsyncResult 对象
    async_results.append(async_result)
    # 将 AsyncResult 对象添加到列表中

# 可以通过 AsyncResult 对象检查任务是否完成、获取结果或等待
# 例如,等待所有任务完成并获取结果
processed_results_async = []
for async_result in async_results:
    # async_result.wait() # 可以等待单个任务完成
    result = async_result.get() # 获取任务结果,如果任务未完成会阻塞
    processed_results_async.append(result)

end_time = time.time()
print("All async tasks finished and results collected.")
print(f"Results from pool.apply_async(): {
              processed_results_async}")
print(f"Processing time using pool.apply_async(): {
              end_time - start_time:.4f} seconds.")
# apply_async 适用于需要处理任务的顺序与提交顺序不同,或者需要异步获取结果的场景

# 使用 pool.imap() 和 pool.imap_unordered() 获取迭代器结果
# imap() 返回一个迭代器,结果按照输入顺序产生 (惰性计算)
# imap_unordered() 返回一个迭代器,结果按照完成顺序产生
print("
Submitting tasks to pool using imap()...")
start_time = time.time()
imap_results_iterator = pool.imap(process_task, data_to_process)
# imap 返回一个迭代器

# 消费 imap 迭代器
imap_results = list(imap_results_iterator) # 将迭代器转换为列表以显示所有结果
end_time = time.time()
print("All imap tasks finished and results collected.")
print(f"Results from pool.imap(): {
              imap_results}") # 结果顺序与输入顺序一致
print(f"Processing time using pool.imap(): {
              end_time - start_time:.4f} seconds.")


print("
Submitting tasks to pool using imap_unordered()...")
start_time = time.time()
imap_unordered_results_iterator = pool.imap_unordered(process_task, data_to_process)
# imap_unordered 返回一个迭代器

# 消费 imap_unordered 迭代器
imap_unordered_results = list(imap_unordered_results_iterator) # 将迭代器转换为列表
end_time = time.time()
print("All imap_unordered tasks finished and results collected.")
print(f"Results from pool.imap_unordered(): {
              imap_unordered_results}") # 结果顺序可能与输入顺序不同
print(f"Processing time using pool.imap_unordered(): {
              end_time - start_time:.4f} seconds.")


# 关闭进程池 (使用 with 语句会自动关闭)
pool.close()
# 不再接受新任务提交

pool.join()
# 等待所有已提交的任务完成

print("
Process pool closed and joined.")

# 输出示例 (注意 map 和 imap 的结果顺序固定,imap_unordered 的结果顺序可能变化):
# --- Using Multiprocessing Pool ---
# Number of available CPU cores: x
# Created a process pool with x processes.
# Submitting tasks to pool using map()...
# All tasks finished.
# Results from pool.map(): [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
# Processing time using pool.map(): 0.xxx seconds.
#
# Submitting tasks to pool using apply_async()...
# All async tasks finished and results collected.
# Results from pool.apply_async(): [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
# Processing time using pool.apply_async(): 0.xxx seconds.
#
# Submitting tasks to pool using imap()...
# All imap tasks finished and results collected.
# Results from pool.imap(): [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
# Processing time using pool.imap(): 0.xxx seconds.
#
# Submitting tasks to pool using imap_unordered()...
# All imap_unordered tasks finished and results collected.
# Results from pool.imap_unordered(): [0, 16, 9, 4, 1, 25, 36, 49, 64, 81] # 顺序可能变化
# Processing time using pool.imap_unordered(): 0.xxx seconds.
#
# Process pool closed and joined.

这个例子展示了如何使用 multiprocessing.Pool 高效地并行执行 CPU 密集型任务。map, apply_async, imap, imap_unordered 提供了不同的任务提交和结果收集方式,可以根据具体需求选择。进程池自动化了进程的创建、销毁和任务调度,使得并行编程更加便捷。

5.2.4 进程的同步与共享状态

虽然进程不共享内存,但有时需要在进程之间共享一些状态或进行同步。multiprocessing 模块提供了进程安全的同步原语和共享内存机制。

Value 和 Array: 用于在进程之间共享单个值或一个数组。它们需要配合锁来保证安全访问。
Manager: 提供了一种管理共享对象(如列表、字典、锁等)的方式,这些对象可以在不同进程中安全地访问。

代码示例:使用 Value 和 Lock 共享状态

import multiprocessing
import time

# 定义一个进程函数,用于更新共享值
def update_shared_value(shared_value, lock, num_updates):
    # 进程函数,更新共享的整数值
    # shared_value: multiprocessing.Value 对象
    # lock: multiprocessing.Lock 对象,用于保护 shared_value
    # num_updates: 更新次数
    process_name = multiprocessing.current_process().name # 获取当前进程的名称

    print(f"Process {
              process_name}: Starting to update shared value {
              num_updates} times.")

    for i in range(num_updates):
        # 在修改共享值前获取锁
        with lock:
            # 获取进程安全的锁
            # 访问和修改共享 Value
            current_value = shared_value.value
            # 读取共享值
            time.sleep(0.001) # 模拟一些工作
            shared_value.value = current_value + 1
            # 修改共享值
            # print(f"Process {process_name}: Update {i+1}, shared value is now {shared_value.value}")

        # 退出 with 块,锁自动释放

    print(f"Process {
              process_name}: Finished updating.")

# --- 使用 Value 和 Lock 共享状态示例 ---
print("--- Using Value and Lock for Shared State ---")

# 创建一个共享的整数值 (i 表示 integer)
shared_counter = multiprocessing.Value('i', 0)
# multiprocessing.Value(typecode, value) 创建一个共享值对象
# typecode 指定数据类型,'i' 表示带符号整数

# 创建一个进程安全的锁
shared_lock = multiprocessing.Lock()
# multiprocessing.Lock() 是进程安全的锁

num_processes = 5
updates_per_process = 1000
total_expected = num_processes * updates_per_process

processes = []
start_time = time.time()
for i in range(num_processes):
    p = multiprocessing.Process(target=update_shared_value, args=(shared_counter, shared_lock, updates_per_process), name=f"Updater-{
              i+1}")
    processes.append(p)
    p.start()

for p in processes:
    p.join()
end_time = time.time()

print(f"
Final shared counter value: {
              shared_counter.value}")
print(f"Expected shared counter value: {
              total_expected}")
print(f"Is shared counter value correct? {
              shared_counter.value == total_expected}")
print(f"Processing time: {
              end_time - start_time:.4f} seconds.")

# 输出示例:
# --- Using Value and Lock for Shared State ---
# Process Updater-1: Starting to update shared value 1000 times.
# Process Updater-2: Starting to update shared value 1000 times.
# Process Updater-3: Starting to update shared value 1000 times.
# Process Updater-4: Starting to update shared value 1000 times.
# Process Updater-5: Starting to update shared value 1000 times.
# ... 进程交错执行 ...
# Process Updater-1: Finished updating.
# Process Updater-2: Finished updating.
# ...
#
# Final shared counter value: 5000
# Expected shared counter value: 5000
# Is shared counter value correct? True
# Processing time: 0.xxx seconds.

这个例子演示了如何使用 multiprocessing.Value 创建一个进程间共享的整数,并使用 multiprocessing.Lock 来同步对该共享值的访问。与线程锁类似,进程锁 (multiprocessing.Lock) 用于确保在任何时候只有一个进程可以修改共享变量,从而避免竞态条件。with lock: 语句同样适用于进程锁。

5.3 线程与进程的选择

选择使用线程还是进程取决于任务的类型:

I/O 密集型任务 (网络请求、文件读写等): 优先使用线程。线程创建和切换开销小,且在等待 I/O 时可以释放 GIL,实现有效并发。
CPU 密集型任务 (复杂计算、数据处理等): 优先使用进程。进程有独立的 GIL,可以在多核 CPU 上实现真正的并行,充分利用 CPU 资源。

其他考虑因素:

内存: 进程拥有独立的内存空间,内存开销较大。线程共享进程内存,开销较小。
复杂性: 线程共享内存,需要更仔细地处理同步问题(竞态条件、死锁)。进程间通信需要显式机制,可能增加了代码复杂性,但避免了共享内存的陷阱。
容错性: 一个线程崩溃可能会导致整个进程崩溃。一个进程崩溃通常不会影响其他进程。
创建/启动开销: 进程的创建和启动开销比线程大。

第六章:异步编程与 asyncio:构建高效的并发系统

在处理大量 I/O 操作(如网络通信、文件读写、数据库交互)时,传统的同步编程模型会阻塞程序的执行,直到 I/O 完成。虽然多线程可以在一定程度上解决这个问题(因为线程在等待 I/O 时会释放 GIL),但线程的创建和管理开销、以及线程间同步的复杂性在高并发场景下会成为瓶颈。多进程虽然绕过了 GIL,但进程间通信的开销和资源消耗更大,更适合 CPU 密集型任务。

异步编程提供了一种不同的并发模型。它允许程序在等待某个 I/O 操作完成时,切换去执行其他任务,而不是阻塞等待。当 I/O 完成后,程序可以稍后“恢复”之前被中断的任务。这种模型通常使用单个线程(或少量线程)和事件循环(Event Loop)来实现高效的并发。Python 3.4 引入了 asyncio 库,并在后续版本中通过 asyncawait 关键字提供了原生支持,使得异步编程变得更加 Pythonic。

6.1 同步与异步的概念对比

让我们通过一个简单的网络请求例子来对比同步和异步编程的差异。

同步示例:

import requests
import time

# 定义一个同步函数,用于模拟下载一个网页
def download_sync(url):
    # 这个函数是同步执行的
    print(f"Synchronous: Downloading {
              url}...")
    try:
        response = requests.get(url, timeout=5) # 使用 requests 库发起同步 HTTP GET 请求
        # requests.get() 方法会阻塞,直到服务器响应或发生超时
        response.raise_for_status() # 检查请求是否成功(状态码 2xx),如果不是则抛出异常
        content = response.text # 获取响应内容 (文本格式)
        print(f"Synchronous: Finished downloading {
              url}, content length: {
              len(content)}")
        # 打印下载完成信息和内容长度
        return content # 返回下载的网页内容
    except requests.exceptions.RequestException as e:
        # 捕获请求相关的异常 (如网络错误、超时、HTTP错误)
        print(f"Synchronous: Error downloading {
              url}: {
              e}")
        # 打印错误信息
        return None # 返回 None 表示下载失败

# 模拟下载多个网页(同步方式)
urls = [
    "http://www.example.com",
    "http://www.google.com",
    "http://www.bing.com"
]

print("--- Starting Synchronous Downloads ---")
start_time = time.time() # 记录开始时间

for url in urls:
    # 循环遍历 URL 列表
    download_sync(url) # 依次调用 download_sync 函数下载每个 URL
    # 每个 download_sync 调用都会阻塞,直到下载完成,然后才开始下一个 URL 的下载

end_time = time.time() # 记录结束时间
print(f"--- Synchronous Downloads Finished in {
              end_time - start_time:.2f} seconds ---")
# 打印总耗时
# 总耗时大约是每个下载耗时之和

在同步示例中,requests.get() 调用会阻塞程序的执行,直到获取到完整的响应。下载完一个 URL 后,才会开始下载下一个。总的执行时间是所有下载时间之和,这在需要同时处理多个 I/O 请求时效率低下。

异步概念示例 (使用伪代码说明):

# 伪代码:说明异步下载的流程概念
async def download_async_conceptual(url):
    print(f"Asynchronous (Conceptual): Starting download of {
              url}...")
    # 发起下载请求,但不阻塞
    # 当等待响应时,程序可以去做其他事情
    response = await async_http_library.get(url) # 假设 async_http_library 提供了异步请求方法
    # 'await' 表示在这里等待异步操作完成
    # 在等待期间,控制权会返回给事件循环,事件循环可以运行其他已经准备好的任务
    print(f"Asynchronous (Conceptual): Received response for {
              url}.")
    content = await response.read() # 异步读取响应内容
    print(f"Asynchronous (Conceptual): Finished download of {
              url}, content length: {
              len(content)}")
    return content

# 主程序协调多个异步任务
async def main_conceptual():
    urls = [
        "http://www.example.com",
        "http://www.google.com",
        "http://www.bing.com"
    ]
    tasks = []
    for url in urls:
        # 创建多个异步下载任务,但不立即运行
        task = create_async_task(download_async_conceptual(url)) # 假设 create_async_task 创建一个可运行的任务
        tasks.append(task)

    # 同时运行所有任务,并在它们全部完成后等待结果
    results = await run_all_async_tasks(tasks) # 假设 run_all_async_tasks 同时运行任务并等待完成

    # 处理结果...

# 运行异步主程序 (需要一个事件循环)
# run_async_main(main_conceptual()) # 假设 run_async_main 启动事件循环并运行 main_conceptual

在异步概念中,await 关键字是非阻塞的关键。当遇到 await 时,函数会暂停执行并将控制权交还给事件循环。事件循环可以检查是否有其他任务已经准备好继续执行(例如,它们的 I/O 操作已经完成)。当被等待的异步操作(如网络请求响应)完成后,事件循环会安排之前暂停的函数从 await 的位置继续执行。这样,可以在等待一个任务的 I/O 时,有效地利用 CPU 时间去处理其他任务,从而显著提高 I/O 密集型应用的并发性能。

6.2 asyncawait 关键字

async def: 用于定义一个协程函数 (Coroutine Function)。协程函数在被调用时不会立即执行函数体,而是返回一个协程对象 (Coroutine Object)。协程函数必须在 async 函数内部被调用,并且通常需要与 await 关键字一起使用。
await: 用于等待一个“可等待对象”(Awaitable Object) 的完成。可等待对象包括协程对象、Task 对象以及其他实现了特定协议(__await__ 方法)的对象。await 只能在 async def 定义的协程函数内部使用。当在一个协程中 await 另一个可等待对象时,当前协程会暂停执行,将控制权交还给事件循环,直到被等待的对象完成。

import asyncio
import time

# 定义一个普通的异步协程函数
async def greet(name, delay):
    # 这是一个协程函数,使用 async def 定义
    print(f"Coroutine greet({
              name}): Starting to greet...")
    # 在协程内部,可以使用 await 等待其他可等待对象
    await asyncio.sleep(delay) # await asyncio.sleep(delay) 是一个可等待对象,它会模拟一个异步的暂停(非阻塞)
    # 在 await 等待期间,greet 协程会暂停,事件循环可以运行其他协程
    print(f"Coroutine greet({
              name}): Hello, {
              name}!")
    # 当 asyncio.sleep(delay) 完成后,greet 协程会从这里恢复执行
    return f"Greeting for {
              name} completed." # 协程可以返回值

# 定义另一个协程函数
async def main_async():
    # 这是一个主要的协程函数,用于协调其他协程
    print("Main Coroutine: Starting...")

    # 直接创建并等待一个协程
    # await greet("Alice", 2)
    # 这种方式是按顺序执行的,等待 greet("Alice", 2") 完成后才继续

    # 同时运行多个协程
    # 使用 asyncio.create_task() 创建 Task 对象,并放入一个列表中
    # Task 是对协程的封装,用于在事件循环中调度和运行协程
    task1 = asyncio.create_task(greet("Bob", 3))
    # 创建一个 Task 来运行 greet("Bob", 3) 协程
    task2 = asyncio.create_task(greet("Charlie", 1))
    # 创建一个 Task 来运行 greet("Charlie", 1) 协程

    # asyncio.create_task() 不会阻塞,而是立即返回 Task 对象,协程会在事件循环中开始运行

    print("Main Coroutine: Tasks created, waiting for completion...")

    # 使用 await 等待 Task 对象完成
    # await task1
    # await task2
    # 这样也可以等待 Task 完成,但仍然是按顺序等待

    # 更常见的方式是使用 asyncio.gather() 来同时等待多个可等待对象(协程或 Task)
    # asyncio.gather(*awaitables) 返回一个可等待对象,等待所有传入的可等待对象完成
    results = await asyncio.gather(task1, task2)
    # 同时等待 task1 和 task2 完成,并收集它们的返回值

    print("Main Coroutine: All tasks finished.")
    print(f"Main Coroutine: Results: {
              results}")
    # 打印从协程返回的结果列表

# 运行顶层协程
# 需要使用 asyncio.run() 函数来运行顶层协程并管理事件循环
print("--- Running Asyncio Program ---")
start_time = time.time() # 记录开始时间

asyncio.run(main_async())
# asyncio.run(coroutine) 负责创建一个新的事件循环,运行传入的顶层协程,
# 并在协程执行完毕或发生异常时关闭事件循环

end_time = time.time() # 记录结束时间
print(f"--- Asyncio Program Finished in {
              end_time - start_time:.2f} seconds ---")
# 打印总耗时
# 总耗时大约是等待时间最长的任务的耗时 (这里是 Bob 的 3 秒),因为它们是并发执行的

# 输出示例:
# --- Running Asyncio Program ---
# Main Coroutine: Starting...
# Coroutine greet(Bob): Starting to greet...
# Coroutine greet(Charlie): Starting to greet...
# Main Coroutine: Tasks created, waiting for completion...
# Coroutine greet(Charlie): Hello, Charlie!
# Coroutine greet(Bob): Hello, Bob!
# Main Coroutine: All tasks finished.
# Main Coroutine: Results: ['Greeting for Bob completed.', 'Greeting for Charlie completed.']
# --- Asyncio Program Finished in 3.xxx seconds ---
# 注意:Charlie 先完成 (1秒),Bob 后完成 (3秒)。
# 主协程创建任务后立即打印下一行,然后 await asyncio.gather 阻塞主协程,
# 控制权回到事件循环,事件循环调度 Bob 和 Charlie 的 greet 协程运行。
# 当 Charlie 的 sleep 完成后,它的协程恢复执行并打印 "Hello, Charlie!"。
# 当 Bob 的 sleep 完成后,它的协程恢复执行并打印 "Hello, Bob!"。
# 最后 asyncio.gather 完成,主协程恢复执行。

这个例子演示了如何使用 async def 定义协程,await 在协程内部等待异步操作(这里是 asyncio.sleep),以及使用 asyncio.create_taskasyncio.gather 来同时运行和管理多个协程任务。asyncio.run() 是运行异步程序的入口点。

6.3 事件循环 (Event Loop)

事件循环是 asyncio 的核心。它是一个循环,负责:

监视已注册的异步操作(如网络连接、定时器、子进程等)是否完成。
当一个操作完成时,通知相应的协程恢复执行。
调度哪些已准备好的协程可以运行。
处理任务的创建和取消。

开发者通常不需要直接与事件循环交互,asyncio.run() 函数会自动创建、管理和关闭事件循环。但在更复杂的场景(如集成其他框架、自定义事件处理)下,可能需要获取当前的事件循环对象并进行更精细的控制。

import asyncio
import time

async def coro_with_loop_info():
    # 一个获取事件循环信息的协程
    loop = asyncio.get_running_loop() # 获取当前正在运行的事件循环对象
    # asyncio.get_running_loop() 只能在协程内部调用

    print(f"Coroutine running on event loop: {
              loop}")
    # 打印事件循环对象的信息

    # 可以在事件循环上安排其他任务
    loop.call_soon(lambda: print("Callback scheduled with call_soon"))
    # 使用 call_soon 在下一次事件循环迭代时安排一个回调函数立即执行
    # call_soon 不会阻塞当前协程

    print("Coroutine doing some work...")
    await asyncio.sleep(0.1) # 模拟异步工作

    # 也可以在事件循环上安排延迟回调
    loop.call_later(0.5, lambda: print("Callback scheduled with call_later"))
    # 使用 call_later 安排一个回调函数在指定的延迟后执行

    print("Coroutine finished.")


async def main_loop_example():
    print("Main coroutine starting, getting event loop...")
    loop = asyncio.get_running_loop() # 在顶层协程中获取事件循环

    print(f"Main coroutine running on event loop: {
              loop}")

    # 创建并运行子协程
    await coro_with_loop_info()

    print("Main coroutine finished.")
    # 注意:call_later 安排的回调函数可能会在 main_async 结束后才执行,
    # 因为事件循环会继续运行直到所有任务和回调都完成。

# 运行异步主程序
print("--- Event Loop Example ---")
asyncio.run(main_loop_example())
# asyncio.run() 负责创建和管理事件循环

# 输出示例 (call_later 回调可能在最后打印):
# --- Event Loop Example ---
# Main coroutine starting, getting event loop...
# Main coroutine running on event loop: <_WindowsSelectorEventLoop running...> # 事件循环对象信息
# Coroutine running on event loop: <_WindowsSelectorEventLoop running...>
# Coroutine doing some work...
# Callback scheduled with call_soon # call_soon 调度的回调函数会很快执行
# Coroutine finished.
# Main coroutine finished.
# Callback scheduled with call_later # call_later 调度的回调函数在 0.5 秒后执行

这个例子展示了如何在协程内部获取当前的事件循环对象,并使用 call_sooncall_later 等方法在事件循环上安排回调函数。虽然大多数时候不需要直接操作事件循环,但了解它的存在和基本功能对于理解 asyncio 的工作原理以及处理更复杂的场景是有帮助的。

6.4 Task 对象

正如前面所示,asyncio.create_task(coroutine) 用于将一个协程包装成一个 Task 对象,并将其提交给事件循环进行调度。Task 对象代表了一个即将运行、正在运行或已完成的协程。它是 asyncio 中用于管理和控制协程执行的基本单元。

import asyncio
import time

async def my_task(name, delay):
    # 一个简单的协程函数
    print(f"Task {
              name}: Starting...")
    await asyncio.sleep(delay)
    print(f"Task {
              name}: Finished after {
              delay} seconds.")
    return f"Result from {
              name}"

async def main_tasks():
    print("Main coroutine: Creating tasks...")

    # 创建两个 Task 对象
    task1 = asyncio.create_task(my_task("A", 2))
    # 创建 Task A,运行 my_task("A", 2) 协程
    task2 = asyncio.create_task(my_task("B", 1))
    # 创建 Task B,运行 my_task("B", 1) 协程

    print("Main coroutine: Tasks created. Tasks are now scheduled to run.")
    # Task 在被创建后就会被添加到事件循环中,等待调度执行

    # Task 对象本身是可等待的,可以直接 await
    # await task1
    # await task2 # 按顺序等待 Task

    # 获取 Task 的状态
    print(f"Task A done?: {
              task1.done()}") # 检查 Task A 是否完成
    print(f"Task B done?: {
              task2.done()}") # 检查 Task B 是否完成
    # 刚创建时,通常是 False

    print("Main coroutine: Waiting for tasks to complete using asyncio.gather...")
    results = await asyncio.gather(task1, task2)
    # 使用 asyncio.gather 同时等待 Task A 和 Task B 完成

    print("Main coroutine: All tasks completed.")
    print(f"Task A done?: {
              task1.done()}") # 完成后,done() 返回 True
    print(f"Task B done?: {
              task2.done()}")

    # 获取 Task 的结果或异常
    try:
        result_a = task1.result() # 获取 Task A 的结果 (如果已完成)
        # 如果 Task 发生了未处理的异常,调用 result() 会重新抛出该异常
        print(f"Result from Task A: {
              result_a}")
    except asyncio.InvalidStateError:
        # 如果 Task 尚未完成,调用 result() 会抛出 InvalidStateError
        print("Task A not done yet.")
    except Exception as e:
        # 如果 Task 完成时抛出了异常,可以在这里捕获
        print(f"Task A raised exception: {
              e}")

    try:
        result_b = task2.result() # 获取 Task B 的结果
        print(f"Result from Task B: {
              result_b}")
    except asyncio.InvalidStateError:
        print("Task B not done yet.")
    except Exception as e:
        print(f"Task B raised exception: {
              e}")

    # 检查 Task 是否被取消
    print(f"Task A cancelled?: {
              task1.cancelled()}") # 检查 Task A 是否被取消
    print(f"Task B cancelled?: {
              task2.cancelled()}") # 检查 Task B 是否被取消
    # 这里它们是正常完成,所以 cancelled() 返回 False

    print("Main coroutine: Finished.")

# 运行主协程
print("--- Task Object Example ---")
asyncio.run(main_tasks())

# 输出示例:
# --- Task Object Example ---
# Main coroutine: Creating tasks...
# Task A: Starting...
# Task B: Starting...
# Main coroutine: Tasks created. Tasks are now scheduled to run.
# Task A done?: False
# Task B done?: False
# Main coroutine: Waiting for tasks to complete using asyncio.gather...
# Task B: Finished after 1 seconds.
# Task A: Finished after 2 seconds.
# Main coroutine: All tasks completed.
# Task A done?: True
# Task B done?: True
# Result from Task A: Result from A
# Result from Task B: Result from B
# Task A cancelled?: False
# Task B cancelled?: False
# Main coroutine: Finished.

Task 对象是管理协程的关键。通过 asyncio.create_task 将协程提交到事件循环,它就会被安排在适当的时候运行。Task 对象提供了 done(), result(), exception(), cancel() 等方法,用于查询任务状态、获取结果/异常或取消任务。asyncio.gather 是一个方便的工具,用于同时等待多个可等待对象并收集它们的结果。

6.5 异步 I/O 操作

asyncio 本身不直接提供异步的 I/O 操作实现,它依赖于底层库或操作系统提供的异步原语。为了进行实际的异步 I/O(如网络通信、文件读写),需要使用专门的异步库。

网络通信: asyncio 提供了低级别的异步 Socket API,但通常推荐使用更高级的库,如 aiohttp(用于 HTTP 客户端和服务器)或 asyncpg/aiomysql(用于异步数据库访问)。
文件读写: 标准库的 open() 和文件方法是阻塞的。可以使用第三方库 aiofiles 来进行异步文件操作。

企业级代码示例:使用 aiohttp 异步下载多个网页

回到前面同步下载网页的例子,我们现在使用 aiohttp 库来实现异步版本,以展示其在高并发 I/O 场景下的优势。

首先,需要安装 aiohttp 库:pip install aiohttp

import asyncio
import aiohttp # 导入 aiohttp 库
import time

# 定义一个异步函数,使用 aiohttp 客户端下载一个网页
async def download_async(session, url):
    # 这个协程函数用于异步下载一个 URL
    # session: aiohttp.ClientSession 对象,用于管理连接池和 cookie 等
    # url: 要下载的 URL 字符串
    print(f"Asynchronous: Downloading {
              url}...")
    try:
        # 使用 with 语句确保响应被正确关闭
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
            # session.get() 发起异步 HTTP GET 请求,返回一个 ClientResponse 可等待对象
            # await response 返回 ClientResponse 对象
            # async with 语法适用于实现了异步上下文管理器协议的对象 (__aenter__, __aexit__)
            # aiohttp 的 ClientResponse 实现了这个协议,确保响应体读取完毕和连接释放

            # 检查 HTTP 状态码
            response.raise_for_status()
            # 如果状态码表示错误 (如 404, 500),则抛出 aiohttp.ClientResponseError

            # 异步读取响应内容
            content = await response.text()
            # await response.text() 异步读取响应体并解码为文本,会暂停当前协程直到读取完成

            print(f"Asynchronous: Finished downloading {
              url}, status: {
              response.status}, content length: {
              len(content)}")
            # 打印下载完成信息、状态码和内容长度
            return url, len(content) # 返回 URL 和内容长度作为结果
            # 协程可以返回结果

    except aiohttp.ClientError as e:
        # 捕获 aiohttp 客户端相关的异常 (包括连接错误、超时、HTTP错误等)
        print(f"Asynchronous: Error downloading {
              url}: {
              type(e).__name__} - {
              e}")
        # 打印错误信息和异常类型名称
        return url, None # 下载失败,返回 URL 和 None

# 主协程函数,用于同时下载多个网页
async def main_async_downloads(urls):
    # 主协程函数,负责协调多个下载任务
    print("--- Starting Asynchronous Downloads ---")
    start_time = time.time() # 记录开始时间

    # 创建一个 aiohttp.ClientSession 对象
    # ClientSession 用于管理底层的连接,推荐在一个应用生命周期内只创建一个或少量几个
    async with aiohttp.ClientSession() as session:
        # 使用 async with 确保 session 被正确关闭 (释放连接池等资源)
        # session.__aenter__() 在进入 with 块时执行
        # session.__aexit__() 在退出 with 块时执行

        tasks = []
        # 创建一个列表来存储异步任务
        for url in urls:
            # 遍历 URL 列表
            # 为每个 URL 创建一个下载任务
            task = asyncio.create_task(download_async(session, url))
            # 创建 Task 来运行 download_async 协程
            tasks.append(task)
            # 将 Task 添加到任务列表中

        # 使用 asyncio.gather 同时运行所有任务并等待它们完成
        # asyncio.gather(*tasks) 是一个可等待对象,等待 tasks 列表中的所有 Task 完成
        results = await asyncio.gather(*tasks)
        # await asyncio.gather 会暂停当前协程 (main_async_downloads)
        # 事件循环会调度 tasks 列表中的各个 download_async 协程并行运行
        # 当所有 download_async 协程都完成后,asyncio.gather 完成,返回一个包含所有协程返回值的列表
        # main_async_downloads 协程从 await asyncio.gather 处恢复执行

    # 退出 async with aiohttp.ClientSession() as session: 块
    # session.__aexit__() 被调用,清理 session 资源

    end_time = time.time() # 记录结束时间
    print(f"--- Asynchronous Downloads Finished in {
              end_time - start_time:.2f} seconds ---")
    # 打印总耗时
    # 总耗时应该接近单个下载耗时最长的那个 (如果有多个 URL),因为是并发执行的

    print("
Download Results:")
    for url, length in results:
        # 遍历 asyncio.gather 返回的结果列表
        if length is not None:
            print(f"  {
              url}: {
              length} bytes")
        else:
            print(f"  {
              url}: Failed to download")

# 准备要下载的 URL 列表
urls_to_download = [
    "http://www.example.com",
    "http://www.google.com",
    "http://www.bing.com",
    "http://www.python.org",
    "http://www.wikipedia.org"
    # 可以添加更多 URL 来测试并发效果
    # "http://nonexistent.example.com", # 测试错误处理
    # "http://httpbin.org/delay/5" # 测试单个长时间任务
]

# 运行异步主协程
asyncio.run(main_async_downloads(urls_to_download))
# 使用 asyncio.run 启动事件循环并运行 main_async_downloads 协程

# 输出示例 (顺序可能因网络情况和服务器响应速度而异):
# --- Starting Asynchronous Downloads ---
# Asynchronous: Downloading http://www.example.com...
# Asynchronous: Downloading http://www.google.com...
# Asynchronous: Downloading http://www.bing.com...
# Asynchronous: Downloading http://www.python.org...
# Asynchronous: Downloading http://www.wikipedia.org...
# Asynchronous: Finished downloading http://www.example.com, status: 200, content length: 1256
# Asynchronous: Finished downloading http://www.bing.com, status: 200, content length: 103979
# Asynchronous: Finished downloading http://www.wikipedia.org, status: 200, content length: 82857
# Asynchronous: Finished downloading http://www.python.org, status: 200, content length: 51639
# Asynchronous: Finished downloading http://www.google.com, status: 200, content length: 15216
# --- Asynchronous Downloads Finished in 0.xxx seconds --- # 这个时间应该显著小于同步版本
#
# Download Results:
#   http://www.example.com: 1256 bytes
#   http://www.google.com: 15216 bytes
#   http://www.bing.com: 103979 bytes
#   http://www.python.org: 51639 bytes
#   http://www.wikipedia.org: 82857 bytes

这个企业级示例生动地展示了 asyncioaiohttp 在处理多个网络请求时的优势。通过使用异步编程,程序能够在等待一个请求的响应时,切换去处理其他请求,从而在高并发 I/O 场景下实现比同步或多线程更高的效率。aiohttp.ClientSession 用于管理连接,async with 语法确保资源的正确获取和释放,asyncio.create_taskasyncio.gather 用于方便地启动和等待多个异步任务。

6.6 错误处理与取消

在异步编程中处理错误和任务取消需要特别注意。

错误处理: 协程中发生的异常会传播到等待它的地方(通过 awaitasyncio.gather)。如果一个 Task 完成时发生了异常,可以通过 Task 对象的 exception() 方法获取异常,或者在调用 result() 时重新抛出异常。asyncio.gather 在默认情况下,如果其中一个任务发生异常,会立即取消其他正在运行的任务,并重新抛出第一个发生的异常。可以通过 return_exceptions=True 参数改变 asyncio.gather 的行为,使其不取消其他任务,而是将所有任务的结果或异常都收集起来返回。
任务取消: 可以通过 Task 对象的 cancel() 方法请求取消一个任务。当一个协程被请求取消时,它会在下一次 await 可取消的操作(如 asyncio.sleep, 异步 I/O 操作)时,在 await 表达式处抛出一个 asyncio.CancelledError 异常。协程可以通过捕获 asyncio.CancelledError 来进行清理操作,但必须小心处理,确保不会抑制异常或在处理取消时执行阻塞操作。

import asyncio
import time

async def cancellable_task(name, delay):
    # 一个可以被取消的协程
    print(f"Task {
              name}: Starting, will run for {
              delay}s...")
    try:
        # await 一个可以被取消的操作
        await asyncio.sleep(delay)
        # 如果在 sleep 期间被取消,会在 await asyncio.sleep(delay) 处抛出 CancelledError
        print(f"Task {
              name}: Finished successfully after {
              delay}s.")
        return f"{
              name} finished"
    except asyncio.CancelledError:
        # 捕获 CancelledError,进行清理
        print(f"Task {
              name}: Caught CancelledError. Performing cleanup...")
        # 模拟清理
        await asyncio.sleep(0.01) # 异步清理,不能在这里做阻塞操作
        print(f"Task {
              name}: Cleanup finished.")
        # 在捕获 CancelledError 后,通常应该重新抛出它,除非你真的想要抑制取消
        raise # 重新抛出 CancelledError
    except Exception as e:
        # 捕获其他异常
        print(f"Task {
              name}: Caught other exception: {
              type(e).__name__} - {
              e}")
        raise # 重新抛出其他异常
    finally:
        # finally 块总会被执行,无论成功、取消还是异常
        print(f"Task {
              name}: Exiting.")


async def error_prone_task(name, delay, should_raise):
    # 一个可能会抛出异常的协程
    print(f"Error Task {
              name}: Starting, will run for {
              delay}s...")
    try:
        await asyncio.sleep(delay)
        if should_raise:
            print(f"Error Task {
              name}: About to raise exception.")
            raise ValueError(f"Error in {
              name}")
            # 抛出一个 ValueError
        print(f"Error Task {
              name}: Finished successfully after {
              delay}s.")
        return f"{
              name} finished"
    except Exception as e:
        print(f"Error Task {
              name}: Caught exception internally: {
              type(e).__name__} - {
              e}")
        raise # 重新抛出异常


async def main_error_cancellation():
    print("--- Error Handling and Cancellation Example ---")

    # --- 示例 1: 正常执行和异常处理 ---
    print("
--- Normal execution and error propagation ---")
    task_ok = asyncio.create_task(error_prone_task("OK", 0.1, False))
    task_err = asyncio.create_task(error_prone_task("Error", 0.2, True))

    # 使用 gather 默认行为 (取消其他任务并重新抛出第一个异常)
    try:
        results = await asyncio.gather(task_ok, task_err)
        # await gather 会等待所有任务,如果 task_err 抛出异常,gather 会取消 task_ok 并抛出异常
        print(f"Gather results (should not reach here if exception): {
              results}")
    except ValueError as e:
        print(f"Main coroutine: Caught expected exception from gather: {
              e}")

    # 检查任务状态
    print(f"Task OK done?: {
              task_ok.done()}, cancelled?: {
              task_ok.cancelled()}") # 可能会被取消
    print(f"Task Error done?: {
              task_err.done()}, cancelled?: {
              task_err.cancelled()}") # 应该完成并有异常

    # --- 示例 2: 使用 return_exceptions=True ---
    print("
--- Using gather with return_exceptions=True ---")
    task_ok_re = asyncio.create_task(error_prone_task("OK_RE", 0.1, False))
    task_err_re = asyncio.create_task(error_prone_task("Error_RE", 0.2, True))

    # 使用 gather(return_exceptions=True)
    results_re = await asyncio.gather(task_ok_re, task_err_re, return_exceptions=True)
    # return_exceptions=True 时,gather 不会取消其他任务,也不会抛出异常
    # 而是将所有结果或异常对象放入结果列表中返回

    print(f"Gather results (with return_exceptions=True): {
              results_re}")
    # 结果列表中会包含正常结果和异常对象

    # 检查任务状态
    print(f"Task OK_RE done?: {
              task_ok_re.done()}, cancelled?: {
              task_ok_re.cancelled()}") # 应该正常完成
    print(f"Task Error_RE done?: {
              task_err_re.done()}, cancelled?: {
              task_err_re.cancelled()}") # 应该完成并有异常

    # --- 示例 3: 任务取消 ---
    print("
--- Task Cancellation ---")
    task_cancellable = asyncio.create_task(cancellable_task("CancelMe", 5))
    task_short = asyncio.create_task(cancellable_task("ShortTask", 1))

    print("Main coroutine: Waiting for a bit, then cancelling 'CancelMe'...")
    await asyncio.sleep(0.5) # 等待 0.5 秒
    # 在 task_cancellable 的 asyncio.sleep(5) 完成之前取消它
    cancelled = task_cancellable.cancel() # 请求取消 Task "CancelMe"
    print(f"Cancellation requested for 'CancelMe': {
              cancelled}") # cancel() 返回 True 表示请求成功

    print("Main coroutine: Waiting for tasks to finish after cancellation request...")
    # 等待所有任务完成 (包括被取消的任务)
    results_cancel = await asyncio.gather(task_cancellable, task_short, return_exceptions=True)
    # 使用 return_exceptions=True 方便查看被取消任务的结果 (会返回 CancelledError)

    print(f"Gather results (after cancellation): {
              results_cancel}")
    # 结果列表中,被取消的任务会对应一个 CancelledError 异常

    # 检查任务状态
    print(f"Task CancelMe done?: {
              task_cancellable.done()}, cancelled?: {
              task_cancellable.cancelled()}, exception: {
              task_cancellable.exception()}")
    # 被取消的任务 done() 会是 True,cancelled() 会是 True,exception() 会返回 CancelledError
    print(f"Task ShortTask done?: {
              task_short.done()}, cancelled?: {
              task_short.cancelled()}, exception: {
              task_short.exception()}")
    # 未被取消的任务正常完成

    print("--- Error Handling and Cancellation Example Finished ---")

# 运行主协程
asyncio.run(main_error_cancellation())

# 输出示例 (注意 CancelledError 的打印和捕获):
# --- Error Handling and Cancellation Example ---
#
# --- Normal execution and error propagation ---
# Error Task OK: Starting, will run for 0.1s...
# Error Task Error: Starting, will run for 0.2s...
# Error Task OK: Finished successfully after 0.1s.
# Error Task Error: About to raise exception.
# Error Task Error: Caught exception internally: ValueError - Error in Error
# Main coroutine: Caught expected exception from gather: Error in Error
# Task OK done?: True, cancelled?: True # task_ok 被 gather 取消了
# Task Error done?: True, cancelled?: False # task_err 正常完成并抛出异常
#
# --- Using gather with return_exceptions=True ---
# Error Task OK_RE: Starting, will run for 0.1s...
# Error Task Error_RE: Starting, will run for 0.2s...
# Error Task OK_RE: Finished successfully after 0.1s.
# Error Task Error_RE: About to raise exception.
# Error Task Error_RE: Caught exception internally: ValueError - Error in Error_RE
# Gather results (with return_exceptions=True): ['OK_RE finished', ValueError('Error in Error_RE')] # 结果列表包含正常结果和异常对象
# Task OK_RE done?: True, cancelled?: False # 正常完成
# Task Error_RE done?: True, cancelled?: False # 正常完成并有异常
#
# --- Task Cancellation ---
# Task CancelMe: Starting, will run for 5s...
# Task ShortTask: Starting, will run for 1s...
# Main coroutine: Waiting for a bit, then cancelling 'CancelMe'...
# Cancellation requested for 'CancelMe': True
# Main coroutine: Waiting for tasks to finish after cancellation request...
# Task ShortTask: Finished after 1s.
# Task ShortTask: Exiting.
# Task CancelMe: Caught CancelledError. Performing cleanup...
# Task CancelMe: Cleanup finished.
# Task CancelMe: Exiting.
# Gather results (after cancellation): [CancelledError(), 'ShortTask finished'] # 被取消的任务结果是 CancelledError
# Task CancelMe done?: True, cancelled?: True, exception: CancelledError() # 被取消
# Task ShortTask done?: True, cancelled?: False, exception: None # 正常完成
# --- Error Handling and Cancellation Example Finished ---

在异步编程中,异常处理和任务取消是重要的方面。理解异常如何传播、asyncio.gather 的异常处理行为以及如何正确地捕获和处理 CancelledError 对于构建健壮的异步应用至关重要。在协程中进行清理时,务必使用异步操作(如 await asyncio.sleep())而不是阻塞操作,以避免阻塞整个事件循环。

6.7 将同步/阻塞代码集成到 asyncio

asyncio 的事件循环不能被阻塞。如果在协程中直接调用同步的、可能阻塞的函数(如 time.sleep(), 数据库同步驱动的调用,某些文件 I/O),会导致整个事件循环停止响应,直到阻塞操作完成,从而失去了异步的优势。

为了在 asyncio 中运行阻塞代码,需要将其卸载到单独的线程或进程池中,让事件循环不受影响。asyncio 提供了 run_in_executor() 方法来实现这一点。

import asyncio
import time
import requests # 使用同步的 requests 库进行演示
import concurrent.futures # concurrent.futures 模块提供线程池和进程池

# 定义一个同步的、阻塞的函数
def blocking_io_function(url):
    # 这是一个阻塞的 I/O 函数 (使用同步 requests)
    print(f"Blocking function: Downloading {
              url}...")
    try:
        response = requests.get(url, timeout=5) # 同步阻塞的网络请求
        response.raise_for_status()
        content_length = len(response.text)
        print(f"Blocking function: Finished downloading {
              url}, content length: {
              content_length}")
        return url, content_length
    except requests.exceptions.RequestException as e:
        print(f"Blocking function: Error downloading {
              url}: {
              type(e).__name__} - {
              e}")
        return url, None


async def main_run_in_executor():
    print("--- Running Blocking Code in Executor Example ---")

    loop = asyncio.get_running_loop() # 获取当前事件循环

    urls = [
        "http://www.example.com",
        "http://www.google.com",
        "http://www.bing.com"
    ]

    # --- 使用默认的 ThreadPoolExecutor ---
    # asyncio 的事件循环默认有一个内置的 ThreadPoolExecutor
    print("
--- Using default ThreadPoolExecutor ---")
    start_time = time.time()
    tasks = []
    for url in urls:
        # 将阻塞函数提交到默认的 ThreadPoolExecutor 中运行
        # loop.run_in_executor(executor, func, *args)
        # executor=None 表示使用默认的 executor (ThreadPoolExecutor)
        task = loop.run_in_executor(None, blocking_io_function, url)
        # run_in_executor 返回一个 Future 对象 (concurrent.futures.Future 的子类),它是可等待的
        tasks.append(task)

    # 等待所有提交的任务完成
    results = await asyncio.gather(*tasks)
    end_time = time.time()

    print("All blocking tasks finished.")
    print(f"Results: {
              results}")
    print(f"Execution time using default ThreadPoolExecutor: {
              end_time - start_time:.4f} seconds.")
    # 总耗时应该显著小于所有下载时间之和,因为是在线程池中并发执行

    # --- 使用自定义 ThreadPoolExecutor ---
    print("
--- Using custom ThreadPoolExecutor ---")
    start_time = time.time()
    # 可以创建一个自定义的 ThreadPoolExecutor,并指定最大线程数
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as thread_pool:
        # 使用 with 语句管理线程池,确保其在退出时被关闭
        tasks_custom = []
        for url in urls:
            # 将阻塞函数提交到自定义线程池中运行
            task = loop.run_in_executor(thread_pool, blocking_io_function, url)
            # run_in_executor(executor, func, *args) 的第一个参数是指定的 executor 对象
            tasks_custom.append(task)

        # 等待所有任务完成
        results_custom = await asyncio.gather(*tasks_custom)

    end_time = time.time()
    print("All blocking tasks finished (custom pool).")
    print(f"Results (custom pool): {
              results_custom}")
    print(f"Execution time using custom ThreadPoolExecutor: {
              end_time - start_time:.4f} seconds.")
    # 如果 max_workers 设置较小,可能会看到并发度受限的影响

    # --- 使用 ProcessPoolExecutor (适用于 CPU 密集型阻塞任务) ---
    # 进程池适用于 CPU 密集型的阻塞函数,因为 GIL 不会成为问题
    # 注意:进程池的创建和管理开销比线程池大
    print("
--- Using ProcessPoolExecutor ---")
    # ProcessPoolExecutor 只能在主程序入口点创建或在 __main__ 块中创建
    # 为了简化演示,这里假设在允许的地方创建
    # with concurrent.futures.ProcessPoolExecutor(max_workers=os.cpu_count()) as process_pool:
    #     tasks_process = []
    #     # 假设有一个 CPU 密集型的阻塞函数 cpu_blocking_function
    #     # for data in data_list:
    #     #     task = loop.run_in_executor(process_pool, cpu_blocking_function, data)
    #     #     tasks_process.append(task)
    #     # results_process = await asyncio.gather(*tasks_process)
    #     print("ProcessPoolExecutor usage is conceptualized here due to execution context.")
    #     # 在实际应用中,如果 blocking_io_function 是 CPU 密集型的,可以这样使用

# 运行主协程
asyncio.run(main_run_in_executor())

# 输出示例:
# --- Running Blocking Code in Executor Example ---
#
# --- Using default ThreadPoolExecutor ---
# Submitting tasks to default ThreadPoolExecutor...
# Blocking function: Downloading http://www.example.com...
# Blocking function: Downloading http://www.google.com...
# Blocking function: Downloading http://www.bing.com...
# Blocking function: Finished downloading http://www.example.com, content length: 1256
# Blocking function: Finished downloading http://www.bing.com, content length: 103979
# Blocking function: Finished downloading http://www.google.com, content length: 15216
# All blocking tasks finished.
# Results: [('http://www.example.com', 1256), ('http://www.google.com', 15216), ('http://www.bing.com', 103979)]
# Execution time using default ThreadPoolExecutor: 0.xxx seconds. # 时间取决于最慢的下载
#
# --- Using custom ThreadPoolExecutor ---
# Submitting tasks to custom ThreadPoolExecutor...
# Blocking function: Downloading http://www.example.com...
# Blocking function: Downloading http://www.google.com...
# Blocking function: Downloading http://www.bing.com...
# Blocking function: Finished downloading http://www.example.com, content length: 1256
# Blocking function: Finished downloading http://www.bing.com, content length: 103979
# Blocking function: Finished downloading http://www.google.com, content length: 15216
# All blocking tasks finished (custom pool).
# Results (custom pool): [('http://www.example.com', 1256), ('http://www.google.com', 15216), ('http://www.bing.com', 103979)]
# Execution time using custom ThreadPoolExecutor: 0.xxx seconds. # 时间取决于最慢的下载
#
# --- Using ProcessPoolExecutor ---
# ProcessPoolExecutor usage is conceptualized here due to execution context.
# --- Running Blocking Code in Executor Example Finished ---

loop.run_in_executor() 是在 asyncio 中处理阻塞代码的标准方法。它将阻塞函数提交给一个 concurrent.futures.Executor(可以是线程池或进程池)在另一个线程或进程中运行,而不会阻塞事件循环。当阻塞函数执行完成后,其结果或异常会被返回到事件循环中相应的 Future 对象,并通知等待的协程恢复执行。对于 I/O 阻塞,通常使用线程池;对于 CPU 阻塞且不受 GIL 限制的函数,可以使用进程池。

6.8 企业级应用:构建一个异步的 Web 服务

异步编程在构建高性能 Web 服务中非常流行,特别是那些需要处理大量并发连接和 I/O 操作的服务(如 API 网关、微服务)。流行的 Python 异步 Web 框架包括 FastAPI, Starlette, aiohttp 等。

这里我们使用 aiohttp 构建一个简单的异步 Web 服务器作为示例。

首先,确保安装 aiohttppip install aiohttp

import asyncio
import aiohttp
from aiohttp import web # 导入 aiohttp.web 模块用于构建 Web 应用
import time
import random

# 定义一个简单的异步请求处理函数 (handler)
async def handle_hello(request):
    # 异步请求处理函数,接收一个 Request 对象
    print(f"Handler /hello: Received request from {
              request.remote}")
    # 打印请求信息 (请求来源 IP)

    # 模拟一个异步操作 (例如,异步数据库查询、异步第三方服务调用)
    delay = random.uniform(0.1, 0.5) # 随机生成一个延迟时间
    print(f"Handler /hello: Simulating async work for {
              delay:.2f}s...")
    await asyncio.sleep(delay) # 异步暂停,不阻塞其他请求的处理

    # 构建并返回一个异步响应
    name = request.match_info.get('name', "Anonymous") # 从 URL 参数或默认值获取 name
    text = "Hello, " + name + "!"
    print(f"Handler /hello: Sending response for {
              name}.")
    return web.Response(text=text) # 返回一个 web.Response 对象

# 定义另一个处理函数,模拟一个较慢的异步操作
async def handle_slow(request):
    # 模拟一个较慢的异步请求处理函数
    print(f"Handler /slow: Received request from {
              request.remote}")
    print("Handler /slow: Simulating slow async work for 2s...")
    await asyncio.sleep(2) # 模拟耗时 2 秒的异步操作
    print("Handler /slow: Finished slow async work.")
    return web.Response(text="This was slow!") # 返回响应


# 主函数,用于设置和运行 Web 应用
async def init_app():
    # 初始化 aiohttp Web 应用
    print("Initializing aiohttp web application...")
    app = web.Application() # 创建一个 Web Application 实例

    # 配置路由:将 URL 路径映射到对应的处理函数
    # app.router.add_get('/hello', handle_hello) # 匹配 GET /hello 请求
    # 匹配 GET /hello/{name} 或 GET /hello 请求
    # {name} 是一个 URL 路径参数
    app.router.add_get('/hello/{name}', handle_hello)
    app.router.add_get('/hello', handle_hello)
    app.router.add_get('/slow', handle_slow) # 匹配 GET /slow 请求

    # 可以添加其他路由和处理函数
    # app.router.add_post('/submit', handle_submit)
    # app.router.add_static('/static', 'path/to/static/files') # 添加静态文件服务

    print("Web application initialized.")
    return app # 返回配置好的 Web 应用实例

# --- 运行 Web 服务器 ---
if __name__ == '__main__':
    # 这是 Python 脚本的标准入口点,确保在直接运行时才执行服务器启动代码
    # 在企业级应用中,通常会通过其他方式(如 gunicorn 配合 aiohttp worker)来运行

    app = asyncio.get_event_loop().run_until_complete(init_app())
    # 在事件循环中运行 init_app 协程来初始化应用
    # get_event_loop() 获取当前线程的事件循环 (如果没有则创建一个)
    # run_until_complete() 运行协程直到完成

    print("Starting web server...")
    # 使用 aiohttp.web.run_app 启动 Web 服务器
    web.run_app(app, host='127.0.0.1', port=8080)
    # run_app 会阻塞,直到服务器停止 (例如,按下 Ctrl+C)
    # 它内部会管理事件循环和请求处理

    # 如果不使用 run_app,也可以手动创建和运行 Site 对象:
    # loop = asyncio.get_event_loop()
    # runner = web.AppRunner(app)
    # loop.run_until_complete(runner.setup())
    # site = web.TCPSite(runner, '127.0.0.1', 8080)
    # loop.run_until_complete(site.start())
    # print('Serving on http://127.0.0.1:8080/')
    # try:
    #     loop.run_forever()
    # except KeyboardInterrupt:
    #     pass
    # finally:
    #     loop.run_until_complete(runner.cleanup())
    #     loop.close()
    # 这种手动方式提供了更多的控制权

# 运行步骤:
# 1. 保存代码为 .py 文件 (例如, async_web_server.py)
# 2. 在命令行运行: python async_web_server.py
# 3. 在浏览器或使用 curl 访问:
#    http://127.0.0.1:8080/hello
#    http://127.0.0.1:8080/hello/Alice
#    http://127.0.0.1:8080/slow
# 尝试同时打开多个 /hello 或 /slow 页面,观察服务器日志,你会发现请求是并发处理的,
# 慢请求不会阻塞快请求的响应。

# 服务器输出示例:
# Initializing aiohttp web application...
# Web application initialized.
# Starting web server...
# ======== Running on http://127.0.0.1:8080 ========
# (Press CTRL+C to quit)
# Handler /hello: Received request from 127.0.0.1 # 收到 /hello 请求
# Handler /hello: Simulating async work for 0.35s... # 模拟异步工作 (不阻塞)
# Handler /slow: Received request from 127.0.0.1 # 同时收到 /slow 请求
# Handler /slow: Simulating slow async work for 2s... # 模拟慢速异步工作 (不阻塞)
# Handler /hello: Sending response for Anonymous. # /hello 请求先完成并发送响应
# Handler /slow: Finished slow async work. # /slow 请求后完成
# Handler /slow: Sending response for Anonymous. # /slow 请求发送响应

这个企业级示例展示了如何使用 aiohttp.web 构建一个基本的异步 Web 服务器。请求处理函数是协程(使用 async def 定义),可以在其中使用 await 进行异步 I/O 操作,而不会阻塞其他请求的处理。aiohttp.web.run_app 负责创建和管理事件循环,以及处理底层的网络通信和请求路由。异步 Web 框架是构建高并发、高吞吐量 Web 服务的首选工具。

第七章:深入理解元类 (Metaclasses):掌控类的创建过程

在 Python 中,”一切皆对象”是一个核心概念。数字、字符串、列表、函数,甚至类本身,都是对象。既然类是对象,那么类是如何创建的呢?创建类的“类”就是元类(Metaclass)。默认情况下,Python 中所有类都是由内置的 type 元类创建的。理解元类意味着理解类是如何动态生成和定制的,这为编写非常灵活和强大的框架代码提供了可能性。

7.1 什么是元类? (What are Metaclasses?)

我们知道,通过 class 关键字可以定义一个类:

# 使用 class 关键字定义一个普通的类
class MyClass:
    # 这是一个类定义
    pass
    # 类体为空

# 创建 MyClass 类的实例
my_object = MyClass()
# my_object 是 MyClass 的一个实例

# 查看对象的类型
print(type(my_object))
# 输出:<class '__main__.MyClass'>
# type(my_object) 返回创建 my_object 的类,即 MyClass

# 查看类的类型
print(type(MyClass))
# 输出:<class 'type'>
# type(MyClass) 返回创建 MyClass 类的“类型”,即元类 type
# 这表明 MyClass 类本身是一个对象,它的类型是 type

这个简单的例子说明了:对象是类的实例,而类是元类的实例。默认的元类是 type

你可以将元类想象成一个“类工厂”,它负责接收类的定义(类名、基类、属性和方法),并根据这些信息创建出最终的类对象。当我们使用 class 关键字时,Python 在幕后就是调用了元类来创建这个类。

7.2 使用 type() 动态创建类

内置的 type() 函数可以有两种用法:

type(object): 返回对象的类型(如上面的例子所示)。
type(name, bases, dict): 动态创建一个新的类。这三个参数分别表示:

name: 类的名称(字符串)。
bases: 类的基类元组。
dict: 类的命名空间字典,包含了类的属性和方法(键是属性/方法名,值是属性/方法对象)。

# 使用 type() 函数动态创建一个类,等同于 class MyDynamicClass: pass
MyDynamicClass = type('MyDynamicClass', (), {
            })
# 第一个参数 'MyDynamicClass' 是类名
# 第二个参数 () 是基类元组 (没有基类,所以是空元组)
# 第三个参数 {} 是类的命名空间字典 (没有属性或方法,所以是空字典)

# 检查创建的类
print(MyDynamicClass)
# 输出:<class '__main__.MyDynamicClass'>

# 创建动态创建的类的实例
my_dynamic_object = MyDynamicClass()
print(my_dynamic_object)
# 输出:<__main__.MyDynamicClass object at 0x...>

# 使用 type() 动态创建一个带有属性和方法的类
def dynamic_greet(self, name):
    # 定义一个函数,将作为类的方法
    # self 参数是方法的惯例,指向实例本身
    return f"Hello, {
              name} from dynamic class!"

DynamicClassWithMembers = type('DynamicClassWithMembers', (object,), {
            
    # 类名 'DynamicClassWithMembers'
    # 基类元组 (object,),表示继承自 object
    'version': '1.0', # 类属性 'version'
    'greet': dynamic_greet # 方法 'greet',对应上面的函数 dynamic_greet
})

# 检查动态创建的类
print(DynamicClassWithMembers)
# 输出:<class '__main__.DynamicClassWithMembers'>

# 访问类属性
print(DynamicClassWithMembers.version)
# 输出:1.0

# 创建实例
dynamic_instance = DynamicClassWithMembers()

# 调用方法
print(dynamic_instance.greet("Python"))
# 输出:Hello, Python from dynamic class!

通过 type(name, bases, dict) 动态创建类,我们可以更清晰地看到类是如何由其名称、基类和命名空间构成的。这正是元类在幕后所做的事情:接收这些信息,然后创建一个类对象。

7.3 自定义元类:__new____init__ 的魔力

要定义一个自定义元类,需要创建一个继承自 type 的类。自定义元类的 __new__ 方法负责创建类对象本身,而 __init__ 方法负责初始化这个新创建的类对象。

当 Python 遇到一个类定义(使用 class 关键字),并且这个类指定了一个自定义元类时,会发生以下步骤:

Python 首先找到指定的元类(如果没指定,默认是 type)。
Python 调用元类的 __new__(mcs, name, bases, dict) 方法。

mcs: 元类本身(通常称为 metaclass)。
name: 类的名称(字符串)。
bases: 类的基类元组。
dict: 类的命名空间字典,包含了类体中定义的所有属性和方法。
__new__ 负责分配内存并创建一个新的类对象,通常通过调用基类(即 type)的 __new__ 来完成,例如 super().__new__(mcs, name, bases, dict)type.__new__(mcs, name, bases, dict)__new__ 必须返回新创建的类对象。

Python 调用元类的 __init__(cls, name, bases, dict) 方法。

cls: 刚刚由 __new__ 创建的新类对象。
name, bases, dict: 与 __new__ 相同的参数。
__init__ 负责对新创建的类对象进行初始化设置,它不返回任何值。

最终,class 语句的求值结果就是 __new__ 返回并经过 __init__ 初始化的新类对象。

# 定义一个自定义元类
class MyMetaclass(type):
    # 自定义元类需要继承自 type

    def __new__(mcs, name, bases, dict):
        # __new__ 方法在类创建时被调用,负责创建类对象本身
        # mcs: 当前正在定义的元类 (MyMetaclass)
        # name: 正在创建的类的名称 (字符串)
        # bases: 正在创建的类的基类元组
        # dict: 正在创建的类的命名空间字典 (类体中的属性和方法)
        print(f"--- In MyMetaclass.__new__ for class {
              name} ---")
        print(f"  Name: {
              name}")
        print(f"  Bases: {
              bases}")
        print(f"  Dict: {
              dict}")

        # 在类创建之前可以修改或检查 dict
        # 例如,可以强制所有类名以 MyClass_ 开头
        # if not name.startswith('MyClass_'):
        #     raise TypeError("Class name must start with 'MyClass_'")

        # 可以为类添加新的属性或方法
        dict['metaclass_added_attribute'] = "This was added by the metaclass!"
        # 添加一个类属性

        def metaclass_added_method(self):
            # 定义一个方法,将作为类的方法
            return "This method was added by the metaclass!"
        dict['metaclass_added_method'] = metaclass_added_method
        # 添加一个类方法

        # 调用父类 (type) 的 __new__ 方法来真正创建类对象
        cls = super().__new__(mcs, name, bases, dict)
        # 调用 type 的 __new__,传入元类 mcs,类名 name,基类 bases,命名空间 dict
        # type.__new__ 会返回实际的类对象

        print(f"--- Finished MyMetaclass.__new__, created class: {
              cls} ---")
        # 打印创建的类对象
        return cls
        # __new__ 必须返回创建的类对象

    def __init__(cls, name, bases, dict):
        # __init__ 方法在类对象创建后被调用,用于初始化类对象
        # cls: 刚刚由 __new__ 创建的新类对象
        # name, bases, dict: 与 __new__ 相同的参数
        print(f"--- In MyMetaclass.__init__ for class {
              name} ---")
        # 在这里可以对创建好的类对象 cls 进行进一步的设置或操作
        # 例如,可以在类上注册一些信息
        cls.initialization_status = "Initialized by metaclass"
        # 添加一个初始化状态属性

        # 调用父类 (type) 的 __init__ 方法 (尽管 type 的 __init__ 通常不做太多,但最佳实践是调用)
        super().__init__(cls, name, bases, dict)

        print(f"--- Finished MyMetaclass.__init__ for class {
              name} ---")
        # __init__ 方法不返回任何值

# 使用自定义元类来定义一个类
class MyRegularClass(metaclass=MyMetaclass):
    # 在类定义时,通过 metaclass 参数指定使用的元类
    # 这意味着 MyRegularClass 将由 MyMetaclass 创建,而不是默认的 type

    def __init__(self, value):
        # 这是一个普通的实例构造函数
        self.instance_value = value

    def original_method(self):
        # 类体中定义的原始方法
        return f"Original method called with instance value: {
              self.instance_value}"

# 当 Python 解释器处理上面的 class MyRegularClass(metaclass=MyMetaclass): 块时,
# 它会调用 MyMetaclass 的 __new__ 和 __init__ 方法来创建 MyRegularClass 类对象。

# 检查 MyRegularClass 类
print("
--- Checking MyRegularClass after creation ---")
print(MyRegularClass)
# 输出:<class '__main__.MyRegularClass'>

# 访问元类添加的类属性
print(MyRegularClass.metaclass_added_attribute)
# 输出:This was added by the metaclass!

# 访问元类添加的初始化状态属性
print(MyRegularClass.initialization_status)
# 输出:Initialized by metaclass

# 创建 MyRegularClass 的实例
obj = MyRegularClass(100)

# 调用元类添加的方法
print(obj.metaclass_added_method())
# 输出:This method was added by the metaclass!

# 调用类体中定义的原始方法
print(obj.original_method())
# 输出:Original method called with instance value: 100

# 检查 MyRegularClass 的类型 (它的元类)
print(type(MyRegularClass))
# 输出:<class '__main__.MyMetaclass'>
# 这证实了 MyRegularClass 是 MyMetaclass 的实例

# 检查 MyMetaclass 的类型 (元类也是对象,它的元类是 type)
print(type(MyMetaclass))
# 输出:<class 'type'>
# MyMetaclass 是 type 的实例

这个例子详细展示了自定义元类的工作流程。通过重写 __new____init__,元类可以在类创建之前(在 __new__ 中)检查、修改或向类的命名空间字典 dict 中添加成员,以及在类创建之后(在 __init__ 中)进一步配置类对象。这使得元类能够影响所有使用它的类。

7.4 元类的常见用例

元类提供了一种在类定义阶段就修改或增强类的能力。虽然元类功能强大,但不易理解,应谨慎使用。常见的用例包括:

自动注册类: 元类可以在创建类时,自动将类注册到一个中心注册表中,用于构建插件系统或服务查找。
强制实现特定接口/模式: 元类可以在类创建时检查类体,确保子类实现了父类(或元类)规定的方法或属性,类似于比抽象基类更严格的检查。
自动添加方法或属性: 根据某种规则或类定义的信息,元类可以自动为类添加方法、属性或特殊方法(如根据类属性自动生成 __repr__)。
实现领域特定语言 (DSL): 元类可以用于构建更具表达力的语法或结构,使得类定义更符合特定领域的需求。

7.5 企业级案例:使用元类实现插件注册系统

在一个大型软件系统中,常常需要一个插件或模块系统,允许开发者轻松地添加新的功能而无需修改核心代码。一个常见的模式是将所有可用的插件在启动时进行注册。元类可以自动完成这个注册过程。

import inspect # 导入 inspect 模块,用于检查对象 (例如,判断是否是类)

# 定义一个自定义元类,用于插件注册
class PluginMetaclass(type):
    # 这是一个用于插件注册的元类

    # 创建一个类级别的注册表字典,存储所有注册的插件类
    _registry = {
            }

    def __new__(mcs, name, bases, dict):
        # 在类创建时被调用
        cls = super().__new__(mcs, name, bases, dict)
        # 首先调用父类 (type) 的 __new__ 创建类对象

        # 检查是否是应该被注册的插件类
        # 我们规定只有直接继承自 BasePlugin (我们稍后定义) 的非抽象类才会被注册
        # issubclass(cls, BasePlugin) 检查 cls 是否是 BasePlugin 或其子类
        # hasattr(cls, '__abstractmethods__') 检查类是否是抽象类 (abc 模块内部会添加这个属性)
        if issubclass(cls, BasePlugin) and not hasattr(cls, '__abstractmethods__'):
             # 如果是 BasePlugin 的具体子类
            plugin_name = cls.__name__ # 使用类名作为插件名 (可以根据需求定制注册名逻辑)
            if plugin_name in mcs._registry:
                # 检查插件名是否已经存在,避免重复注册或冲突
                raise TypeError(f"Plugin '{
              plugin_name}' already registered!")
            mcs._registry[plugin_name] = cls # 将类添加到注册表中
            print(f"Plugin '{
              plugin_name}' registered.")
            # 打印注册信息

        return cls # 返回创建的类对象

    @classmethod
    # 定义一个类方法,用于获取所有注册的插件
    def get_plugins(cls):
        # cls 在这里是 PluginMetaclass 类本身
        return cls._registry.copy()
        # 返回注册表的副本,避免外部修改注册表


# 定义一个基础插件类,所有具体插件都应该继承它
class BasePlugin(metaclass=PluginMetaclass):
    # BasePlugin 使用 PluginMetaclass 作为元类
    # BasePlugin 本身不会被注册 (因为它是 PluginMetaclass 的直接子类,但我们可以设计成抽象类)
    # 标记为抽象类,确保它不会被实例化,只有它的具体子类才会被注册和使用
    import abc
    __metaclass__ = PluginMetaclass # 在 Python 2/3 兼容性时可能需要
    # 通常在 Python 3 中,metaclass=... 语法更常用且推荐

    @abc.abstractmethod
    def execute(self):
        # 定义一个抽象方法,所有插件都必须实现
        pass

    def configure(self, config):
        # 定义一个可选的配置方法
        print(f"Plugin {
              self.__class__.__name__} configuring with {
              config}")
        self.config = config

# --- 定义具体的插件类 ---
# 这些类会自动被 PluginMetaclass 注册

class FileProcessorPlugin(BasePlugin):
    # 文件处理插件,继承自 BasePlugin,使用 PluginMetaclass 作为元类
    # 它会自动被注册

    def execute(self):
        # 实现抽象方法 execute
        print(f"Executing FileProcessorPlugin with config: {
              getattr(self, 'config', 'No Config')}")
        # 模拟文件处理逻辑

class NetworkSenderPlugin(BasePlugin):
    # 网络发送插件,继承自 BasePlugin,使用 PluginMetaclass 作为元类
    # 它会自动被注册

    def execute(self):
        # 实现抽象方法 execute
        print(f"Executing NetworkSenderPlugin with config: {
              getattr(self, 'config', 'No Config')}")
        # 模拟网络发送逻辑

# 定义一个不应该被注册的普通类,验证元类是否正确判断
class SomeOtherClass:
    pass
    # 这个类没有继承 BasePlugin,所以不会被 PluginMetaclass 注册

# --- 模拟插件系统的使用 ---
print("
--- Using the Plugin System ---")

# 在定义了插件类之后,插件元类已经完成了自动注册
# 我们可以通过元类访问注册表
available_plugins = PluginMetaclass.get_plugins()
# 调用元类的 get_plugins 类方法获取注册的插件字典

print("Available plugins:")
for name, cls in available_plugins.items():
    # 遍历注册表,打印插件名称和对应的类
    print(f"- {
              name}: {
              cls}")

# 模拟根据名称加载和使用插件
plugin_name_to_use = "FileProcessorPlugin"

if plugin_name_to_use in available_plugins:
    # 如果插件在注册表中存在
    plugin_class = available_plugins[plugin_name_to_use]
    # 获取对应的插件类

    # 创建插件实例
    plugin_instance = plugin_class()
    # 实例化插件类

    # 配置插件 (如果需要)
    plugin_instance.configure({
            "input_path": "/data/raw", "output_path": "/data/processed"})

    # 执行插件功能
    plugin_instance.execute()
    # 调用插件实例的 execute 方法

else:
    print(f"Plugin '{
              plugin_name_to_use}' not found in registry.")


plugin_name_to_use_2 = "NetworkSenderPlugin"
if plugin_name_to_use_2 in available_plugins:
    plugin_class_2 = available_plugins[plugin_name_to_use_2]
    plugin_instance_2 = plugin_class_2()
    plugin_instance_2.configure({
            "server_url": "http://api.example.com/upload"})
    plugin_instance_2.execute()

# 检查未注册的类是否在注册表中
print(f"Is SomeOtherClass registered? {
              'SomeOtherClass' in available_plugins}")
# 输出:Is SomeOtherClass registered? False

# 输出示例:
# Plugin 'FileProcessorPlugin' registered.
# Plugin 'NetworkSenderPlugin' registered.
#
# --- Using the Plugin System ---
# Available plugins:
# - FileProcessorPlugin: <class '__main__.FileProcessorPlugin'>
# - NetworkSenderPlugin: <class '__main__.NetworkSenderPlugin'>
# Plugin FileProcessorPlugin configuring with {'input_path': '/data/raw', 'output_path': '/data/processed'}
# Executing FileProcessorPlugin with config: {'input_path': '/data/raw', 'output_path': '/data/processed'}
# Plugin NetworkSenderPlugin configuring with {'server_url': 'http://api.example.com/upload'}
# Executing NetworkSenderPlugin with config: {'server_url': 'http://api.example.com/upload'}
# Is SomeOtherClass registered? False

这个企业级案例展示了如何利用元类实现一个简单的插件注册系统。PluginMetaclass 在其 __new__ 方法中拦截所有继承自 BasePlugin 的类的创建过程。当检测到一个是 BasePlugin 的具体子类时,它会自动将这个类添加到 _registry 字典中。这样,所有实现了 BasePlugin 的具体插件类,在它们被定义时就会自动完成注册,而无需在类定义之后手动调用注册函数。这使得添加新插件变得非常方便,只需创建一个继承自 BasePlugin 的类即可。通过元类提供的在类创建阶段进行干预的能力,实现了这种优雅的自动注册机制。

7.6 元类使用的注意事项和替代方案

尽管元类非常强大,但它们会增加代码的复杂性和理解难度。在考虑使用元类之前,应首先评估是否存在更简单、更易于理解的替代方案:

类装饰器 (Class Decorators): 对于许多在类定义后修改类(例如,向类添加方法、属性,或注册类)的场景,类装饰器通常是比元类更简单、更 Pythonic 的选择。类装饰器是一个函数,它接收一个类对象作为参数,并返回一个新的(通常是修改后的)类对象。
继承和 mixin: 通过继承或使用 Mixin 类(提供特定功能的类,通常不独立实例化)可以实现代码重用和增强类的功能,这比元类更容易理解和维护。
工厂函数: 如果只是需要根据条件创建不同配置的类实例,工厂函数或工厂类通常是更好的选择。

代码示例:使用类装饰器实现插件注册 (替代元类)

用类装饰器来实现上面插件注册的功能。

import inspect

# 创建一个插件注册表 (全局变量或模块级别变量)
plugin_registry_decorator = {
            }

# 定义一个类装饰器,用于注册插件
def register_plugin_decorator(cls):
    # 这是一个类装饰器函数,接收类对象 cls 作为参数
    print(f"Decorator: Registering plugin class {
              cls.__name__}")
    plugin_name = cls.__name__ # 使用类名作为插件名

    if plugin_name in plugin_registry_decorator:
        raise TypeError(f"Plugin '{
              plugin_name}' already registered by decorator!")

    plugin_registry_decorator[plugin_name] = cls # 将类添加到注册表中
    print(f"Decorator: Plugin '{
              plugin_name}' registered.")

    return cls # 类装饰器必须返回类对象 (通常是原始类,也可以是修改后的类)


# 定义基础插件类 (不需要元类了)
class BasePluginDecorator:
    # 普通的基类

    # 可以使用抽象基类来强制实现方法
    import abc
    @abc.abstractmethod
    def execute(self):
        pass

    def configure(self, config):
        print(f"Plugin {
              self.__class__.__name__} configuring with {
              config}")
        self.config = config


# --- 定义具体的插件类,使用类装饰器注册 ---
# 使用 @register_plugin_decorator 装饰器注册这些类

@register_plugin_decorator
class FileProcessorPluginDecorator(BasePluginDecorator):
    # 这个类使用 @register_plugin_decorator 进行装饰和注册

    def execute(self):
        print(f"Executing FileProcessorPluginDecorator with config: {
              getattr(self, 'config', 'No Config')}")

@register_plugin_decorator
class NetworkSenderPluginDecorator(BasePluginDecorator):
    # 这个类也使用 @register_plugin_decorator 进行装饰和注册

    def execute(self):
        print(f"Executing NetworkSenderPluginDecorator with config: {
              getattr(self, 'config', 'No Config')}")

# 定义一个不注册的类
class SomeOtherClassDecorator:
    pass

# --- 模拟插件系统的使用 (通过装饰器的注册表) ---
print("
--- Using the Plugin System (Decorator Version) ---")

# 直接访问由装饰器填充的注册表
available_plugins_decorator = plugin_registry_decorator

print("Available plugins (Decorator Version):")
for name, cls in available_plugins_decorator.items():
    print(f"- {
              name}: {
              cls}")

# 模拟根据名称加载和使用插件
plugin_name_to_use_decorator = "FileProcessorPluginDecorator"

if plugin_name_to_use_decorator in available_plugins_decorator:
    plugin_class_decorator = available_plugins_decorator[plugin_name_to_use_decorator]
    plugin_instance_decorator = plugin_class_decorator()
    plugin_instance_decorator.configure({
            "input_path": "/data/raw_decorator"})
    plugin_instance_decorator.execute()

# 检查未注册的类是否在注册表中
print(f"Is SomeOtherClassDecorator registered? {
              'SomeOtherClassDecorator' in available_plugins_decorator}")

# 输出示例:
# Decorator: Registering plugin class FileProcessorPluginDecorator
# Decorator: Plugin 'FileProcessorPluginDecorator' registered.
# Decorator: Registering plugin class NetworkSenderPluginDecorator
# Decorator: Plugin 'NetworkSenderPluginDecorator' registered.
#
# --- Using the Plugin System (Decorator Version) ---
# Available plugins (Decorator Version):
# - FileProcessorPluginDecorator: <class '__main__.FileProcessorPluginDecorator'>
# - NetworkSenderPluginDecorator: <class '__main__.NetworkSenderPluginDecorator'>
# Plugin FileProcessorPluginDecorator configuring with {'input_path': '/data/raw_decorator'}
# Executing FileProcessorPluginDecorator with config: {'input_path': '/data/raw_decorator'}
# Is SomeOtherClassDecorator registered? False

这个类装饰器实现的插件注册系统与元类实现的功能类似,但其代码通常被认为更易于理解和维护。类装饰器在类定义完成后接收类对象,并在该对象上执行操作(如注册)。而元类在类对象创建过程中就介入。对于大多数需要在类定义阶段进行干预的需求,类装饰器是更推荐的选择,除非需要实现一些更底层或更复杂的类创建逻辑,这时才可能需要元类。

© 版权声明
THE END
如果内容对您有所帮助,就支持一下吧!
点赞0 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容