异步IO的其他特性

Python 的异步 I/O 功能不仅限于 `async def` 和 `await` 构造。它们还包括其他高级工具,使异步编程更具表现力,并与常规 Python 构造保持一致。

在接下来的部分中,你将探索强大的异步功能,包括异步循环和推导式、`async with` 语句以及异常组。这些功能将帮助你编写更清晰、更易读的异步代码。

异步迭代器、循环和推导式

除了使用 `async` 和 `await` 创建协程外,Python 还提供了 `async for` 构造来遍历异步迭代器。异步迭代器允许你遍历异步生成的数据。在循环运行时,它会将控制权交还给事件循环,以便其他异步任务可以运行。

注意:如需了解有关异步迭代器的更多信息,请参阅 Python 中的异步迭代器和可迭代对象教程。

这一概念的自然扩展是异步生成器。以下是一个生成 2 的幂并在循环和推导式中使用它们的示例:



async def power_of_two(stop=10):
    i = 0
    while i < stop:
        yield 2*i
        i += 1
        await asyncio.sleep(5)
 
async def main():
    g = []
    async for i in power_of_two(5):
        g.append(i)
    print(g)
 
 
asyncio.run(main())

async for 本身还是顺序执行,但是当前协程会让出控制权。

同步和异步生成器、循环和推导式之间有一个重要的区别。它们的异步对应物并不天生使迭代并发。相反,它们允许事件循环在你使用 `await` 明确让出控制权时,在迭代之间运行其他任务。除非你使用 `asyncio.gather()` 引入并发,否则迭代本身仍然是顺序的。

只有在处理异步迭代器或上下文管理器时,才需要使用 `async for` 和 `async with`,否则普通的 `for` 或 `with` 会引发错误。

异步
with
语句

`with` 语句也有一个异步版本,即 `async with`。这种构造在异步代码中非常常见,因为许多 I/O 密集型任务都涉及设置和清理阶段。

async with 专门用来异步地进入和退出上下文(即 __aenter__ / __aexit__ 协程方法)。
最常见场景:限并发、连接池、自动加解锁、数据库事务等。

以下是一个实现所需功能的示例:



import asyncio
 
 
class AsyncLimiter:
    def __init__(self,max_parallel: int):
        self.sem = asyncio.Semaphore(max_parallel)
 
    async def __aenter__(self):
        print("__aenter__")
        await self.sem.acquire()
 
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("__aexit__")
        self.sem.release()
 
 
 
async def main():
    limiter = AsyncLimiter(2)
    async with limiter:
        print("do .....")
        await asyncio.sleep(2)
        print("done ......")
 
asyncio.run(main())

异步数据库事务(aiomysql)



import aiomysql, asyncio
 
async def main():
    pool = await aiomysql.create_pool(host='127.0.0.1', port=3306,
                                      user='root', password='123456',
                                      db='test', minsize=1, maxsize=5)
    # 异步事务:进入时 BEGIN,退出时 COMMIT/ROLLBACK
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            async with conn.begin():          # ← async with 事务
                await cur.execute("INSERT INTO t(name) VALUES (%s)", ("async",))
                # 出 with 块自动 commit;异常则 rollback
    pool.close(); await pool.wait_closed()
 
asyncio.run(main())

其他
asyncio
 工具

除了 `asyncio.run()`,你还使用了一些其他包级别的函数,例如 `asyncio.gather()` 和 `asyncio.get_event_loop()`。你可以使用 `asyncio.create_task()` 来安排协程对象的执行,然后像往常一样调用 `asyncio.run()` 函数:

asyncio.run(coro, *, debug=None)
负责启动程序,一次性启动整个 async 程序,创建新事件循环 → 运行 coro → 关闭循环 → 返回结果,永远只应出现一次,且必须在同步代码里调用。

asyncio.gather(*aws, return_exceptions=False)
负责并发等待多个协程,让多个协程/任务并发执行,并等它们全部完成,把可等待对象列表包装成单个 future;返回结果按传入顺序对齐,results = await asyncio.gather(task1(), task2())。如果你忘记
await
→ 只是构造了一个 future,不会真正运行。

asyncio.create_task(coro, *, name=None)
负责把协程包装成任务丢进事件循环,在已经运行的循环里把协程立即调度成 Task,实现“后台”并发,返回
Task
对象,可取消、可加回调、可单独
await,
t = asyncio.create_task(coro()); await t。必须在运行中的事件循环线程里调用。

asyncio.get_event_loop()
需要底层操作事件循环,返回当前线程的正在运行的事件循环;没有则新建一个并设为当前,尽量别碰底层 loop。



import asyncio
 
 
async def coro():
    print("coro")
    await asyncio.sleep(5)
    return "hello"
 
async def main():
    task = asyncio.create_task(coro())
    print(f"task is {type(task)}")
    result = await task
    return result
 
result = asyncio.run(main())

task is <class '_asyncio.Task'>

只有调用await task ,task才会真正执行。如果单纯 调用 coro() 返回仅是一个协程对象<coroutine object coro at 0x000001F83C0C1D80>  ,都是需要await 才会真正执行。

这种模式包含了一个需要你注意的细微细节:如果你使用`create_task()`创建任务,但没有等待它们或将其包装在`gather()`中,而你的`main()`协程完成了,那么这些手动创建的任务将在事件循环结束时被取消。你必须等待所有你希望完成的任务。

