今日目标
• 理解并发编程的基本概念
• 掌握Python多线程编程
• 学会多进程编程
• 了解线程同步和进程间通信
• 掌握并发编程的最佳实践
并发编程概述
并发编程允许程序同时执行多个任务,提高程序的性能和响应性:
• 多线程:共享内存空间,适合I/O密集型任务
• 多进程:独立内存空间,适合CPU密集型任务
• 异步编程:单线程非阻塞,适合高并发场景
并发 vs 并行
# 并发:多个任务交替执行
# 并行:多个任务同时执行(需要多核CPU)
import time
import threading
import multiprocessing
def cpu_bound_task(n):
    """CPU-bound workload: return the sum of i*i for every i below n."""
    return sum(i * i for i in range(n))
def io_bound_task(n):
    """I/O-bound workload: block for n seconds, then report completion."""
    # time.sleep stands in for a real blocking call (network, disk, ...).
    time.sleep(n)
    return f"任务{n}完成"
多线程编程
1. 创建线程
import threading
import time
def worker_function(name, delay):
    """Thread worker: announce start, run three delayed steps, announce end."""
    print(f"线程 {name} 开始工作")
    step = 0
    while step < 3:
        time.sleep(delay)
        step += 1
        print(f"线程 {name} 执行第 {step} 次任务")
    print(f"线程 {name} 工作完成")
# 方法1:使用threading.Thread
def create_thread_method1():
    """Method 1: spawn two workers via threading.Thread and wait for both."""
    specs = [("A", 1), ("B", 2)]
    # One thread per (name, delay) spec.
    threads = [threading.Thread(target=worker_function, args=spec) for spec in specs]
    for t in threads:
        t.start()
    # Block until every worker has finished.
    for t in threads:
        t.join()
    print("所有线程执行完成")
# 方法2:继承Thread类
class WorkerThread(threading.Thread):
    """Thread subclass whose run() performs three delayed work steps."""

    def __init__(self, name, delay):
        super().__init__()
        self.name = name    # Thread.name is a settable property, so this renames the thread
        self.delay = delay  # seconds to sleep between steps

    def run(self):
        """Executed in the new thread after start() is called."""
        print(f"线程 {self.name} 开始工作")
        for step in range(1, 4):
            time.sleep(self.delay)
            print(f"线程 {self.name} 执行第 {step} 次任务")
        print(f"线程 {self.name} 工作完成")
