Python教程(三十一):多线程和多进程编程

今日目标

• 理解并发编程的基本概念

• 掌握Python多线程编程

• 学会多进程编程

• 了解线程同步和进程间通信

• 掌握并发编程的最佳实践

并发编程概述

并发编程允许程序同时执行多个任务,提高程序的性能和响应性:

多线程:共享内存空间,适合I/O密集型任务

多进程:独立内存空间,适合CPU密集型任务

异步编程:单线程非阻塞,适合高并发场景

并发 vs 并行

# 并发:多个任务交替执行
# 并行:多个任务同时执行(需要多核CPU)

import time
import threading
import multiprocessing

def cpu_bound_task(n):
    """CPU-bound task: return the sum of squares of 0..n-1."""
    # Pure computation keeps the CPU busy; the GIL is never released here.
    return sum(i * i for i in range(n))

def io_bound_task(n):
    """Simulate an I/O-bound task by sleeping *n* seconds; return a status string."""
    # The blocking sleep stands in for a network or disk wait.
    time.sleep(n)
    return f"任务{n}完成"

多线程编程

1. 创建线程

import threading
import time

def worker_function(name, delay):
    """Print start/progress/finish messages for a worker doing three timed rounds."""
    print(f"线程 {name} 开始工作")
    for round_no in range(1, 4):
        time.sleep(delay)
        print(f"线程 {name} 执行第 {round_no} 次任务")
    print(f"线程 {name} 工作完成")

# 方法1:使用threading.Thread
def create_thread_method1():
    """创建线程的方法1"""
    # 创建线程对象
    thread1 = threading.Thread(target=worker_function, args=("A", 1))
    thread2 = threading.Thread(target=worker_function, args=("B", 2))
    
    # 启动线程
    thread1.start()
    thread2.start()
    
    # 等待线程完成
    thread1.join()
    thread2.join()
    
    print("所有线程执行完成")

# 方法2:继承Thread类
class WorkerThread(threading.Thread):
    """Thread subclass whose run() prints three timed progress messages."""

    def __init__(self, name, delay):
        super().__init__()
        self.name = name    # threading.Thread exposes `name` as a settable property
        self.delay = delay  # seconds to sleep between iterations

    def run(self):
        """Body executed in the new thread after start() is called."""
        print(f"线程 {self.name} 开始工作")
        for step in range(1, 4):
            time.sleep(self.delay)
            print(f"线程 {self.name} 执行第 {step} 次任务")
        print(f"线程 {self.name} 工作完成")

def create_thread_method2():
    """Create two WorkerThread instances and wait for both to finish."""
    workers = [WorkerThread("A", 1), WorkerThread("B", 2)]
    # Start both, then join both.
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    print("所有线程执行完成")

# Run both thread-creation demos.
# Fix: the original second print had a literal newline inside the string
# (an unterminated string literal — a syntax error); it is now "\n".
print("=== 方法1:使用threading.Thread ===")
create_thread_method1()
print("\n=== 方法2:继承Thread类 ===")
create_thread_method2()

2. 线程状态和属性

import threading
import time
import os

def thread_info_demo():
    """Inspect a thread's name, id, and liveness before/after start() and join()."""
    
    def worker():
        # current_thread() returns the Thread object for the calling thread.
        print(f"线程 {threading.current_thread().name} 开始")
        print(f"线程ID: {threading.current_thread().ident}")
        print(f"是否为主线程: {threading.current_thread() is threading.main_thread()}")
        time.sleep(2)
        print(f"线程 {threading.current_thread().name} 结束")
    
    # Create the thread with an explicit name; it is not running yet.
    thread = threading.Thread(target=worker, name="工作线程")
    
    print(f"线程创建前状态: {thread.is_alive()}")  # False: not started yet
    print(f"线程名称: {thread.name}")
    print(f"是否为守护线程: {thread.daemon}")
    
    # After start() the thread is alive until run() returns.
    thread.start()
    print(f"线程启动后状态: {thread.is_alive()}")
    
    # Block until the worker finishes; afterwards is_alive() is False again.
    thread.join()
    print(f"线程结束后状态: {thread.is_alive()}")

