Python 的异步 I/O 功能不仅限于 `async def` 和 `await` 构造。它们还包括其他高级工具,使异步编程更具表现力,并与常规 Python 构造保持一致。
在接下来的部分中,你将探索强大的异步功能,包括异步循环和推导式、`async with` 语句以及异常组。这些功能将帮助你编写更清晰、更易读的异步代码。
异步迭代器、循环和推导式
除了使用 `async` 和 `await` 创建协程外,Python 还提供了 `async for` 构造来遍历异步迭代器。异步迭代器允许你遍历异步生成的数据。在循环运行时,它会将控制权交还给事件循环,以便其他异步任务可以运行。
注意:如需了解有关异步迭代器的更多信息,请参阅 Python 中的异步迭代器和可迭代对象教程。
这一概念的自然扩展是异步生成器。以下是一个生成 2 的幂并在循环和推导式中使用它们的示例:
async def power_of_two(stop=10):
i = 0
while i < stop:
yield 2*i
i += 1
await asyncio.sleep(5)
async def main():
g = []
async for i in power_of_two(5):
g.append(i)
print(g)
asyncio.run(main())
async for 本身还是顺序执行,但是当前协程会让出控制权。
同步和异步生成器、循环和推导式之间有一个重要的区别。它们的异步对应物并不天生使迭代并发。相反,它们允许事件循环在你使用 `await` 明确让出控制权时,在迭代之间运行其他任务。除非你使用 `asyncio.gather()` 引入并发,否则迭代本身仍然是顺序的。
只有在处理异步迭代器或上下文管理器时,才需要使用 `async for` 和 `async with`,否则普通的 `for` 或 `with` 会引发错误。
异步
with 语句
with
`with` 语句也有一个异步版本,即 `async with`。这种构造在异步代码中非常常见,因为许多 I/O 密集型任务都涉及设置和清理阶段。
async with 专门用来异步地进入和退出上下文(即 __aenter__ / __aexit__ 协程方法)。
最常见场景:限并发、连接池、自动加解锁、数据库事务等。
以下是一个实现所需功能的示例:
import asyncio
class AsyncLimiter:
def __init__(self,max_parallel: int):
self.sem = asyncio.Semaphore(max_parallel)
async def __aenter__(self):
print("__aenter__")
await self.sem.acquire()
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("__aexit__")
self.sem.release()
async def main():
limiter = AsyncLimiter(2)
async with limiter:
print("do .....")
await asyncio.sleep(2)
print("done ......")
asyncio.run(main())
异步数据库事务(aiomysql)
import aiomysql, asyncio
async def main():
pool = await aiomysql.create_pool(host='127.0.0.1', port=3306,
user='root', password='123456',
db='test', minsize=1, maxsize=5)
# 异步事务:进入时 BEGIN,退出时 COMMIT/ROLLBACK
async with pool.acquire() as conn:
async with conn.cursor() as cur:
async with conn.begin(): # ← async with 事务
await cur.execute("INSERT INTO t(name) VALUES (%s)", ("async",))
# 出 with 块自动 commit;异常则 rollback
pool.close(); await pool.wait_closed()
asyncio.run(main())
其他
asyncio 工具
asyncio
除了 `asyncio.run()`,你还使用了一些其他包级别的函数,例如 `asyncio.gather()` 和 `asyncio.get_event_loop()`。你可以使用 `asyncio.create_task()` 来安排协程对象的执行,然后像往常一样调用 `asyncio.run()` 函数:
asyncio.run(coro, *, debug=None)
负责启动程序,一次性启动整个 async 程序,创建新事件循环 → 运行 coro → 关闭循环 → 返回结果,永远只应出现一次,且必须在同步代码里调用。
asyncio.gather(*aws, return_exceptions=False)
负责并发等待多个协程,让多个协程/任务并发执行,并等它们全部完成,把可等待对象列表包装成单个 future;返回结果按传入顺序对齐,results = await asyncio.gather(task1(), task2())。如果你忘记 → 只是构造了一个 future,不会真正运行。
await
asyncio.create_task(coro, *, name=None)
负责把协程包装成任务丢进事件循环,在已经运行的循环里把协程立即调度成 Task,实现“后台”并发,返回 对象,可取消、可加回调、可单独
Taskt = asyncio.create_task(coro()); await t。必须在运行中的事件循环线程里调用。
await,
asyncio.get_event_loop()
需要底层操作事件循环,返回当前线程的正在运行的事件循环;没有则新建一个并设为当前,尽量别碰底层 loop。
import asyncio
async def coro():
print("coro")
await asyncio.sleep(5)
return "hello"
async def main():
task = asyncio.create_task(coro())
print(f"task is {type(task)}")
result = await task
return result
result = asyncio.run(main())
task is <class '_asyncio.Task'>
只有调用await task ,task才会真正执行。如果单纯 调用 coro() 返回仅是一个协程对象<coroutine object coro at 0x000001F83C0C1D80> ,都是需要await 才会真正执行。
这种模式包含了一个需要你注意的细微细节:如果你使用`create_task()`创建任务,但没有等待它们或将其包装在`gather()`中,而你的`main()`协程完成了,那么这些手动创建的任务将在事件循环结束时被取消。你必须等待所有你希望完成的任务。
`create_task()`函数将一个可等待对象包装成一个更高级别的`Task`对象,该对象会在后台的事件循环中并发运行。相比之下,等待一个协程会立即运行它,暂停调用者的执行,直到等待的协程完成。
`gather()` 函数的目的是将一组协程整齐地放入一个单一的未来对象中。这个对象代表一个最初未知但最终会可用的结果,通常是异步计算的结果。
如果你等待 `gather()` 并指定多个任务或协程,那么循环会等待所有任务完成。`gather()` 的结果将是一个包含输入结果的列表。
>>> import time
>>> async def main():
... task1 = asyncio.create_task(coro([10, 5, 2]))
... task2 = asyncio.create_task(coro([3, 2, 1]))
... print("Start:", time.strftime("%X"))
... result = await asyncio.gather(task1, task2)
... print("End:", time.strftime("%X"))
... print(f"Both tasks done: {all((task1.done(), task2.done()))}")
... return result
...
>>> result = asyncio.run(main())
Start: 14:38:49
End: 14:38:51
Both tasks done: True
>>> print(f"result: {result}")
result: [[2, 5, 10], [1, 2, 3]]
你可能已经注意到,`gather()` 会等待你传递给它的整个协程集合的完整结果。`gather()` 返回的结果顺序是确定性的,对应于最初传递给它的可等待对象的顺序。
作为替代,你可以通过循环遍历 `asyncio.as_completed()` 来获取完成的任务。该函数返回一个同步迭代器,按任务完成的顺序生成任务。在下面的例子中,`coro([3, 2, 1])` 的结果将在 `coro([10, 5, 2])` 完成之前可用,这与 `gather()` 函数的情况不同。
>>> async def main():
... task1 = asyncio.create_task(coro([10, 5, 2]))
... task2 = asyncio.create_task(coro([3, 2, 1]))
... print("Start:", time.strftime("%X"))
... for task in asyncio.as_completed([task1, task2]):
... result = await task
... print(f'result: {result} completed at {time.strftime("%X")}')
... print("End:", time.strftime("%X"))
... print(f"Both tasks done: {all((task1.done(), task2.done()))}")
...
>>> asyncio.run(main())
Start: 14:36:36
result: [1, 2, 3] completed at 14:36:37
result: [2, 5, 10] completed at 14:36:38
End: 14:36:38
Both tasks done: True
在这个例子中, 函数使用了
main(),它按任务完成的顺序生成任务,而不是按任务启动的顺序。当程序遍历任务时,它等待这些任务,使得任务完成时结果可以立即可用。
asyncio.as_completed()
因此,更快的任务()先完成,其结果先打印出来,而耗时更长的任务(
task1)完成后才打印结果。
task2 函数在你需要动态处理完成的任务时非常有用,这可以提高并发工作流中的响应性。
as_completed()
异步异常处理
从 Python 3.11 开始,你可以使用 `ExceptionGroup` 类来处理可能同时发生的多个不相关的异常。这在运行多个可能会引发不同异常的协程时特别有用。此外,新的 `except*` 语法可以帮助你优雅地一次性处理多个错误。
以下是如何在异步代码中使用这个类的快速演示:
>>> import asyncio
>>> async def coro_a():
... await asyncio.sleep(1)
... raise ValueError("Error in coro A")
...
>>> async def coro_b():
... await asyncio.sleep(2)
... raise TypeError("Error in coro B")
...
>>> async def coro_c():
... await asyncio.sleep(0.5)
... raise IndexError("Error in coro C")
...
>>> async def main():
... results = await asyncio.gather(
... coro_a(),
... coro_b(),
... coro_c(),
... return_exceptions=True
... )
... exceptions = [e for e in results if isinstance(e, Exception)]
... if exceptions:
... raise ExceptionGroup("Errors", exceptions)
...
在这个例子中,你有三个协程,它们分别抛出三种不同类型的异常。在 `main()` 函数中,你将这些协程作为参数传递给 `gather()`,并将 `return_exceptions` 参数设置为 `True`,以便在异常发生时捕获它们。
接下来,你使用列表推导式将异常存储在一个新列表中。如果列表中至少有一个异常,那么你可以为它们创建一个 `ExceptionGroup`。
要处理这个异常组,你可以使用以下代码:
>>> try:
... asyncio.run(main())
... except* ValueError as ve_group:
... print(f"[ValueError handled] {ve_group.exceptions}")
... except* TypeError as te_group:
... print(f"[TypeError handled] {te_group.exceptions}")
... except* IndexError as ie_group:
... print(f"[IndexError handled] {ie_group.exceptions}")
...
[ValueError handled] (ValueError('Error in coro A'),)
[TypeError handled] (TypeError('Error in coro B'),)
[IndexError handled] (IndexError('Error in coro C'),)
在这段代码中,你将对 `asyncio.run()` 的调用包装在一个 `try` 块中。然后,你使用 `except*` 语法分别捕获预期的异常。在每种情况下,你都会在屏幕上打印一条错误消息。



















暂无评论内容