[Rust] Tokio 堆栈概览:运行时
引用其第一条公告,“Tokio 是一个用 Rust 编写快速网络代码的平台,主要旨在作为其他库的基础”。
这个平台的核心部分,运行时,也称为 tokio,这就是这篇文章的主题;理解 tokio 运行时对于理解整个平台以及(考虑到当前的情况)如何用 Rust 编写异步代码至关重要。
什么是异步运行时?
Rust 核心提供了构建异步应用程序所需的类型。然而,在构建异步网络应用程序时,我们发现自己需要大量的样板代码。我们可以自己编写,也可以使用提供现成(甚至可能更好)代码的库。这就是 Tokio 等异步运行时的作用,它提供了构建此类应用程序的构建块。
期货
让我们首先看看 Rust 的核心功能,这样我们就可以更好地理解如果我们自己构建异步网络应用程序会缺少什么。
PS:我已经写了一篇关于异步 Rust 的介绍,所以这将是一个枯燥的解释。
异步 Rust 允许我们创建并发应用程序。它通过语法实现这一点async/.await
。基本上,使用async
desugar声明的块和函数会变成一个块或函数,该块或函数返回名为 的特征的实现Future
。Future
它是一个状态机,因此它可以跟上某个操作的进度,这意味着它可以在某个点停止处理,并在再次执行时从停止的地方继续执行。从更高的层次上讲,我们可以说 Future 是一个可能已准备好也可能尚未准备好的值的表示,这种二元性通过名为 的枚举来实现,Poll
该枚举有两个变体:Pending
和Ready<T>
。该Future
特征还包含一个名为 的函数poll()
,它将尝试在 Future 中尽可能多地取得进展(从而推动状态机前进)。这个函数 ,在我们访问Futurepoll()
时首先执行。访问Future 就是将其传递给调度程序(以前称为执行程序),该调度程序将执行它。如果它处理完毕,则返回 ,否则返回 ,并且调度程序将 Future 放在一边,等待再次向它发出请求。这个请求来自驱动程序(以前称为反应器),它是一个I/O 事件循环。.await
.await
poll()
Ready<T>
Pending
poll()
Rust 不提供最后两个功能。因此我们需要一个 crate 来帮助我们。此外,我们还需要一些与时间相关的实用程序来处理所有这些调度工作。
毋庸置疑,这正是 Tokio 运行时所提供的。引用其文档:
与其他 Rust 程序不同,异步应用程序需要运行时支持。具体来说,以下运行时服务是必需的:
- 一个 I/O 事件循环,称为驱动程序,它驱动 I/O 资源并将 I/O 事件分派给依赖于它们的任务。
- 使用这些 I/O 资源执行任务的调度程序。
- 用于安排工作在一段设定的时间段后运行的计时器。
调度器
当你第一次编写异步函数时,你会意识到调用该函数的地方也必须是异步的。如果你一路向上尝试将main()
函数设置为异步,Rust 会告诉你“main
函数不允许异步async
”。
向 Rust 询问explain
这个错误会给我们一个提示:
$ rustc --explain E0752
`fn main()` or the specified start function is not allowed to be `async`. Not having a correct async runtime library setup may cause this error.
在网上快速搜索就足以提供解决方案:我们必须导入tokio
并使用这个属性宏:
#[tokio::main]
async fn main(){
// ...
}
然而,即使它确实有效,仍然存在一个问题……
为什么?
因为在某个时刻必须处理 Future,而main()
Rust 程序中函数之上没有任何东西,所以处理 Future 的程序必须在函数之下main()
。换句话说,函数main()
是程序的入口。运行二进制文件的操作系统对 Future 一无所知,所以必须在“内部”管理它们,也就是说,在我们进入程序之后。所以,函数main()
必须是同步的。
如果是这样,main()
如果顶级函数不能异步,Tokio 如何实现异步?嗯,它做不到。#[tokio::main]
将脱糖async fn main()
成这样:
fn main() {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
// ...
})
}
使用属性宏时#[tokio::main]
,我们正在构建一个运行时main()
,它将处理 Future 树。为什么我称之为树?因为一个 Future 可能包含.await
其他 Future。稍后我将详细讨论多次.await
调用。现在,让我们继续讨论处理 Future 树的思路。
处理期货树
考虑下面的例子。
#[tokio::main]
async fn main() {
let (foo, bar) = tokio::join! { foo(), bar() };
println!("{}{}", foo, bar);
}
async fn foo() -> &'static str {
let listener = std::net::TcpListener::bind("0.0.0.0:8080").unwrap();
match listener.accept(){
Ok(_pair) => {
println!("`foo()` is finished");
"foo"
},
Err(_error) => "error"
}
}
async fn bar() -> &'static str {
let listener = std::net::TcpListener::bind("0.0.0.0:8081").unwrap();
match listener.accept(){
Ok(_pair) => {
println!("`bar()` is finished");
"bar"
},
Err(_error) => "error"
}
}
提示:运行上面的代码
0.0.0.0:8080
并0.0.0.0:8081
使用浏览器连接到两者,然后在运行程序的终端中检查结果。
当我们调用 时foo().await
,我们将foo()
的 Future 交给了负责调用poll()
它的运行时调度程序。Future 由调度程序作为任务的一部分执行。您可以将任务视为一个线程,它不是由操作系统调度程序处理的,而是由运行时调度程序处理的(它们是虚拟/绿色线程)。
foo()
这将尽可能地运行直至完成,这意味着执行器不会抢先停止它并运行其他东西来代替它(就像操作系统对其线程所做的那样)。对于 Tokio 调度程序,只要任务正在执行相关工作,它就可以继续工作。用更专业的术语来说,任务一直运行直到它们产生。在我们的例子中,foo()
一直运行直到它开始在端口 上监听8080
。如果你想了解我们代码的哪一部分明确地产生了任务,放弃吧。它不在那里。我们不编写 yields,Rust 为我们管理了它。
foo()
在yield之后, s 将调用join!
——一个宏,它将一直运行,直到开始监听端口。此时,由于两个函数都已 yield,我们有两个 Future 等待再次被轮询,并且它们可以按任意顺序轮询。现在,想象一下and/or在其中调用异步函数,将新任务交给调度程序。在这样的场景中,我们有一个 Future 树。这里需要理解的一件重要事情是,我们有一个“根 Future”(异步书籍称之为“顶级 Future”,但我还是坚持使用“根”);在本例中,它是 中的异步块返回的 Future (您可以在去糖化示例中找到它)。这至少有两个原因使其如此重要。.await
bar()
8081
foo()
bar()
main
block_on()
首先,一个任务负责一个 Future 树。假设我们有一个async fn main()
。正如我们所见,在底层,这是一个main()
会block_on()
触发异步块的普通任务。如果在这个 Future 中我们处理.await
另一个 Future,它将由同一个任务处理,因为它是同一棵树的一部分。
其次,它指出了并发和并行之间的界限。如果你只是使用.await
或join!
future,你永远不会有两个 Tokio 任务同时运行,因为最终,我们的main()
root .await
-future 及其 node-future 会作为同一任务的一部分依次执行,因此在同一个操作系统线程中。换句话说,你的异步程序将具有并发性,但没有并行性。
重新审视我们的例子,即使端口8080
和8081
被同时访问,foo()
和bar()
也会被一个接一个地执行,因为它们是
水果同一棵树的未来。当然,这没什么大不了的,但如果你记住我们讨论的是网络应用程序,并以此为基础,对这个愚蠢的例子进行推断,你很快就会发现这不可能是正确的。
调度并行任务
如上所述,main()
是我们程序的入口点,所以我们所做的一切都在它下面。我们下面的内容(#[tokio::main]
去糖化后的内容)是一个runtime
使用 构建的new_multi_thread()
:一个多线程的 Tokio 运行时。到目前为止,我们只使用了其中一个线程;它正在运行由 生成的任务block_on()
。如果我们想要并行,我们需要将 Future 交给运行时本身,这样它们就可以成为“根 Future”,并进而成为新的任务。为了实现这种并行性,即允许运行时使用其线程池中不同的工作线程执行我们的任务,我们需要使用spawn
任务。
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
#[tokio::main]
async fn main() {
let db: Arc<Mutex<HashMap<&str, &str>>> = Default::default();
tokio::spawn(foo(db.clone()));
tokio::spawn(bar(db.clone()));
handle(db).await;
}
async fn foo(db: Arc<Mutex<HashMap<&str, &str>>>) {
let listener = std::net::TcpListener::bind("0.0.0.0:8080").unwrap();
match listener.accept(){
Ok(_pair) => {
loop {
if let Ok(mut lock) = db.try_lock(){
println!("`foo()` is finished");
lock.insert("f", "foo");
break;
}
}
},
Err(_error) => println!("error"),
}
}
async fn bar(db: Arc<Mutex<HashMap<&str, &str>>>) {
let listener = std::net::TcpListener::bind("0.0.0.0:8081").unwrap();
match listener.accept(){
Ok(_pair) => {
loop {
if let Ok(mut lock) = db.try_lock(){
println!("`bar()` is finished");
lock.insert("b", "bar");
break;
}
}
},
Err(_error) => println!("error"),
}
}
async fn handle(db: Arc<Mutex<HashMap<&str, &str>>>) {
loop {
if let Ok(lock) = db.try_lock(){
if lock.len() == 2 {
println!("{}{}", lock.get("f").unwrap(), lock.get("b").unwrap());
break;
}
}
}
}
在上面的例子中,foo()
和bar()
本身就成为了根 Future,并且作为Futurehandle()
中的单个 Future block_on()
,我们最终得到了三棵不同的 Future 树。这样,如果我们“同时”调用这三个函数,它们就可以在三个不同的线程中执行(假设运行时有这些线程)。
我使用 可能有点过头了
Arc<Mutex<HashMap>>
,因为用 可以更轻松地处理JoinHandle
。我的理由是,使用智能指针更容易看出并行性,因为JoinHandle
看起来与我们使用 的方式非常相似.await
。
超越
如果您想更进一步,一个好的起点就是了解 Tokio 如何采用工作窃取技术来管理其多线程调度程序。
司机
让我们重新审视一下之前的例子。在foo()
和bar()
都让步之后(它们在 处开始监听时都会发生这种情况)0.0.0.0
,它们会返回Poll::Pending
。由于仍有工作要做,调度程序不会清除它们,但不会再次自动清除它们;而是在请求时再次poll()
清除它们。poll()
在这种情况下,再次需要poll()
它们的原因是对 的访问0.0.0.0
。但是,如果 和 都没有foo()
实际bar()
运行,那么哪个进程会触发触发?必须有某个进程在运行,来协调我们对 和调度程序的访问。这就是驱动程序0.0.0.0
的作用,Tokio 就是这样调用它的I/O 事件循环 的。
不过,在深入探讨驱动因素之前,让我们先来谈谈使其成为必要的因素:一个悬而未决的未来。
待定未来
也许这个主题属于调度程序,但由于它对于理解驱动程序至关重要,所以我认为它也适合这里。
当我们轮询一个 Future 时,它会接收一个Context
参数。目前,这Context
只是 的包装器&Waker
。这是对调用 的任务中的&Waker
的引用。它有一个由驱动程序调用的方法,因此(拥有 的任务)会意识到它应该再次访问 Future。Waker
poll()
&Waker
wake()
Waker
poll()
以下流程是其工作原理的说明性示例:
- 未来正在被
.await
创造。 - 因此,它被交给由创建的调度程序任务
block_on()
。 - 这个任务将是
poll()
未来,它将做一些工作,直到它到达必须屈服的地步;假设它正在某个地址监听,就像我们foo()
一样。 - 在屈服之前,未来作为参数被接收
clone()
。这将未来和任务“绑定”起来。&Waker
poll()
- 它产生并返回
Poll::Pending
。 - 当操作系统的 I/O 在某个地址上接收到连接时,它会让驱动程序知道。
- 驱动程序将调用未来存储的
wake()
内容,这将唤醒任务。&Waker
- 被唤醒的任务将
poll()
再次执行 Future。如果它返回,则复制上次传递的Pending
新值并重新启动该进程。Waker
poll()
第 6 和第 7 步描述了驱动程序作为操作系统和任务调度程序之间的接口的作用。这意味着驱动程序*将对操作系统执行系统调用,例如kqueue
在 BSD/macOS、IPCP
Windows 或epoll
Linux 中(现在我们越来越多地听到关于 的消息io_uring
,Tokio 也处理这个问题)。
* 实际上,这些系统调用并非由驱动程序发出。交互实际上是在驱动程序和 之间进行的
mio
,因此它是mio
由谁与操作系统交互的。话虽如此,我将在这里对其进行抽象,以便我们可以描述 Tokio 驱动程序与操作系统 I/O 之间的简化对话,这构成了上面的第六步。
驱动程序是一个事件循环,它会持续使用其中一个系统调用轮询操作系统。让我们回顾一下之前的foo()
例子。如果驱动程序轮询操作系统并发现存在连接0.0.0.0:8080
,它就会将wake()
任务交给poll()
未来处理。
当然,还有很多细节没有讲清楚。比如,调度器和驱动程序之间通过通道beginners
进行通信究竟是如何运作的?不过,我会尊重我标记这篇文章的标签,就此打住。(即便如此,如果我写的文章是面向初学者的,不仅是因为我觉得我们可能还缺少一些入门内容,也因为我自身目前的局限性;而我们现在正徘徊在我的知识鸿沟边缘🙃)。
计时器
该模块tokio::time
是运行时的一部分,提供用于跟踪时间的实用程序。我没有太多要讨论的内容,但为了完整起见,我将引用文档中解释该模块提供的功能的部分:
Sleep
Instant
是不做任何工作并在特定时间完成的未来。Interval
是一个以固定周期产生值的流。它以 a 初始化,Duration
并在每次持续时间结束时重复产生值。Timeout
:包装 Future 或 Stream,设置其允许执行的时间上限。如果 Future 或 Stream 未能及时完成,则会被取消并返回错误。
今天就到这里吧。我觉得还有很多内容没有讲完,但这篇文章已经比我预期的要长了。希望我们接下来讨论其他 crate 时,能够重新回顾一些话题。
到时候见!
鏂囩珷鏉ユ簮锛�https://dev.to/rogertorres/rust-tokio-stack-overview-runtime-9fh封面照片由Pawel Nolbert拍摄