# Daemon-thread example
def daemon_thread_demo():
    """Show that daemon threads are terminated when the main program exits."""
    
    def daemon_worker():
        # Would run ~5 seconds, but is killed early once all
        # non-daemon threads have finished and the interpreter exits.
        print("守护线程开始")
        for i in range(5):
            time.sleep(1)
            print(f"守护线程执行第 {i+1} 次")
        print("守护线程结束")
    
    def normal_worker():
        print("普通线程开始")
        time.sleep(3)
        print("普通线程结束")
    
    # daemon=True must be set before start().
    daemon_thread = threading.Thread(target=daemon_worker, daemon=True)
    normal_thread = threading.Thread(target=normal_worker)
    
    daemon_thread.start()
    normal_thread.start()
    
    # Only the normal thread is joined; the daemon is abandoned at exit.
    normal_thread.join()
    print("主程序结束,守护线程会被自动终止")

# Run the thread-introspection demos.
# Fix: the second print contained a literal newline inside the string
# (syntax error); replaced with an explicit "\n" escape.
print("=== 线程信息演示 ===")
thread_info_demo()
print("\n=== 守护线程演示 ===")
daemon_thread_demo()

3. 线程同步

import threading
import time
import random

# 1. Lock: mutual exclusion for a shared counter
def lock_demo():
    """Five threads each add 1000 to a shared counter while holding a Lock."""
    total = 0
    guard = threading.Lock()
    
    def increment():
        nonlocal total
        for _ in range(1000):
            # The context manager acquires and releases the lock,
            # making the read-modify-write atomic across threads.
            with guard:
                total += 1
    
    workers = [threading.Thread(target=increment) for _ in range(5)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    
    print(f"最终计数器值: {total}")

# 2. RLock: the owning thread may acquire it repeatedly
def rlock_demo():
    """Recursively re-acquire an RLock from within a single thread."""
    rlock = threading.RLock()
    
    def recursive_function(level):
        # A plain Lock would deadlock here on the second acquire;
        # an RLock counts nested acquisitions by the same thread.
        with rlock:
            print(f"进入递归函数,层级: {level}")
            if level > 0:
                recursive_function(level - 1)
            print(f"退出递归函数,层级: {level}")
    
    worker = threading.Thread(target=recursive_function, args=(3,))
    worker.start()
    worker.join()

# 3. Condition: producer/consumer hand-off over shared state
def condition_demo():
    """One producer appends items, one consumer pops them, coordinated by a Condition."""
    shared_data = []
    condition = threading.Condition()
    
    def producer():
        """Append one item per second and notify the waiting consumer."""
        for i in range(5):
            with condition:
                shared_data.append(f"数据{i}")
                print(f"生产者添加: 数据{i}")
                condition.notify()  # wake one thread blocked in wait()
            time.sleep(1)
    
    def consumer():
        """Wait until data is available, then pop and process it."""
        for i in range(5):
            with condition:
                while not shared_data:  # loop guards against spurious wakeups
                    condition.wait()
                data = shared_data.pop(0)
                print(f"消费者处理: {data}")
    
    # Run producer and consumer concurrently and wait for both.
    producer_thread = threading.Thread(target=producer)
    consumer_thread = threading.Thread(target=consumer)
    
    producer_thread.start()
    consumer_thread.start()
    
    producer_thread.join()
    consumer_thread.join()

# 4. Semaphore: bound how many threads run a section at once
def semaphore_demo():
    """Ten workers compete for a semaphore that admits at most three at a time."""
    semaphore = threading.Semaphore(3)  # internal counter starts at 3
    
    def worker(worker_id):
        # Acquiring blocks while three other workers already hold the semaphore.
        with semaphore:
            print(f"工作者 {worker_id} 获得资源")
            time.sleep(random.uniform(1, 3))
            print(f"工作者 {worker_id} 释放资源")
    
    # Start ten competing threads.
    threads = []
    for i in range(10):
        thread = threading.Thread(target=worker, args=(i,))
        threads.append(thread)
        thread.start()
    
    # Wait for every worker to finish.
    for thread in threads:
        thread.join()

# 5. Event: a one-shot signal between threads
def event_demo():
    """One thread blocks on an Event until another thread sets it."""
    event = threading.Event()
    
    def waiter():
        """Blocks in event.wait() until set() is called elsewhere."""
        print("等待者开始等待事件")
        event.wait()  # returns immediately once the flag is set
        print("等待者收到事件,开始工作")
    
    def setter():
        """Sleeps three seconds, then releases the waiter."""
        time.sleep(3)
        print("设置者设置事件")
        event.set()  # wakes every thread blocked in wait()
    
    # Start both threads and wait for them to finish.
    waiter_thread = threading.Thread(target=waiter)
    setter_thread = threading.Thread(target=setter)
    
    waiter_thread.start()
    setter_thread.start()
    
    waiter_thread.join()
    setter_thread.join()

# Run the synchronization demos in sequence.
# Fix: four of these prints contained a literal newline inside the string
# (unterminated string literals — syntax errors); replaced with "\n".
print("=== 锁演示 ===")
lock_demo()
print("\n=== 可重入锁演示 ===")
rlock_demo()
print("\n=== 条件变量演示 ===")
condition_demo()
print("\n=== 信号量演示 ===")
semaphore_demo()
print("\n=== 事件演示 ===")
event_demo()

4. 线程池

import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
import queue

def thread_pool_basic():
    """Submit three tasks to a ThreadPoolExecutor and read their results."""
    
    def task(name, delay):
        time.sleep(delay)
        return f"任务 {name} 完成"
    
    # The with-block shuts the pool down (waiting for pending tasks) on exit.
    with ThreadPoolExecutor(max_workers=3) as executor:
        # submit() returns a Future immediately; the task runs in the pool.
        future1 = executor.submit(task, "A", 2)
        future2 = executor.submit(task, "B", 1)
        future3 = executor.submit(task, "C", 3)
        
        # result() blocks until the corresponding task has finished.
        print(future1.result())
        print(future2.result())
        print(future3.result())

def thread_pool_map():
    """Apply a function across a list of numbers with ThreadPoolExecutor.map."""
    
    def square(x):
        time.sleep(0.1)  # simulate a little per-item work
        return x * x
    
    numbers = list(range(1, 11))
    
    # map() preserves input order even though items run concurrently.
    with ThreadPoolExecutor(max_workers=4) as executor:
        squared = list(executor.map(square, numbers))
        print(f"结果: {squared}")

def thread_pool_as_completed():
    """Handle pool results in completion order using as_completed()."""
    
    def task(name, delay):
        time.sleep(delay)
        return f"任务 {name} 完成"
    
    # (name, delay-in-seconds) pairs; completion order differs from submit order.
    tasks = [
        ("A", 3),
        ("B", 1),
        ("C", 2),
        ("D", 4)
    ]
    
    with ThreadPoolExecutor(max_workers=2) as executor:
        # Map each Future back to its task name for reporting.
        future_to_task = {
            executor.submit(task, name, delay): name 
            for name, delay in tasks
        }
        
        # as_completed yields futures as they finish, not in submission order.
        for future in as_completed(future_to_task):
            task_name = future_to_task[future]
            try:
                result = future.result()  # re-raises any exception from the task
                print(f"{result} (任务 {task_name})")
            except Exception as e:
                print(f"任务 {task_name} 执行失败: {e}")

# Run the thread-pool demos.
# Fix: two prints contained a literal newline inside the string
# (syntax errors); replaced with "\n" escapes.
print("=== 线程池基本使用 ===")
thread_pool_basic()
print("\n=== 使用map方法 ===")
thread_pool_map()
print("\n=== 使用as_completed ===")
thread_pool_as_completed()

多进程编程

1. 创建进程

import multiprocessing
import time
import os

def process_worker(name, delay):
    """Report this process's PID and print three timed progress messages."""
    print(f"进程 {name} 开始工作 (PID: {os.getpid()})")
    for step in range(1, 4):
        time.sleep(delay)
        print(f"进程 {name} 执行第 {step} 次任务")
    print(f"进程 {name} 工作完成")

def create_process_basic():
    """Spawn two worker processes and wait for both to finish."""
    procs = [
        multiprocessing.Process(target=process_worker, args=("A", 1)),
        multiprocessing.Process(target=process_worker, args=("B", 2)),
    ]
    # Start both children, then block until each exits.
    for proc in procs:
        proc.start()
    for proc in procs:
        proc.join()
    print("所有进程执行完成")

class WorkerProcess(multiprocessing.Process):
    """Process subclass whose run() prints three timed progress messages."""

    def __init__(self, name, delay):
        super().__init__()
        self.name = name    # multiprocessing.Process exposes `name` as a settable property
        self.delay = delay  # seconds to sleep between iterations

    def run(self):
        """Body executed in the child process after start() is called."""
        print(f"进程 {self.name} 开始工作 (PID: {os.getpid()})")
        for step in range(1, 4):
            time.sleep(self.delay)
            print(f"进程 {self.name} 执行第 {step} 次任务")
        print(f"进程 {self.name} 工作完成")

def create_process_class():
    """Create two WorkerProcess instances and wait for both to finish."""
    procs = [WorkerProcess("A", 1), WorkerProcess("B", 2)]
    # Start both children, then join both.
    for proc in procs:
        proc.start()
    for proc in procs:
        proc.join()
    print("所有进程执行完成")

# Run both process-creation demos.
# Fix: the second print contained a literal newline inside the string
# (syntax error); replaced with an explicit "\n" escape.
print("=== 基本进程创建 ===")
create_process_basic()
print("\n=== 使用类创建进程 ===")
create_process_class()

2. 进程间通信

import multiprocessing
import time
import random

# 1. Queue: pass items between processes
def _queue_producer(queue):
    """Producer process: put five items, then a None sentinel to stop the consumer."""
    for i in range(5):
        item = f"数据{i}"
        queue.put(item)
        print(f"生产者放入: {item}")
        time.sleep(1)
    queue.put(None)  # sentinel: no more data

def _queue_consumer(queue):
    """Consumer process: take items until the None sentinel arrives."""
    while True:
        item = queue.get()
        if item is None:  # sentinel received
            break
        print(f"消费者处理: {item}")
        time.sleep(0.5)

def queue_demo():
    """Queue-based producer/consumer communication between two processes.

    Fix: the worker functions are defined at module level instead of nested
    inside this function — nested functions cannot be pickled, so the
    original crashed under the "spawn" start method (default on Windows
    and macOS).
    """
    queue = multiprocessing.Queue()

    producer_process = multiprocessing.Process(target=_queue_producer, args=(queue,))
    consumer_process = multiprocessing.Process(target=_queue_consumer, args=(queue,))

    producer_process.start()
    consumer_process.start()

    producer_process.join()
    consumer_process.join()

# 2. Pipe: point-to-point connection between two processes
def _pipe_sender(conn):
    """Sender process: send five messages, then a None sentinel, and close."""
    for i in range(5):
        message = f"消息{i}"
        conn.send(message)
        print(f"发送者发送: {message}")
        time.sleep(1)
    conn.send(None)  # sentinel: tell the receiver we are done
    conn.close()

def _pipe_receiver(conn):
    """Receiver process: receive until the sentinel (or EOF) arrives."""
    while True:
        try:
            message = conn.recv()
        except EOFError:  # writer vanished without sending the sentinel
            break
        if message is None:
            break
        print(f"接收者收到: {message}")
    conn.close()

def pipe_demo():
    """Pipe-based communication between a sender and a receiver process.

    Fixes over the naive version:
    - worker functions are module-level so they can be pickled under the
      "spawn" start method (Windows/macOS default);
    - the receiver stops on a None sentinel: waiting only for EOFError can
      hang forever, because forked children (and the parent) keep inherited
      copies of the write end open;
    - the parent closes both of its connection ends once the children run.
    """
    parent_conn, child_conn = multiprocessing.Pipe()

    sender_process = multiprocessing.Process(target=_pipe_sender, args=(parent_conn,))
    receiver_process = multiprocessing.Process(target=_pipe_receiver, args=(child_conn,))

    sender_process.start()
    receiver_process.start()

    # The parent no longer needs either end after the children have started.
    parent_conn.close()
    child_conn.close()

    sender_process.join()
    receiver_process.join()

# 3. Shared memory: Value and Array visible to every process
def _incrementer(value, array, process_id):
    """Worker: bump the shared Value under its lock and this worker's array slot."""
    for i in range(3):
        # Value's built-in lock makes the read-modify-write atomic.
        with value.get_lock():
            value.value += 1
            print(f"进程 {process_id} 递增值到: {value.value}")
        
        # Each process writes only its own index, so no lock is needed here.
        array[process_id] += 1
        print(f"进程 {process_id} 更新数组位置 {process_id} 到: {array[process_id]}")
        time.sleep(1)

def shared_memory_demo():
    """Three processes concurrently update a shared Value and a shared Array.

    Fix: the worker is defined at module level instead of nested — nested
    functions cannot be pickled, so the original crashed under the "spawn"
    start method (default on Windows and macOS).
    """
    # 'i' = signed int, stored in shared memory mapped into every process.
    shared_value = multiprocessing.Value('i', 0)
    shared_array = multiprocessing.Array('i', [0, 0, 0, 0, 0])
    
    processes = []
    for i in range(3):
        process = multiprocessing.Process(
            target=_incrementer,
            args=(shared_value, shared_array, i)
        )
        processes.append(process)
        process.start()
    
    # Wait for all workers, then report the final shared state.
    for process in processes:
        process.join()
    
    print(f"最终共享值: {shared_value.value}")
    print(f"最终共享数组: {list(shared_array)}")

# Run the inter-process-communication demos.
# Fix: two prints contained a literal newline inside the string
# (syntax errors); replaced with "\n" escapes.
print("=== 队列通信演示 ===")
queue_demo()
print("\n=== 管道通信演示 ===")
pipe_demo()
print("\n=== 共享内存演示 ===")
shared_memory_demo()

3. 进程池

import multiprocessing
import time
from concurrent.futures import ProcessPoolExecutor, as_completed

def process_pool_basic():
    """进程池基本使用"""
    
    def cpu_intensive_task(n):
        """CPU密集型任务"""
        result = 0
        for i in range(n):
            result += i * i
        return result
    
    # 创建进程池
    with ProcessPoolExecutor(max_workers=4) as executor:
        # 提交任务
        futures = []
        for i in range(10):
            future = executor.submit(cpu_intensive_task, 1000000)
            futures.append(future)
        
        # 获取结果
        results = []
        for future in as_completed(futures):
            result = future.result()
            results.append(result)
        
        print(f"完成 {len(results)} 个任务")

def process_pool_map():
    """使用map方法"""
    
    def square(x):
        """计算平方"""
        return x * x
    
    numbers = list(range(1000))
    
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(square, numbers))
        print(f"计算了 {len(results)} 个数的平方")

