点赞标记,明天就能用上这几个技巧!
asyncio 用法大全(中文全攻略)
下面把 asyncio 的核心概念、常用 API、实战技巧以及常见坑点全部梳理一遍,协助你快速上手并在项目中安全、稳健地使用异步编程。
1️⃣ 什么是 asyncio?
- asyncio 是 Python 标准库提供的 异步 I/O 框架,基于 事件循环(event loop)。
- 通过 协程(coroutine)、任务(Task)、Future 等对象,实现 单线程并发,适合网络请求、文件 I/O、定时任务等 I/O‑bound 场景。
- 与多线程/多进程相比,切换开销极低,但 不适合 CPU 密集型计算(此时仍提议使用 concurrent.futures)。
2️⃣ 基础概念与核心 API
|
概念 |
说明 |
常用函数/类 |
|
协程函数 (async def) |
可被 await 的函数,返回 协程对象(coroutine) |
async def foo(): … |
|
事件循环 |
管理所有 I/O、定时器、任务调度的核心对象 |
asyncio.get_event_loop(), asyncio.run() |
|
Task |
把协程包装成可调度的对象,立即加入循环 |
asyncio.create_task(coro) |
|
Future |
表明“将来某时会产生结果”的占位对象,Task 是 Future 的子类 |
loop.create_future() |
|
await |
暂停当前协程,等待 awaitable(Task、Future、asyncio.sleep 等)完成 |
await asyncio.sleep(1) |
|
async with / async for |
支持异步上下文管理器和异步迭代器 |
async with aiohttp.ClientSession() as s: |
|
同步转异步 |
在协程里调用阻塞函数时,用线程池或进程池 |
await loop.run_in_executor(None, blocking_func, *args) |
3️⃣ 快速上手示例
3.1 最简入口:asyncio.run
import asyncio
async def hello():
print("Hello")
await asyncio.sleep(1)# 异步等待 1 秒
print("World")
asyncio.run(hello())
- asyncio.run() 会 创建事件循环 → 运行协程 → 关闭循环,是推荐的入口方式(Python 3.7+)。
3.2 并发执行多个协程
import asyncio
async def worker(i):
await asyncio.sleep(i)
return f"Task {i} 完成"
async def main():
tasks = [asyncio.create_task(worker(i)) for i in range(5)]
# gather 会等待所有任务完成并返回结果列表
results = await asyncio.gather(*tasks)
print(results)
asyncio.run(main())
- asyncio.create_task 把协程立刻加入循环,gather 用于收集结果。
3.3 超时与撤销
import asyncio
async def long_job():
await asyncio.sleep(10)
return "完成"
async def main():
try:
result = await asyncio.wait_for(long_job(), timeout=3)
print(result)
except asyncio.TimeoutError:
print("任务超时,被撤销")
asyncio.run(main())
- asyncio.wait_for 包装协程,超时后自动抛 TimeoutError 并 撤销 原协程。
4️⃣ 常用高级功能
|
功能 |
关键 API |
典型使用场景 |
|
并发限制(Semaphore) |
asyncio.Semaphore |
限制并发请求数(如爬虫) |
|
生产者‑消费者 |
asyncio.Queue |
异步任务流水线、消息队列 |
|
定时任务 |
loop.call_later, asyncio.sleep |
心跳、定时刷新 |
|
信号处理 |
loop.add_signal_handler(Unix) |
优雅关闭服务 |
|
子进程交互 |
asyncio.create_subprocess_exec |
异步运行外部命令 |
|
异步文件 I/O |
aiofiles(第三方) |
大文件读写不阻塞 |
|
异步 HTTP |
aiohttp(第三方) |
高并发网络请求 |
|
线程池/进程池 |
loop.run_in_executor |
调用阻塞库(如 requests、pandas) |
|
上下文管理器 |
async with |
自动关闭资源(如 aiohttp.ClientSession) |
|
异步迭代 |
async for |
读取流式响应、WebSocket 消息 |
4.1 示例:使用 Semaphore 限制并发请求
import asyncio, aiohttp
sem = asyncio.Semaphore(5)# 最多 5 个并发请求
async def fetch(url):
async with sem:# 进入临界区
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
return await resp.text()
async def main(urls):
tasks = [asyncio.create_task(fetch(u)) for u in urls]
return await asyncio.gather(*tasks)
# asyncio.run(main([...]))
4.2 示例:生产者‑消费者(Queue)
import asyncio
import random
queue = asyncio.Queue(maxsize=10)
async def producer():
for i in range(20):
await asyncio.sleep(random.random())
await queue.put(f"item-{i}")
print(f"生产: item-{i}")
async def consumer():
while True:
item = await queue.get()
print(f"消费: {item}")
await asyncio.sleep(random.random())
queue.task_done()
async def main():
prod = asyncio.create_task(producer())
cons = [asyncio.create_task(consumer()) for _ in range(3)]
await prod
await queue.join()# 等待所有已入队项目被消费
for c in cons:
c.cancel()# 结束消费者协程
asyncio.run(main())
5️⃣ 与同步代码的桥接
5.1 在协程里调用阻塞函数
import asyncio, time
def blocking_io():
time.sleep(2)# 真正的阻塞
return "done"
async def main():
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, blocking_io)# None → 默认线程池
print(result)
asyncio.run(main())
- 注意:不要在协程内部直接调用阻塞函数,否则会阻塞整个事件循环。
5.2 从同步代码启动 asyncio
def sync_entry():
asyncio.run(async_main())# 只在入口处调用一次
若已有运行中的循环(如在 GUI 框架里),使用 asyncio.get_event_loop().create_task(coro) 或 loop.run_until_complete。
6️⃣ 与 GUI 框架的结合(Tkinter / PyQt / wxPython)
|
框架 |
关键技巧 |
|
Tkinter |
使用 root.after(0, lambda: asyncio.create_task(coro())) 把协程加入循环;或在独立线程跑 asyncio.run 并通过 queue 与 UI 交互。 |
|
PyQt / PySide |
QEventLoop 与 asyncio 可通过 asyncqt(或 qasync)桥接,实现 await UI 信号。 |
|
wxPython |
wx.CallAfter 与 |
示例(Tkinter + asyncio)
import tkinter as tk, asyncio, aiohttp
root = tk.Tk()
root.title("异步请求 Demo")
txt = tk.Text(root, height=10, width=50)
txt.pack()
async def fetch():
async with aiohttp.ClientSession() as s:
async with s.get('https://httpbin.org/get') as r:
data = await r.text()
txt.insert(tk.END, data)
def start():
asyncio.create_task(fetch())# 直接在已有循环中创建任务
root.after(0, lambda: asyncio.get_event_loop())# 初始化循环
btn = tk.Button(root, text="请求", command=start)
btn.pack()
root.mainloop()
7️⃣ 常见坑 & 调试技巧
|
症状 |
可能缘由 |
解决方案 |
|
协程卡住不执行 |
未 await、忘记 asyncio.run 或 create_task |
确认所有 async def 被 await 或包装为 Task。 |
|
RuntimeError: Event loop is closed |
在已关闭的循环中调用 run_until_complete |
使用 asyncio.run 或在 if __name__ == '__main__' 中创建循环。 |
|
CPU 占用 100% |
协程里使用了阻塞循环(while True:)未 await |
在循环体内部加入 await asyncio.sleep(0) 或适当的异步等待。 |
|
HTTPS 报错 SSL: CERTIFICATE_VERIFY_FAILED |
未信任自签名证书或代理导致证书失效 |
使用 ssl=False(仅在可信环境)或导入正确的根证书。 |
|
子进程输出不实时 |
create_subprocess_exec 默认缓冲 |
设置 stdout=asyncio.subprocess.PIPE 并使用 await proc.stdout.readline()。 |
|
调试困难 |
协程栈信息不完整 |
使用 asyncio.set_debug(True),或 python -X dev 启动解释器获取详细警告。 |
调试小技巧:
import asyncio, logging
logging.basicConfig(level=logging.DEBUG)
asyncio.get_event_loop().set_debug(True)
8️⃣ 性能与最佳实践
- 尽量使用异步库(aiohttp、aiomysql、aioredis),不要在协程里混用同步阻塞库。
- 合理使用 gather:一次性等待多个任务,避免逐个 await 导致串行执行。
- 限制并发:使用 Semaphore、Queue 或 asyncio.BoundedSemaphore 防止对目标服务器产生过大压力。
- 异常捕获:asyncio.gather(…, return_exceptions=True) 能让单个任务失败不影响整体。
- 资源释放:使用 async with 确保 ClientSession、文件句柄等在异常时也能关闭。
- 避免共享可变状态:协程共享全局变量时要加锁(asyncio.Lock),防止竞争条件。
9️⃣ 小结
- asyncio 为 I/O‑bound 场景提供高效的单线程并发模型。
- 关键是 协程 → 任务 → 事件循环 的链条,配合 await 实现非阻塞等待。
- 常用工具:asyncio.run, create_task, gather, wait_for, Semaphore, Queue, run_in_executor。
- 与同步代码、GUI 框架以及第三方异步库的结合是实战重点。
如果你已经有具体的使用场景(列如爬虫、实时聊天、文件批处理或 GUI 应用),我可以帮你写出对应的完整示例代码(如 aiohttp + Semaphore、Tkinter + asyncio 等),需要吗?
都看到这里了, 记得帮忙一键三连, 点赞、评论、收藏、关注
















- 最新
- 最热
只看作者