def create_thread_method2():
    """Method 2: spawn two WorkerThread instances and wait for both."""
    workers = [WorkerThread("A", 1), WorkerThread("B", 2)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print("所有线程执行完成")
# Run the examples
print("=== 方法1:使用threading.Thread ===")
create_thread_method1()
# The "\n" prefix was mangled into a hard line break when this snippet was
# published, leaving an unterminated string; restore it.
print("\n=== 方法2:继承Thread类 ===")
create_thread_method2()
2. 线程状态和属性
import threading
import time
import os
def thread_info_demo():
    """Show a thread's identity and alive-state before, during, and after it runs."""

    def worker():
        me = threading.current_thread()
        print(f"线程 {me.name} 开始")
        print(f"线程ID: {me.ident}")
        print(f"是否为主线程: {me is threading.main_thread()}")
        time.sleep(2)
        print(f"线程 {me.name} 结束")

    t = threading.Thread(target=worker, name="工作线程")
    # is_alive() is False before start() and False again after the thread exits.
    print(f"线程创建前状态: {t.is_alive()}")
    print(f"线程名称: {t.name}")
    print(f"是否为守护线程: {t.daemon}")
    t.start()
    print(f"线程启动后状态: {t.is_alive()}")
    t.join()
    print(f"线程结束后状态: {t.is_alive()}")
# 守护线程示例
def daemon_thread_demo():
"""守护线程演示"""
def daemon_worker():
print("守护线程开始")
for i in range(5):
time.sleep(1)
print(f"守护线程执行第 {i+1} 次")
print("守护线程结束")
def normal_worker():
print("普通线程开始")
time.sleep(3)
print("普通线程结束")
# 创建守护线程
daemon_thread = threading.Thread(target=daemon_worker, daemon=True)
normal_thread = threading.Thread(target=normal_worker)
# 启动线程
daemon_thread.start()
normal_thread.start()
# 等待普通线程完成
normal_thread.join()
print("主程序结束,守护线程会被自动终止")
# Run the examples
print("=== 线程信息演示 ===")
thread_info_demo()
# Restore the "\n" that was mangled into a hard line break when published.
print("\n=== 守护线程演示 ===")
daemon_thread_demo()
3. 线程同步
import threading
import time
import random
# 1. 锁(Lock)
def lock_demo():
    """Guard a shared counter with a Lock so 5 threads x 1000 increments == 5000."""
    counter = 0
    lock = threading.Lock()

    def increment():
        nonlocal counter
        for _ in range(1000):
            # The lock makes the read-modify-write of `counter` atomic.
            with lock:
                counter += 1

    workers = []
    for _ in range(5):
        t = threading.Thread(target=increment)
        workers.append(t)
        t.start()
    # Wait for every incrementer to finish before reading the total.
    for t in workers:
        t.join()
    print(f"最终计数器值: {counter}")
# 2. 可重入锁(RLock)
def rlock_demo():
    """RLock lets the same thread re-acquire a lock it already holds."""
    rlock = threading.RLock()

    def recursive_function(level):
        # A plain Lock would deadlock here on the second (recursive) acquisition.
        with rlock:
            print(f"进入递归函数,层级: {level}")
            if level > 0:
                recursive_function(level - 1)
            print(f"退出递归函数,层级: {level}")

    worker = threading.Thread(target=recursive_function, args=(3,))
    worker.start()
    worker.join()
# 3. 条件变量(Condition)
def condition_demo():
    """Producer/consumer hand-off coordinated through a Condition variable."""
    shared_data = []
    condition = threading.Condition()

    def producer():
        """Append five items, waking the consumer after each one."""
        for i in range(5):
            with condition:
                shared_data.append(f"数据{i}")
                print(f"生产者添加: 数据{i}")
                condition.notify()  # wake one waiting consumer
            time.sleep(1)

    def consumer():
        """Consume exactly five items, waiting whenever the buffer is empty."""
        for _ in range(5):
            with condition:
                while not shared_data:  # re-check: guards against spurious wakeups
                    condition.wait()
                item = shared_data.pop(0)
                print(f"消费者处理: {item}")

    threads = [threading.Thread(target=producer), threading.Thread(target=consumer)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
# 4. 信号量(Semaphore)
def semaphore_demo():
    """A Semaphore(3) caps concurrent access to a resource at three workers."""
    semaphore = threading.Semaphore(3)  # at most 3 holders at any moment

    def worker(worker_id):
        with semaphore:
            print(f"工作者 {worker_id} 获得资源")
            time.sleep(random.uniform(1, 3))
            print(f"工作者 {worker_id} 释放资源")

    workers = [threading.Thread(target=worker, args=(i,)) for i in range(10)]
    for w in workers:
        w.start()
    # Wait for all ten workers to cycle through the semaphore.
    for w in workers:
        w.join()
# 5. 事件(Event)
def event_demo():
    """One thread blocks on an Event until another thread sets it."""
    event = threading.Event()

    def waiter():
        """Blocks until the event is set."""
        print("等待者开始等待事件")
        event.wait()  # returns immediately once event.set() has been called
        print("等待者收到事件,开始工作")

    def setter():
        """Sets the event after a delay, releasing all waiters."""
        time.sleep(3)
        print("设置者设置事件")
        event.set()

    threads = [threading.Thread(target=waiter), threading.Thread(target=setter)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
# Run the synchronization examples
print("=== 锁演示 ===")
lock_demo()
# The "\n" prefixes below were mangled into hard line breaks when this
# snippet was published (unterminated strings); restore them.
print("\n=== 可重入锁演示 ===")
rlock_demo()
print("\n=== 条件变量演示 ===")
condition_demo()
print("\n=== 信号量演示 ===")
semaphore_demo()
print("\n=== 事件演示 ===")
event_demo()
4. 线程池
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
import queue
def thread_pool_basic():
    """Submit three tasks to a ThreadPoolExecutor and print each result."""

    def task(name, delay):
        time.sleep(delay)
        return f"任务 {name} 完成"

    with ThreadPoolExecutor(max_workers=3) as executor:
        # submit() returns a Future at once; result() blocks until that task is done.
        futures = [
            executor.submit(task, "A", 2),
            executor.submit(task, "B", 1),
            executor.submit(task, "C", 3),
        ]
        for future in futures:
            print(future.result())
def thread_pool_map():
    """executor.map applies a function across an iterable, preserving input order."""

    def square(x):
        time.sleep(0.1)
        return x * x

    numbers = list(range(1, 11))
    with ThreadPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(square, numbers))
    print(f"结果: {results}")
def thread_pool_as_completed():
    """Process futures in completion order rather than submission order."""

    def task(name, delay):
        time.sleep(delay)
        return f"任务 {name} 完成"

    tasks = [("A", 3), ("B", 1), ("C", 2), ("D", 4)]
    with ThreadPoolExecutor(max_workers=2) as executor:
        # Map each future back to its task name for reporting.
        future_to_task = {}
        for name, delay in tasks:
            future_to_task[executor.submit(task, name, delay)] = name
        # as_completed yields each future as soon as it finishes.
        for future in as_completed(future_to_task):
            task_name = future_to_task[future]
            try:
                result = future.result()
                print(f"{result} (任务 {task_name})")
            except Exception as e:
                print(f"任务 {task_name} 执行失败: {e}")
# Run the thread-pool examples
print("=== 线程池基本使用 ===")
thread_pool_basic()
# Restore the "\n" prefixes lost to hard line breaks when published.
print("\n=== 使用map方法 ===")
thread_pool_map()
print("\n=== 使用as_completed ===")
thread_pool_as_completed()
多进程编程
1. 创建进程
import multiprocessing
import time
import os
def process_worker(name, delay):
    """Process worker: announce start (with PID), run three delayed steps, finish."""
    print(f"进程 {name} 开始工作 (PID: {os.getpid()})")
    for step in range(1, 4):
        time.sleep(delay)
        print(f"进程 {name} 执行第 {step} 次任务")
    print(f"进程 {name} 工作完成")
def create_process_basic():
    """Run two process_worker children and wait for both to finish."""
    children = [
        multiprocessing.Process(target=process_worker, args=("A", 1)),
        multiprocessing.Process(target=process_worker, args=("B", 2)),
    ]
    for child in children:
        child.start()
    for child in children:
        child.join()
    print("所有进程执行完成")
class WorkerProcess(multiprocessing.Process):
    """Process subclass whose run() performs three delayed work steps."""

    def __init__(self, name, delay):
        super().__init__()
        self.name = name    # Process.name is a settable property, so this renames the process
        self.delay = delay  # seconds to sleep between steps

    def run(self):
        """Executed in the child process after start() is called."""
        print(f"进程 {self.name} 开始工作 (PID: {os.getpid()})")
        for step in range(1, 4):
            time.sleep(self.delay)
            print(f"进程 {self.name} 执行第 {step} 次任务")
        print(f"进程 {self.name} 工作完成")
def create_process_class():
    """Run two WorkerProcess children and wait for both to finish."""
    children = [WorkerProcess("A", 1), WorkerProcess("B", 2)]
    for child in children:
        child.start()
    for child in children:
        child.join()
    print("所有进程执行完成")
# Run the examples
print("=== 基本进程创建 ===")
create_process_basic()
# Restore the "\n" that was mangled into a hard line break when published.
print("\n=== 使用类创建进程 ===")
create_process_class()
2. 进程间通信
import multiprocessing
import time
import random
# 1. 队列(Queue)
def _queue_producer(q):
    """Producer child process: put five items, then a None sentinel."""
    for i in range(5):
        item = f"数据{i}"
        q.put(item)
        print(f"生产者放入: {item}")
        time.sleep(1)
    q.put(None)  # end-of-stream sentinel for the consumer


def _queue_consumer(q):
    """Consumer child process: drain items until the None sentinel arrives."""
    while True:
        item = q.get()
        if item is None:  # sentinel: producer is done
            break
        print(f"消费者处理: {item}")
        time.sleep(0.5)


def queue_demo():
    """Demonstrate inter-process communication via multiprocessing.Queue.

    The worker functions are defined at module level (not nested) because
    Process targets must be picklable under the 'spawn' start method — the
    default on Windows and macOS — and local (nested) functions cannot be
    pickled; the original nested version crashes there.
    NOTE(review): running this under 'spawn' also requires the script's
    top-level calls to be guarded by `if __name__ == "__main__":`.
    """
    q = multiprocessing.Queue()
    producer_process = multiprocessing.Process(target=_queue_producer, args=(q,))
    consumer_process = multiprocessing.Process(target=_queue_consumer, args=(q,))
    producer_process.start()
    consumer_process.start()
    producer_process.join()
    consumer_process.join()
# 2. 管道(Pipe)
def _pipe_sender(conn):
    """Sender child: push five messages down the pipe, then a None sentinel."""
    for i in range(5):
        message = f"消息{i}"
        conn.send(message)
        print(f"发送者发送: {message}")
        time.sleep(1)
    conn.send(None)  # sentinel: tell the receiver we are done
    conn.close()


def _pipe_receiver(conn):
    """Receiver child: read until the sentinel (or EOF) is seen."""
    while True:
        try:
            message = conn.recv()
        except EOFError:  # all write handles closed — defensive fallback
            break
        if message is None:  # sentinel from the sender
            break
        print(f"接收者收到: {message}")
    conn.close()


def pipe_demo():
    """Demonstrate communication between processes over a multiprocessing.Pipe.

    Two fixes over the naive version:
    - Helpers live at module level so the Process targets stay picklable
      under the 'spawn' start method (default on Windows/macOS).
    - The receiver stops on an explicit None sentinel instead of relying on
      EOFError alone: forked children inherit copies of BOTH connection
      ends, so the write side is never fully closed and recv() would block
      forever, hanging the final join().
    """
    parent_conn, child_conn = multiprocessing.Pipe()
    sender_process = multiprocessing.Process(target=_pipe_sender, args=(parent_conn,))
    receiver_process = multiprocessing.Process(target=_pipe_receiver, args=(child_conn,))
    sender_process.start()
    receiver_process.start()
    # Close the parent's own handles; the children keep their inherited copies.
    parent_conn.close()
    child_conn.close()
    sender_process.join()
    receiver_process.join()
# 3. 共享内存
def _incrementer(value, array, process_id):
    """Child process: bump the shared value 3 times and its own array slot."""
    for _ in range(3):
        # Value carries an internal lock; use it to make += atomic across processes.
        with value.get_lock():
            value.value += 1
            print(f"进程 {process_id} 递增值到: {value.value}")
            # Each process only writes its own index, so this needs no extra lock.
            array[process_id] += 1
            print(f"进程 {process_id} 更新数组位置 {process_id} 到: {array[process_id]}")
        time.sleep(1)


def shared_memory_demo():
    """Demonstrate shared state across processes via Value and Array.

    The worker is module-level (not nested) because Process targets must be
    picklable under the 'spawn' start method, the default on Windows/macOS;
    the original nested version crashes there.
    """
    shared_value = multiprocessing.Value('i', 0)              # shared C int
    shared_array = multiprocessing.Array('i', [0, 0, 0, 0, 0])  # shared int array
    processes = []
    for i in range(3):
        process = multiprocessing.Process(
            target=_incrementer,
            args=(shared_value, shared_array, i)
        )
        processes.append(process)
        process.start()
    # Wait for all children before reading the final shared state.
    for process in processes:
        process.join()
    print(f"最终共享值: {shared_value.value}")
    print(f"最终共享数组: {list(shared_array)}")
# Run the inter-process communication examples
print("=== 队列通信演示 ===")
queue_demo()
# Restore the "\n" prefixes lost to hard line breaks when published.
print("\n=== 管道通信演示 ===")
pipe_demo()
print("\n=== 共享内存演示 ===")
shared_memory_demo()
3. 进程池
import multiprocessing
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
def cpu_intensive_task(n):
    """CPU-bound helper: return the sum of i*i for every i below n.

    Defined at module level because ProcessPoolExecutor pickles submitted
    callables; a nested (local) function cannot be pickled and would make
    every submit() fail with a PicklingError — which is exactly what the
    original nested version did.
    """
    result = 0
    for i in range(n):
        result += i * i
    return result


def process_pool_basic():
    """Fan ten CPU-bound tasks out across a 4-worker process pool."""
    with ProcessPoolExecutor(max_workers=4) as executor:
        # Submit all tasks up front, then collect results as they finish.
        futures = [executor.submit(cpu_intensive_task, 1000000) for _ in range(10)]
        results = []
        for future in as_completed(futures):
            results.append(future.result())
    print(f"完成 {len(results)} 个任务")
def square(x):
    """Return x squared.

    Module-level (not nested) because ProcessPoolExecutor must pickle the
    mapped callable; the original nested version raised a PicklingError.
    """
    return x * x


def process_pool_map():
    """Square 1000 numbers in parallel with executor.map."""
    numbers = list(range(1000))
    # map() distributes the calls and yields results in input order.
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(square, numbers))
    print(f"计算了 {len(results)} 个数的平方")
# Run the process-pool examples
print("=== 进程池基本使用 ===")
process_pool_basic()
# Restore the "\n" that was mangled into a hard line break when published.
print("\n=== 使用map方法 ===")
process_pool_map()
最佳实践
1. 选择正确的并发方式
import threading
import multiprocessing
import asyncio
import time
def io_bound_task():
    """Pretend to do one second of blocking I/O, then report completion."""
    time.sleep(1)  # stands in for a real blocking call (network, disk, ...)
    return "I/O任务完成"
def cpu_bound_task(n):
    """Pure-CPU workload: return the sum of squares of all i below n."""
    return sum(i * i for i in range(n))
def benchmark_concurrency():
    """Compare wall-clock time of thread vs. process pools on matching workloads."""
    # The executors were used but never imported in the original snippet
    # (NameError at call time); import them here explicitly.
    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

    # 1. Threads suit I/O-bound work: the GIL is released while blocking,
    #    so the ten 1-second sleeps overlap.
    def thread_approach():
        start_time = time.time()
        with ThreadPoolExecutor(max_workers=10) as executor:
            futures = [executor.submit(io_bound_task) for _ in range(10)]
            results = [future.result() for future in futures]
        end_time = time.time()
        print(f"多线程耗时: {end_time - start_time:.2f}秒")
        return results

    # 2. Processes suit CPU-bound work: true parallelism across cores.
    def process_approach():
        start_time = time.time()
        with ProcessPoolExecutor(max_workers=4) as executor:
            futures = [executor.submit(cpu_bound_task, 100000) for _ in range(4)]
            results = [future.result() for future in futures]
        end_time = time.time()
        print(f"多进程耗时: {end_time - start_time:.2f}秒")
        return results

    print("=== I/O密集型任务(多线程) ===")
    thread_approach()
    # Restore the "\n" lost to a hard line break when this snippet was published.
    print("\n=== CPU密集型任务(多进程) ===")
    process_approach()


# Run the benchmark
benchmark_concurrency()
2. 避免常见陷阱
import threading
import multiprocessing
# 1. 避免全局变量竞争
def bad_global_variable():
    """Deliberately broken: unsynchronized increments race with each other."""
    counter = 0

    def increment():
        nonlocal counter
        for _ in range(1000):
            counter += 1  # race: unprotected read-modify-write

    workers = []
    for _ in range(5):
        workers.append(threading.Thread(target=increment))
    for t in workers:
        t.start()
    for t in workers:
        t.join()
    print(f"错误结果: {counter}")
def good_global_variable():
    """Correct version: a Lock serializes the counter updates."""
    counter = 0
    lock = threading.Lock()

    def increment():
        nonlocal counter
        for _ in range(1000):
            with lock:  # makes the += atomic with respect to other threads
                counter += 1

    workers = []
    for _ in range(5):
        workers.append(threading.Thread(target=increment))
    for t in workers:
        t.start()
    for t in workers:
        t.join()
    print(f"正确结果: {counter}")
# 2. 避免死锁
def deadlock_example():
    """Classic lock-ordering deadlock demo (intentional).

    worker1 acquires lock1 then lock2; worker2 acquires lock2 then lock1.
    If each grabs its first lock before the other grabs its second, both
    block forever.  join(timeout=...) keeps this function from hanging the
    caller, but note the deadlocked non-daemon threads may still delay a
    clean interpreter exit.
    """
    lock1 = threading.Lock()
    lock2 = threading.Lock()
    def worker1():
        lock1.acquire()
        time.sleep(0.1)  # widen the race window so the deadlock is likely
        lock2.acquire()  # waits for lock2 (typically held by worker2)
        # work...
        lock2.release()
        lock1.release()
    def worker2():
        lock2.acquire()
        time.sleep(0.1)  # simulate work
        lock1.acquire()  # waits for lock1 (typically held by worker1)
        # work...
        lock1.release()
        lock2.release()
    # Opposite acquisition order in the two workers -> possible deadlock
    thread1 = threading.Thread(target=worker1)
    thread2 = threading.Thread(target=worker2)
    thread1.start()
    thread2.start()
    thread1.join(timeout=5)  # timeout so the demo returns even when deadlocked
    thread2.join(timeout=5)
# Run the examples: the unsynchronized (buggy) version first, then the locked one.
print("=== 全局变量竞争 ===")
bad_global_variable()
good_global_variable()
今日总结
今天我们学习了多线程和多进程编程的核心知识:
1. 多线程编程:线程创建、同步机制、线程池
2. 多进程编程:进程创建、进程间通信、进程池
3. 并发编程选择:I/O密集型用多线程,CPU密集型用多进程
4. 最佳实践:避免竞争条件、死锁等常见陷阱
并发编程是现代软件开发的重要技能,掌握这些知识可以显著提高程序的性能和响应性。
© 版权声明
文章版权归作者所有,未经允许请勿转载。如内容涉嫌侵权,请在本页底部进入<联系我们>进行举报投诉!
THE END

















暂无评论内容