# Run the process-pool demos.
# Fix: the second print contained a literal newline inside the string
# (syntax error); replaced with an explicit "\n" escape.
print("=== 进程池基本使用 ===")
process_pool_basic()
print("\n=== 使用map方法 ===")
process_pool_map()

最佳实践

1. 选择正确的并发方式

import threading
import multiprocessing
import asyncio
import time

def io_bound_task():
    """Simulate a one-second blocking I/O operation; return a status string."""
    time.sleep(1)  # stand-in for a network/disk wait (releases the GIL)
    return "I/O任务完成"

def cpu_bound_task(n):
    """CPU-bound task: return the sum of squares of 0..n-1."""
    # Pure computation; holds the GIL the whole time.
    return sum(i * i for i in range(n))

def benchmark_concurrency():
    """Compare wall-clock time: threads on I/O-bound work vs processes on CPU-bound work.

    Fix: the final print contained a literal newline inside the string
    (an unterminated string literal — a syntax error inside this function);
    replaced with an explicit "\\n" escape.
    Relies on ThreadPoolExecutor/ProcessPoolExecutor imported earlier in the file.
    """
    
    # 1. Threads: good for I/O-bound tasks (the GIL is released while sleeping,
    #    so ten 1-second sleeps overlap).
    def thread_approach():
        start_time = time.time()
        with ThreadPoolExecutor(max_workers=10) as executor:
            futures = [executor.submit(io_bound_task) for _ in range(10)]
            results = [future.result() for future in futures]
        end_time = time.time()
        print(f"多线程耗时: {end_time - start_time:.2f}秒")
        return results
    
    # 2. Processes: good for CPU-bound tasks (true parallelism across cores;
    #    cpu_bound_task is module-level, so it pickles for the workers).
    def process_approach():
        start_time = time.time()
        with ProcessPoolExecutor(max_workers=4) as executor:
            futures = [executor.submit(cpu_bound_task, 100000) for _ in range(4)]
            results = [future.result() for future in futures]
        end_time = time.time()
        print(f"多进程耗时: {end_time - start_time:.2f}秒")
        return results
    
    # Run both measurements.
    print("=== I/O密集型任务(多线程) ===")
    thread_approach()
    
    print("\n=== CPU密集型任务(多进程) ===")
    process_approach()

