使用 asyncio 在 Python 中进行异步编程
并发与并行
为什么使用 asyncio
如何在 Python 中编写异步代码
异步函数
现实世界的例子
异步生成器
异常处理
深入研究
对于 JavaScript 异步编程的人来说并不是什么新鲜事,但对于 Python 开发人员来说,习惯异步函数和 Future(相当于 JS 中的 Promise)可能并不是一件容易的事
并发与并行
并发和并行听起来很相似,但在编程中它们之间存在一个重要的区别。
想象一下,你一边做饭一边写书,虽然看起来好像是同时在做这两件事,但实际上你只是在两个任务之间切换:等水烧开的时候你在写书,切菜的时候你却暂停写作。这叫做并发。要同时完成这两项任务,唯一的办法就是两个人一起写,一个人做饭,而多核 CPU 正是这么做的。
为什么使用 asyncio
异步编程允许你编写在单线程中运行的并发代码。与多线程相比,第一个优势是你可以决定调度程序在何处从一个任务切换到另一个任务,这意味着在任务之间共享数据更加安全、便捷。
def queue_push_back(x):
if len(list) < max_size:
list.append(x)
如果我们在多线程程序中运行上述代码,则可能有两个线程同时执行第 2 行,因此将同时向队列中添加 2 个项目,并可能使队列大小大于max_size
异步编程的另一个优势是内存使用。每次创建新线程时,都会占用一些内存用于上下文切换。如果我们使用异步编程,那么由于代码在单线程中运行,因此不会出现这个问题。
如何在 Python 中编写异步代码
Asyncio 有 3 个主要组件:协程、事件循环和 Future
协程
协程是异步函数的结果,可以使用关键字async
before声明def
async def my_task(args):
pass
my_coroutine = my_task(args)
当我们使用关键字声明一个函数时,async
该函数不会运行,而是返回一个协程对象。
有两种方法可以从协程中读取异步函数的输出。
第一种方法是使用await
关键字,这只能在异步函数内部使用,并且会等待协程终止并返回结果。
result = await my_task(args)
第二种方法是将其添加到事件循环中,我们将在下一节中看到。
事件循环
事件循环是执行异步代码并决定如何在异步函数之间切换的对象。创建事件循环后,我们可以向其中添加多个协程,这些协程将在run_until_complete
或run_forever
被调用时同时运行。
# create loop
loop = asyncio.new_event_loop()
# add coroutine to the loop
future = loop.create_task(my_coroutine)
# stop the program and execute all coroutine added
# to the loop concurrently
loop.run_until_complete(future)
loop.close()
未来
Future 是一个对象,它充当异步函数输出的占位符,并向我们提供有关函数状态的信息。
当我们将协程添加到事件循环时,就会创建一个 Future。有两种方法可以实现这一点:
future1 = loop.create_task(my_coroutine)
# or
future2 = asyncio.ensure_future(my_coroutine)
第一个方法将一个协程添加到循环中,并返回一个task
Future 的子类型。第二个方法非常类似,它接受一个协程并将其添加到默认循环中,唯一的区别是它也可以接受一个 Future,在这种情况下,它不会执行任何操作,而是直接返回一个未改变的 Future。
一个简单的程序
import asyncio
async def my_task(args):
pass
def main():
loop = asyncio.new_event_loop()
coroutine1 = my_task()
coroutine2 = my_task()
task1 = loop.create_task(coroutine1)
task2 = loop.create_task(coroutine2)
loop.run_until_complete(asyncio.wait([task1, task2]))
print('task1 result:', task1.result())
print('task2 result:', task2.result())
loop.close()
如你所见,要运行异步函数,我们首先需要创建一个协程,然后将其添加到事件循环中,从而创建一个 Future/任务。到目前为止,异步函数内部的代码尚未执行,只有当我们调用loop.run_until_completed
事件循环时,才会开始执行所有已使用loop.create_task
或添加到循环中的协程asyncio.ensure_future
。loop.run_until_completed
这将阻塞你的程序,直到你作为参数传入的 Future 完成为止。在示例中,我们asyncio.wait()
创建了一个 Future,只有当参数列表中传递的所有 Future 都完成后,它才会完成。
异步函数
在 Python 中编写异步函数时需要注意的一点是,仅仅因为你使用了async
关键字 awaitdef
并不意味着你的函数会并发运行。如果你在一个普通函数async
前面添加关键字 await ,事件循环将不间断地运行你的函数,因为你没有指定循环可以在何处中断你的函数以运行另一个协程。指定事件循环可以在何处更改协程其实很简单,每次使用关键字 await 时,事件循环都会停止你的函数运行,并运行注册到该循环的另一个协程。
async def print_numbers_async1(n, prefix):
for i in range(n):
print(prefix, i)
async def print_numbers_async2(n, prefix):
for i in range(n):
print(prefix, i)
if i % 5 == 0:
await asyncio.sleep(0)
loop1 = asyncio.new_event_loop()
count1_1 = loop1.create_task(print_numbers_async1(10, 'c1_1')
count2_1 = loop1.create_task(print_numbers_async1(10, 'c2_1')
loop1.run_until_complete(asyncio.wait([count1_1, count2_1])
loop1.close()
loop2 = asyncio.new_event_loop()
count1_2 = loop1.create_task(print_numbers_async1(10, 'c1_2')
count2_2 = loop1.create_task(print_numbers_async1(10, 'c2_2')
loop2.run_until_complete(asyncio.wait([count1_2, count2_2])
loop2.close()
如果我们执行此代码,我们将看到 loop1 将首先打印所有带前缀的数字c1_1
,然后打印带前缀的数字,c2_1
而在第二个循环中每 5 个数字循环将更改任务。
现实世界的例子
现在我们已经了解了 Python 中异步编程的基础知识,让我们编写一些更实际的代码,它将从互联网上下载页面列表并打印包含页面前 3 行的预览。
import aiohttp
import asyncio
async def print_preview(url):
# connect to the server
async with aiohttp.ClientSession() as session:
# create get request
async with session.get(url) as response:
# wait for response
response = await response.text()
# print first 3 not empty lines
count = 0
lines = list(filter(lambda x: len(x) > 0, response.split('\n')))
print('-'*80)
for line in lines[:3]:
print(line)
print()
def print_all_pages():
pages = [
'http://textfiles.com/adventure/amforever.txt',
'http://textfiles.com/adventure/ballyhoo.txt',
'http://textfiles.com/adventure/bardstale.txt',
]
tasks = []
loop = asyncio.new_event_loop()
for page in pages:
tasks.append(loop.create_task(print_preview(page)))
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
这段代码应该很容易理解。我们首先创建一个异步函数,用于下载 URL 并打印前 3 行(非空行)。然后,我们创建一个函数,用于对页面列表中的每个页面调用print_preview
,将协程添加到循环中,并将 Future 存储在任务列表中。最后,我们运行事件循环,它将运行我们添加到其中的协程,并打印所有页面的预览。
异步生成器
最后一个我想讲的功能是异步生成器。实现异步生成器其实很简单。
import asyncio
import math
import random
async def is_prime(n):
if n < 2:
return True
for i in range(2, n):
# allow event_loop to run other coroutine
await asyncio.sleep(0)
if n % i == 0:
return False
return True
async def prime_generator(n_prime):
counter = 0
n = 0
while counter < n_prime:
n += 1
# wait for is_prime to finish
prime = await is_prime(n)
if prime:
yield n
counter += 1
async def check_email(limit):
for i in range(limit):
if random.random() > 0.8:
print('1 new email')
else:
print('0 new email')
await asyncio.sleep(2)
async def print_prime(n):
async for prime in prime_generator(n):
print('new prime number found:', prime)
def main():
loop = asyncio.new_event_loop()
prime = loop.create_task(print_prime(3000))
email = loop.create_task(check_email(10))
loop.run_until_complete(asyncio.wait([prime, email]))
loop.close()
异常处理
当协程内部引发未处理的异常时,它不会像在正常同步编程中那样破坏我们的程序,相反,它会存储在未来中,如果你在程序退出之前没有处理异常,你将得到以下错误
Task exception was never retrieved
有两种方法可以解决这个问题,在访问未来结果时捕获异常或调用未来异常方法。
try:
# this will raise the exception raised during the coroutine execution
my_promise.result()
catch Exception:
pass
# this will return the exception raised during the coroutine execution
my_promise.exception()
深入研究
如果您已经阅读了到目前为止的所有内容,您应该知道如何使用 asyncio 编写并发代码,但是如果您想更深入地了解 asyncio 的工作原理,我建议您观看以下视频
如果您想了解 asyncio 的更复杂用法,或者您有任何疑问,请发表评论,我会尽快回复您
鏂囩珷鏉ユ簮锛�https://dev.to/welldone2094/async-programming-in-python-with-asyncio-12dl