百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

在Python中使用Asyncio系统(3-6)优雅地开启和关闭协程

itomcoil 2025-01-24 12:17 13 浏览

优雅地开启和关闭协程

大多数基于异步的程序都是基于网络的并且长期运行的应用程序。这个领域在处理如何启动和关闭的过程中有惊人的复杂性。

在这两个操作中,启动一般比较简单。启动异步应用程序的标准方式是有一个main()协程函数,并用asyncio.run()调用它,就像本章开头的示例3-2所演示的那样。

一般来说,启动都会相当的简单直接;比如前面描述的服务器案例,你可能在文档中不止一次的阅读到它。我们待会儿要在后面的章节简要通过代码来介绍一个服务器启动的演示。

关闭程序一般更复杂一些。为了关闭,我之前提到了在asyncio.run()中的准备步骤。当async def main()函数退出时,要采取以下操作:

  1. 收集所有仍在排队的任务对象(如果有的话)。
  2. 取消这些任务(这个步骤会在每个运行的协程中抛出CancelledError异常,就是你在协程函数的代码里使用try/except来处理的那个异常)。
  3. 把所有这些任务放到一个组任务中。
  4. 在组任务中使用run_until_complete()等待所有这些任务完成,然后抛出或者处理CancelledError。

asyncio.run()为你执行上面提到的这些操作,但尽管有这些帮助,在构建最初几个重要的asyncio应用程序的过程中,仍将尝试在关闭期间清除错误消息,就像这样:“Task was destroyed but it is pending!”。发生这种情况是因为应用程序没等到执行完前面的一个或几个步骤就关闭主程序了。例3-29是一个引发这个烦人错误的示例。

示例 3-29 未决任务的销毁程序

# taskwarning.py
import asyncio


async def f(delay):
  await asyncio.sleep(delay)


loop = asyncio.get_event_loop()
t1 = loop.create_task(f(1))  
t2 = loop.create_task(f(2))  
loop.run_until_complete(t1) 
loop.close()
  • (L8)任务1将运行1秒。
  • (L9)任务2将运行2秒。
  • (L10)只能在任务1完成前才运行。

运行这段代码将会有以下输出:

$ python taskwarning.py
Task was destroyed but it is pending!
task: 

这个错误信息告诉你当循环关闭时,有些任务还没有完成。我们想要避免这种情况,这就是为什么一般的关闭过程是收集所有未完成的任务,终止这些未完成的任务,然后等它们在关闭循环之前全部完成。asyncio.run()为你完成了所有这些步骤,但重要的是要详细了解流程,这样你就能够处理更复杂的情况。

我们再看一个更详细的演示了所有这些阶段的代码示例。示例3-30是一个基于telnet的echo服务器的迷你案例研究。

示例 3-30 异步应用程序的生命周期

# telnetdemo.py
import asyncio
from asyncio import StreamReader, StreamWriter


async def echo(reader: StreamReader, writer: StreamWriter): 
    print('New connection.')
    try:
        while data := await reader.readline():  
            writer.write(data.upper())  
            await writer.drain()
        print('Leaving Connection.')
    except asyncio.CancelledError:  
        print('Connection dropped!')


async def main(host='127.0.0.1', port=8888):
    server = await asyncio.start_server(echo, host, port) 
    async with server:
        await server.serve_forever()


try:
    asyncio.run(main())
except KeyboardInterrupt:
    print('Bye!')
  • (L5) 服务器将使用echo()协程函数为每个连接创建一个协程。这个函数使用streams API与asyncio进行网络连接。
  • (L8) 为了保持连接的活性,我们要使用一个无限循环来等待消息接入。
  • (L9) 然后把数据直接原样返回给发送方,就是把字母全部变成大写。
  • (L12) 如果这个任务被终止,我们会打印一条消息。
  • (L16) 这段启动TCP服务器的代码直接取自Python 3.8的官方文档。

启动echo服务器后,你可以使用telnet连接到它并跟它交互:

$ telnet 127.0.0.1 8888
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
hi!
HI!
stop shouting
STOP SHOUTING
^]
telnet> q/
Connection closed.

这次会话的服务器输出如下所示(服务器继续运行,直到我们按Ctrl-C):

$ python telnetdemo.py
New connection.
Leaving Connection.
^CBye!

在刚才展示的Telnet会话中,客户机(即Telnet)在服务器停止之前关闭了连接,现在让我们看看如果在连接处于活跃状态时关闭服务器会发生什么。我们会看到服务器进程的如下输出:

$ python telnetdemo.py
New connection.
^CConnection dropped!
Bye!

这时你可以看到CancelledError的异常处理程序被触发。现在我们假设这是一个真实的生产级应用程序,我们希望把所有关于断开连接的事件发送到监控服务。代码示例可能被修改为示例3-31。

示例 3-31 在取消步骤中创建任务

# telnetdemo.py
import asyncio
from asyncio import StreamReader, StreamWriter


async def send_event(msg: str):  
    await asyncio.sleep(1)