`create_task()`函数将一个可等待对象包装成一个更高级别的`Task`对象,该对象会在后台的事件循环中并发运行。相比之下,等待一个协程会立即运行它,暂停调用者的执行,直到等待的协程完成。

`gather()` 函数的目的是将一组协程整齐地放入一个单一的未来对象中。这个对象代表一个最初未知但最终会可用的结果,通常是异步计算的结果。

如果你等待 `gather()` 并指定多个任务或协程,那么循环会等待所有任务完成。`gather()` 的结果将是一个包含输入结果的列表。



>>> import time
 
>>> async def main():
...     task1 = asyncio.create_task(coro([10, 5, 2]))
...     task2 = asyncio.create_task(coro([3, 2, 1]))
...     print("Start:", time.strftime("%X"))
...     result = await asyncio.gather(task1, task2)
...     print("End:", time.strftime("%X"))
...     print(f"Both tasks done: {all((task1.done(), task2.done()))}")
...     return result
...
 
>>> result = asyncio.run(main())
Start: 14:38:49
End: 14:38:51
Both tasks done: True
 
>>> print(f"result: {result}")
result: [[2, 5, 10], [1, 2, 3]]

你可能已经注意到,`gather()` 会等待你传递给它的整个协程集合的完整结果。`gather()` 返回的结果顺序是确定性的,对应于最初传递给它的可等待对象的顺序。

作为替代,你可以通过循环遍历 `asyncio.as_completed()` 来获取完成的任务。该函数返回一个同步迭代器,按任务完成的顺序生成任务。在下面的例子中,`coro([3, 2, 1])` 的结果将在 `coro([10, 5, 2])` 完成之前可用,这与 `gather()` 函数的情况不同。



>>> async def main():
...     task1 = asyncio.create_task(coro([10, 5, 2]))
...     task2 = asyncio.create_task(coro([3, 2, 1]))
...     print("Start:", time.strftime("%X"))
...     for task in asyncio.as_completed([task1, task2]):
...         result = await task
...         print(f'result: {result} completed at {time.strftime("%X")}')
...     print("End:", time.strftime("%X"))
...     print(f"Both tasks done: {all((task1.done(), task2.done()))}")
...
 
>>> asyncio.run(main())
Start: 14:36:36
result: [1, 2, 3] completed at 14:36:37
result: [2, 5, 10] completed at 14:36:38
End: 14:36:38
Both tasks done: True

在这个例子中,
main()
函数使用了
asyncio.as_completed()
,它按任务完成的顺序生成任务,而不是按任务启动的顺序。当程序遍历任务时,它等待这些任务,使得任务完成时结果可以立即可用。

因此,更快的任务(
task1
)先完成,其结果先打印出来,而耗时更长的任务(
task2
)完成后才打印结果。
as_completed()
函数在你需要动态处理完成的任务时非常有用,这可以提高并发工作流中的响应性。

异步异常处理

从 Python 3.11 开始,你可以使用 `ExceptionGroup` 类来处理可能同时发生的多个不相关的异常。这在运行多个可能会引发不同异常的协程时特别有用。此外,新的 `except*` 语法可以帮助你优雅地一次性处理多个错误。

以下是如何在异步代码中使用这个类的快速演示:



>>> import asyncio
 
>>> async def coro_a():
...     await asyncio.sleep(1)
...     raise ValueError("Error in coro A")
...
 
>>> async def coro_b():
...     await asyncio.sleep(2)
...     raise TypeError("Error in coro B")
...
 
>>> async def coro_c():
...     await asyncio.sleep(0.5)
...     raise IndexError("Error in coro C")
...
 
>>> async def main():
...     results = await asyncio.gather(
...         coro_a(),
...         coro_b(),
...         coro_c(),
...         return_exceptions=True
...     )
...     exceptions = [e for e in results if isinstance(e, Exception)]
...     if exceptions:
...         raise ExceptionGroup("Errors", exceptions)
...

在这个例子中,你有三个协程,它们分别抛出三种不同类型的异常。在 `main()` 函数中,你将这些协程作为参数传递给 `gather()`,并将 `return_exceptions` 参数设置为 `True`,以便在异常发生时捕获它们。

接下来,你使用列表推导式将异常存储在一个新列表中。如果列表中至少有一个异常,那么你可以为它们创建一个 `ExceptionGroup`。

要处理这个异常组,你可以使用以下代码:



>>> try:
...     asyncio.run(main())
... except* ValueError as ve_group:
...     print(f"[ValueError handled] {ve_group.exceptions}")
... except* TypeError as te_group:
...     print(f"[TypeError handled] {te_group.exceptions}")
... except* IndexError as ie_group:
...     print(f"[IndexError handled] {ie_group.exceptions}")
...
[ValueError handled] (ValueError('Error in coro A'),)
[TypeError handled] (TypeError('Error in coro B'),)
[IndexError handled] (IndexError('Error in coro C'),)

在这段代码中,你将对 `asyncio.run()` 的调用包装在一个 `try` 块中。然后,你使用 `except*` 语法分别捕获预期的异常。在每种情况下,你都会在屏幕上打印一条错误消息。

© 版权声明
THE END
如果内容对您有所帮助,就支持一下吧!
点赞0 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容