How to make Python code concurrent with 3 lines of code

2025-05-26

I was inspired by @rpalo, who has been exploring the gems in Python's standard library.

I decided to share one of my favorite Python standard library tricks through an example. The whole code runs on Python 3.2+ with no external packages.

The original problem

Let's say you have a thousand URLs to process/download/examine, so you need to issue as many HTTP GET calls and retrieve the body of each response.

This is one way to do it:

import http.client
import socket

def get_it(url):
    try:
        # always set a timeout when you connect to an external server
        connection = http.client.HTTPSConnection(url, timeout=2)

        connection.request("GET", "/")

        response = connection.getresponse()

        return response.read()
    except socket.timeout:
        # in a real world scenario you would probably do stuff if the
        # socket goes into timeout
        pass

urls = [
    "www.google.com",
    "www.youtube.com",
    "www.wikipedia.org",
    "www.reddit.com",
    "www.httpbin.org"
] * 200

for url in urls:
    get_it(url)

(I wouldn't use the standard library as an HTTP client, but for the purposes of this article it's fine)
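
If you'd rather stay in the standard library but avoid the low-level http.client API, a roughly equivalent sketch with urllib.request could look like this (get_it_urllib is just my name for it, not part of the original script):

import socket
import urllib.error
import urllib.request

def get_it_urllib(url):
    try:
        # urllib.request wants a full URL, not just a host name
        with urllib.request.urlopen("https://" + url, timeout=2) as response:
            return response.read()
    except (socket.timeout, urllib.error.URLError):
        # swallow timeouts/connection errors, mirroring the original example
        pass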

As you can see, there's no magic here. Python iterates over the 1000 URLs and calls each of them one by one.

On my computer this thing occupies 2% of the CPU and spends most of its time waiting for I/O:

$ time python io_bound_serial.py
20.67s user 5.37s system 855.03s real 24292kB mem

It runs for roughly 14 minutes. We can do better.

Show me the trick!

from concurrent.futures import ThreadPoolExecutor as PoolExecutor
import http.client
import socket

def get_it(url):
    try:
        # always set a timeout when you connect to an external server
        connection = http.client.HTTPSConnection(url, timeout=2)

        connection.request("GET", "/")

        response = connection.getresponse()

        return response.read()
    except socket.timeout:
        # in a real world scenario you would probably do stuff if the
        # socket goes into timeout
        pass

urls = [
    "www.google.com",
    "www.youtube.com",
    "www.wikipedia.org",
    "www.reddit.com",
    "www.httpbin.org"
] * 200

with PoolExecutor(max_workers=4) as executor:
    for _ in executor.map(get_it, urls):
        pass

Let's see what changed:

# import a new API to create a thread pool
from concurrent.futures import ThreadPoolExecutor as PoolExecutor

# create a thread pool of 4 threads
with PoolExecutor(max_workers=4) as executor:

    # distribute the 1000 URLs among 4 threads in the pool
    # _ is the body of each page that I'm ignoring right now
    for _ in executor.map(get_it, urls):
        pass
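
If you actually want to keep the bodies instead of throwing them away, executor.map returns results in the same order as its input, so collecting them is a one-liner. A minimal sketch (bodies is my own name, not part of the original script):

with PoolExecutor(max_workers=4) as executor:
    # executor.map yields results in the same order as urls,
    # so bodies[i] is the response body (or None on timeout) for urls[i]
    bodies = list(executor.map(get_it, urls))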

So, with 3 lines of code we turned a slow serial task into a concurrent one, taking a little less than 5 minutes:

$ time python io_bound_threads.py
21.40s user 6.10s system 294.07s real 31784kB mem

We went from 855.03 seconds to 294.07 seconds, a 2.9x speedup!

Wait, there's more

The cool thing about this new API is that you can replace

from concurrent.futures import ThreadPoolExecutor as PoolExecutor

with

from concurrent.futures import ProcessPoolExecutor as PoolExecutor

to tell Python to use processes instead of threads. Out of curiosity, let's see what happens to the running time:

$ time python io_bound_processes.py
22.19s user 6.03s system 270.28s real 23324kB mem

20 seconds less than the threaded version, not much of a difference. Keep in mind these are not scientific experiments, and I was using the computer while these scripts were running.
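
One caveat if you try the process version yourself: with ProcessPoolExecutor the pool should generally be created under an if __name__ == "__main__": guard, because with the spawn start method (the default on Windows, and on macOS since Python 3.8) the worker processes re-import the module. A minimal sketch of how the bottom of the script might look, assuming get_it and urls are defined above as before:

from concurrent.futures import ProcessPoolExecutor as PoolExecutor

if __name__ == "__main__":
    # the guard keeps spawned worker processes from re-running this block
    # when they re-import the module
    with PoolExecutor(max_workers=4) as executor:
        for _ in executor.map(get_it, urls):
            pass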

Bonus content

My computer has 4 cores; let's see what happens to the threaded version when we increase the number of worker threads:

# 6 threads
20.48s user 5.19s system 155.92s real 35876kB mem
# 8 threads
23.48s user 5.55s system 178.29s real 40472kB mem
# 16 threads
23.77s user 5.44s system 119.69s real 58928kB mem
# 32 threads
21.88s user 4.81s system 119.26s real 96136kB mem

Three things to notice: RAM occupation increases noticeably, we hit a wall around 16 threads, and at 16 threads we're more than 7x faster than the serial version.
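
To reproduce these runs without editing the script each time, one option is to read the worker count from the command line; a minimal sketch, assuming the script is invoked like python io_bound_threads.py 16. (For what it's worth, since Python 3.8 ThreadPoolExecutor defaults max_workers to min(32, os.cpu_count() + 4) when you omit it.)

import sys

# number of worker threads, taken from the first CLI argument (default: 4)
workers = int(sys.argv[1]) if len(sys.argv) > 1 else 4

with PoolExecutor(max_workers=workers) as executor:
    for _ in executor.map(get_it, urls):
        pass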

If you don't recognize the output of time, that's because I aliased it like this:

time='gtime -f '\''%Us user %Ss system %es real %MkB mem -- %C'\'

gtime is installed with brew install gnu-time

Conclusion

I think ThreadPoolExecutor and ProcessPoolExecutor are super cool additions to Python's standard library. You could accomplish most of what they do with the "older" threading, multiprocessing and FIFO queues, but this API is so much better.
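
For a sense of how much boilerplate the executor hides, here is a rough sketch of the same fan-out written with the older threading plus queue.Queue combination (the worker function and the sentinel handling are mine, reusing get_it and urls from above):

import queue
import threading

def worker(q):
    while True:
        url = q.get()
        if url is None:  # sentinel value: no more work, let the thread exit
            break
        get_it(url)

q = queue.Queue()
threads = [threading.Thread(target=worker, args=(q,)) for _ in range(4)]
for t in threads:
    t.start()

for url in urls:
    q.put(url)

# one sentinel per worker so every thread eventually stops
for _ in threads:
    q.put(None)
for t in threads:
    t.join()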

Source: https://dev.to/rhymes/how-to-make-python-code-concurrent-with-3-lines-of-code-2fpe