async def echo(reader: StreamReader, writer: StreamWriter):
    print('New connection.')
    try:
        while (data := await reader.readline()):
            writer.write(data.upper())
            await writer.drain()
        print('Leaving Connection.')
    except asyncio.CancelledError:
        msg = 'Connection dropped!'
        print(msg)
        asyncio.create_task(send_event(msg))  


async def main(host='127.0.0.1', port=8888):
    server = await asyncio.start_server(echo, host, port)
    async with server:
        await server.serve_forever()


try:
    asyncio.run(main())
except KeyboardInterrupt:
    print('Bye!')
  • (L5) 假设这个协程实际上要联系外部服务器以提交事件通知。
  • (L18) 因为事件通知器涉及网络访问,所以这样的调用通常是在单独的异步任务中进行的;这就是我们在这里使用create_task()函数的原因。

但是,这段代码有一个错误。如果我们重新运行这个示例,并确保在连接处于活跃状态时停止服务器(使用Ctrl-C),这个bug就变得很明显了:

$ python telnetdemo.py
New connection.
^CConnection dropped!
Bye!
Task was destroyed but it is pending!
task: 

要理解为什么会发生这种情况,我们必须回到asyncio.run()在关闭阶段所做的清理事件的顺序;特别要注意更重要的部分是,当我们按下Ctrl-C时,所有当前活跃的任务都会被收集和注销。这时候,只有那些当前活跃的任务会被等待直到完成,并且asyncio.run()在所有那些任务完成之后才返回。修改后的代码中的错误是,我们在现有的“echo”任务的注销处理步骤中创建了一个发送事件的新任务。只有在asyncio.run()收集并注销了流程中的所有任务之后,才会创建这个新任务。

这就是为什么了解asyncio.run()是怎么运行的很重要。

建议:一般的经验法则是,尽量避免在CancelledError异常处理程序中创建新任务。如果必须,也一定要等待同一函数范围内的新任务或未来任务。

最后:如果你正在使用一个库或框架,请确保按照它的文档来执行启动和关闭操作。第三方框架通常提供自己的启动和关闭函数,并提供自定义的事件钩子。你可以在第115页的“案例研究:缓存失效”中看到Sanic框架中使用这些钩子的例子。

gather()函数中的return_exceptions=True是什么意思?

你可能已经注意到,示例3-3和示例3-1,在关闭步骤中调用gather()时的关键字参数return_exceptions=True,但我当时故意悄悄地没有提到它。asyncio.run()也在内部使用gather()和return_exceptions=True,现在是进一步讨论这个的时候了。

不幸的是,这个参数默认是gather(…return_exceptions = False)。这个默认值对于大多数情况都是有问题的,包括关闭过程,这就是为什么asyncio.run()把参数设置为True。直接解释有点复杂;相反,让我们通过一系列详细解释来理解以上的观点会更容易:

  1. run_until_complete()函数里面操作的是一个future;在关闭任务期间,它是gather()返回的future。
  2. 如果里面的future引发异常,这个异常就会从run_until_complete()抛出,这将导致循环停止运行。
  3. 如果run_until_complete()函数操作的是一个组future,在任何子任务中引发的任何异常如果不在子任务中处理掉,就会在这个组future中引发异常。注意,这种情况也包括CancelledError。
  4. 如果只有一部分任务处理CancelledError,而其他任务不处理CancelledError,那么不处理CancelledError的任务将导致循环停止。这意味着循环在所有任务完成之前就会异常终止。
  5. 在关闭任务的过程中,我们真的不想发生这种行为。我们希望run_until_complete()只在在它组中的所有子任务都已完成时才能终止,不管其中的一些任务是否引发异常。
  6. 于是我们gather(*, return_exceptions=True):这个设置会让组future把来自于子任务的异常视为返回值,这样它们就不会冒出来干扰到run_until_complete()。

这样就知道了return_exceptions=True和run_until_complete()之间的关系。用这种方式捕获异常的一个不良后果是:有些错误可能没有引起你的注意,因为它们现在正在一组任务的内部处理。如果这是一个麻烦,你可以从run_until_complete()获取输出列表,并扫描它以查找Exception的任何子类,然后编写适合你情况的日志消息。示例3-32演示了这种方法。

示例 3-32 所有的任务都会完成

# alltaskscomplete.py
import asyncio


async def f(delay):
    await asyncio.sleep(1 / delay)  
    return delay


loop = asyncio.get_event_loop()
for i in range(10):
    loop.create_task(f(i))
pending = asyncio.all_tasks()
group = asyncio.gather(*pending, return_exceptions=True)
results = loop.run_until_complete(group)
print(f'Results: {results}')
loop.close()
  • (L5) 这里一定会触发一个异常

这是运行输出:

$ python alltaskscomplete.py
Results: [6, 9, 3, 7, ...
          ZeroDivisionError('division by zero',), 4, ...
          8, 1, 5, 2]

如果没有设置return_exceptions=True,就会从run_until_complete()引发ZeroDivisionError,停止循环,从而阻止其他任务完成。