# Run the benchmark: threads for I/O-bound work, processes for CPU-bound work.
benchmark_concurrency()

2. 避免常见陷阱

import threading
import multiprocessing

# 1. Avoid unsynchronized updates to shared state
def bad_global_variable():
    """Deliberately broken demo: five threads increment a shared counter with no lock."""
    counter = 0
    
    def increment():
        nonlocal counter
        for _ in range(1000):
            counter += 1  # unsynchronized read-modify-write: updates can be lost
    
    workers = [threading.Thread(target=increment) for _ in range(5)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    
    print(f"错误结果: {counter}")

def good_global_variable():
    """Correct version: the shared counter is only updated while holding a Lock."""
    counter = 0
    guard = threading.Lock()
    
    def increment():
        nonlocal counter
        for _ in range(1000):
            # Serialize the read-modify-write so no increment is lost.
            with guard:
                counter += 1
    
    workers = [threading.Thread(target=increment) for _ in range(5)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    
    print(f"正确结果: {counter}")

# 2. Avoid deadlock
def deadlock_example():
    """Demonstrate a classic lock-ordering deadlock.

    worker1 acquires lock1 then lock2; worker2 acquires lock2 then lock1.
    If each grabs its first lock during the other's sleep, both block forever.
    NOTE(review): this section's import block omits `time`; it relies on
    `time` being imported earlier in the file.
    """
    lock1 = threading.Lock()
    lock2 = threading.Lock()
    
    def worker1():
        # Lock order: lock1 -> lock2.
        lock1.acquire()
        time.sleep(0.1)  # window in which worker2 can grab lock2
        lock2.acquire()  # blocks forever if worker2 already holds lock2
        # ... work ...
        lock2.release()
        lock1.release()
    
    def worker2():
        # Opposite lock order: lock2 -> lock1 — the deadlock recipe.
        lock2.acquire()
        time.sleep(0.1)  # window in which worker1 can grab lock1
        lock1.acquire()  # blocks forever if worker1 already holds lock1
        # ... work ...
        lock1.release()
        lock2.release()
    
    # Running both threads may deadlock: each holds one lock and waits for the other.
    thread1 = threading.Thread(target=worker1)
    thread2 = threading.Thread(target=worker2)
    
    thread1.start()
    thread2.start()
    
    # join(timeout=...) returns after 5s even if the threads are still
    # deadlocked — it does NOT terminate them.
    thread1.join(timeout=5)
    thread2.join(timeout=5)

# Run the race-condition comparison.
# (deadlock_example is intentionally not called: it can hang the demo.)
print("=== 全局变量竞争 ===")
bad_global_variable()
good_global_variable()

今日总结

今天我们学习了多线程和多进程编程的核心知识:

1. 多线程编程:线程创建、同步机制、线程池

2. 多进程编程:进程创建、进程间通信、进程池

3. 并发编程选择:I/O密集型用多线程,CPU密集型用多进程

4. 最佳实践:避免竞争条件、死锁等常见陷阱

并发编程是现代软件开发的重要技能,掌握这些知识可以显著提高程序的性能和响应性。

© 版权声明
THE END
如果内容对您有所帮助,就支持一下吧!
点赞0 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容