在下一节中,我们将讨论信号处理(KeyboardInterrupt之外的信号),但在此之前,有必要记住,优雅地关闭是网络编程中比较困难的方面之一,对于asyncio来说也是这样。本节中的信息仅仅是一个开始。我鼓励你在自己的自动化测试中使用明确的关闭测试。不同的应用程序通常需要不同的策略。

建议:我在Python包索引(PyPI)上发布了一个名为aiorun的小包,主要用于我自己在处理asyncio关闭方面的实验和学习,它整合了本节中的许多想法。可能对于你在代码进行修改,并围绕asyncio关闭场景试验你自己的想法方面是有用的。

相关推荐

Excel新函数TEXTSPLIT太强大了,轻松搞定数据拆分!

我是【桃大喵学习记】,欢迎大家关注哟~,每天为你分享职场办公软件使用技巧干货!最近我把WPS软件升级到了版本号:12.1.0.15990的最新版本,最版本已经支持文本拆分函数TEXTSPLIT了,并...

Excel超强数据拆分函数TEXTSPLIT,从入门到精通!

我是【桃大喵学习记】,欢迎大家关注哟~,每天为你分享职场办公软件使用技巧干货!今天跟大家分享的是Excel超强数据拆分函数TEXTSPLIT,带你从入门到精通!TEXTSPLIT函数真是太强大了,轻松...

看完就会用的C++17特性总结(c++11常用新特性)

作者:taoklin,腾讯WXG后台开发一、简单特性1.namespace嵌套C++17使我们可以更加简洁使用命名空间:2.std::variant升级版的C语言Union在C++17之前,通...

plsql字符串分割浅谈(plsql字符集设置)

工作之中遇到的小问题,在此抛出问题,并给出解决方法。一方面是为了给自己留下深刻印象,另一方面给遇到相似问题的同学一个解决思路。如若其中有写的不好或者不对的地方也请不加不吝赐教,集思广益,共同进步。遇到...

javascript如何分割字符串(javascript切割字符串)

javascript如何分割字符串在JavaScript中,您可以使用字符串的`split()`方法来将一个字符串分割成一个数组。`split()`方法接收一个参数,这个参数指定了分割字符串的方式。如...

TextSplit函数的使用方法(入门+进阶+高级共八种用法10个公式)

在Excel和WPS新增的几十个函数中,如果按实用性+功能性排名,textsplit排第二,无函数敢排第一。因为它不仅使用简单,而且解决了以前用超复杂公式才能搞定的难题。今天小编用10个公式,让你彻底...

Python字符串split()方法使用技巧

在Python中,字符串操作可谓是基础且关键的技能,而今天咱们要重点攻克的“堡垒”——split()方法,它能将看似浑然一体的字符串,按照我们的需求进行拆分,极大地便利了数据处理与文本解析工作。基本语...

go语言中字符串常用的系统函数(golang 字符串)

最近由于工作比较忙,视频有段时间没有更新了,在这里跟大家说声抱歉了,我尽快抽些时间整理下视频今天就发一篇关于go语言的基础知识吧!我这我工作中用到的一些常用函数,汇总出来分享给大家,希望对...

无规律文本拆分,这些函数你得会(没有分隔符没规律数据拆分)

今天文章来源于表格学员训练营群内答疑,混合文本拆分。其实拆分不难,只要规则明确就好办。就怕规则不清晰,或者规则太多。那真是,Oh,mygod.如上图所示进行拆分,文字表达实在是有点难,所以小熊变身灵...

Python之文本解析:字符串格式化的逆操作?

引言前面的文章中,提到了关于Python中字符串中的相关操作,更多地涉及到了字符串的格式化,有些地方也称为字符串插值操作,本质上,就是把多个字符串拼接在一起,以固定的格式呈现。关于字符串的操作,其实还...

忘记【分列】吧,TEXTSPLIT拆分文本好用100倍

函数TEXTSPLIT的作用是:按分隔符将字符串拆分为行或列。仅ExcelM365版本可用。基本应用将A2单元格内容按逗号拆分。=TEXTSPLIT(A2,",")第二参数设置为逗号...

Excel365版本新函数TEXTSPLIT,专攻文本拆分

Excel中字符串的处理,拆分和合并是比较常见的需求。合并,当前最好用的函数非TEXTJOIN不可。拆分,Office365于2022年3月更新了一个专业函数:TEXTSPLIT语法参数:【...

站长在线Python精讲使用正则表达式的split()方法分割字符串详解

欢迎你来到站长在线的站长学堂学习Python知识,本文学习的是《在Python中使用正则表达式的split()方法分割字符串详解》。使用正则表达式分割字符串在Python中使用正则表达式的split(...

Java中字符串分割的方法(java字符串切割方法)

技术背景在Java编程中,经常需要对字符串进行分割操作,例如将一个包含多个信息的字符串按照特定的分隔符拆分成多个子字符串。常见的应用场景包括解析CSV文件、处理网络请求参数等。实现步骤1.使用Str...

因为一个函数strtok踩坑,我被老工程师无情嘲笑了

在用C/C++实现字符串切割中,strtok函数经常用到,其主要作用是按照给定的字符集分隔字符串,并返回各子字符串。但是实际上,可不止有strtok(),还有strtok、strtok_s、strto...