Rust 中 Futures 在 Web 开发中的应用
内容
正如我在上一篇文章中所解释的,Future 是异步环境的一部分。我们需要语法、类型和运行时才能以非阻塞方式运行代码。Future 是 Rust 异步故事中的类型部分。
内容
两种观点
如果你之前用过 NodeJS,Rust 中的 Future 就没什么意义了。在 NodeJS 中,一切都是异步发生的。因此,为了能够说“嘿,我真的需要等待这个 GET HTTP 调用的响应”,你需要添加.then()一个,这样你就可以确保在HTTP 调用完成后Promise只执行 里面的代码。.then()
在 Rust 中,默认情况下所有内容都是阻塞和同步的,因此您可能会问自己:“为什么要费心处理复杂性,这正是我最初想要的!”
Rust 是一种系统编程语言。因此,要用 Rust 编写应用程序,你必须始终戴着两顶帽子。“系统帽”(⛑) 和“程序员帽”(🎩)。系统帽(⛑) 让你从机器的角度思考什么才是真正最好的,而程序员帽(🎩) 则负责语法和软件的编写方式。
如果您来自 NodeJS,Systems Hat 由 Google 的 V8 运行时负责处理,因此您可以专注于语法。在 Rust 中,您可以获得 crate 的帮助,尽管您需要自己做出某些决定。
系统知识帽 (Systems Hat) 是我们想要使用 Future 的原因。因此,你需要在应用程序中将 Future 作为一种类型来处理,然后确保使用运行时来实际执行它们。如果你正在使用 Future(例如,当你使用的 crate 返回一个 时Future),你必须了解数据的来源。在这种情况下,程序员知识帽 (Programmer Hat)和系统知识帽 (Systems Hat) 是必需的。
到底是什么Future?🎩
Rust 中的AFuture实际上是trait,如果你想实现它,它看起来像这样:
trait Future {
type Item;
type Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
}
方法poll很重要。它将在运行时被调用,并返回Async::Ready或Async::NotReady。
如果您想从远程位置或文件系统获取不同的杂志期刊,您可以创建并返回自己的杂志Future期刊:
struct Magazine {
issues: Vec<u8>
}
以及它的特征impl:Future
impl Future for Magazine {
// here we return a single byte
type Item = u8;
type Error = io::Error;
// this method is getting called from the runtime. Everytime we can read
// a byte into the buffer, we return `Async::Ready`
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let mut buffer = [0;1];
match self.0.poll_read(&mut buf) {
Ok(Async::Ready(_num_bytes_read)) => Ok(Async::Ready(buffer[0])),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(e) => Err(e)
}
}
}
运行时将使用您请求的信息填充缓冲区,并且缓冲区满后将Future返回。Async::Ready
Future这就是Rust 中a 的基本本质。
何时使用 Futures?🎩
实现
Future类型的特征就是告诉机器“嘿,这可能需要一段时间,在执行这部分代码时要考虑到这一点”。
Rust 是一门同步语言。所有操作都是逐行执行,直到每个结果处理完毕。除非你想执行一些需要更长时间才能完成的任务,否则这通常不成问题。
如果您使用 Rust 创建了一个微服务,并且需要从其他三个微服务中获取一些数据,然后合并数据并将其写入数据库,那么这就是您要考虑使用 Futures 的用例。
为什么使用 Futures 可以节省时间?⛑
在我们的测试场景中,每个 HTTP 调用可能需要 1 分钟。现在,您无需等待 3 分钟让所有调用完成,而是希望将它们并行运行,这样您仍然只需等待 1 分钟,而不是 3 分钟。
因此,您创建三种方法,Future分别返回一个方法,收集它们,然后将它们传递给运行时(tokio::run例如通过 tokio)。
如何使用 Futures?🎩 + ⛑
如果 crate 返回Future,则在 准备好后,您可以使用.and_then()来处理结果Future。.map()在 上使用Future可以更改 的类型Future。
在异步 Rust 的世界里,我们必须处理不同类型的数据。例如,我们处理的不是字符串和数字,而是值流。你很可能会处理流。
AStream是 Future 的扩展。Future 只会生成一个值,而 aStream则会一直生成值,只要这些值存在。
- Stream:A
Stream类似于Future,是类型上可以添加的一个 traitimpl。它允许你迭代返回值( 或Some(_))None。 - Sink:用于将数据连续(异步方式)写入套接字或文件系统
Stream 用于读取数据,而 Sinks 用于写入数据。我们的 Web 生态系统中有两种类型的 Stream:
- 字节流(如 HTTP 主体或 TCP 流)
- 消息流(如 WebSocket 帧或 UDP 数据包),其中每条消息都有固定的大小
代码示例
让我们看一个用例,你使用的 crate 返回一个Future。进行 HTTP 调用时,reqwest这是一个很好的例子。在从Future中返回时reqwest,你可以使用它.and_then()来处理结果 (🎩):
// We have to use "r#" before "async" because "async" is a reserved keyword.
use reqwest::r#async::Client;
// The return type has to be `Future<Item=(), Error=()>` to be able
// to use `tokio::run`.
// If it has a different type, you have to use `tokio::block_on`
fn fetch_data() -> impl Future<Item=(), Error=()> {
Client::new()
.get(url)
.send()
.and_then(|res| {
res.into_body().concat2()
})
.map_err(|err| println!("request error: {}", err))
.map(|body| {
// here you can use the body to write it to a file
// or return it via Ok()
// If you return it via for example Ok(users)
// then you need to adjust the return type in impl Future<Item=TYPE
// Examples can be found here:
// https://github.com/gruberb/futures_playground
// For now, lets just turn the body into a Vector
let v = body.to_vec();
})
}
Future一旦创建了返回( )的方法fetch_data(),就必须将其传递给像 tokio (⛑) 这样的运行时:
tokio::run(fetch_data());
高层概述
Future您从外部箱子收到- A
Future可能会返回一个Stream值,因此您必须将其形成Stream为可以同步方式使用的类型(例如 Vector 或 String) - 通过方法返回整个 Future
-> impl Future<Item=(), Error=()>,其中括号()是要返回的实际类型的占位符 - 您可以通过以下方式将方法传递给 tokio 之类的运行时
tokio::run(method()) - 该
run调用将启动运行时,设置所需的资源,然后将这个未来放在线程池上并开始轮询你的未来 - 然后它会尝试将工作传递给操作系统
- 每次运行时轮询你的 时
Future,如果你正在等待的底层 I/O 资源Future尚未准备好,它将返回NotReady。运行时看到此NotReady返回值后,会让你的Future进入睡眠状态 - 一旦来自底层 I/O 资源的事件到来,运行时就会检查此 I/O 资源是否与您的 关联
Future,然后再次开始轮询。这一次,您的Future将能够返回一个Ready带有值的 ,因为底层 I/O 资源已经提供了一个值 - 然后,运行时将设置的状态为
Future就绪,并处理.and_then()部分代码
与 NodeJS不同Future, 是通过tokio::run而不是之前执行的。在 Node 中,只要你写入Promise, 对象就会立即返回。
Futures 有什么不同或困难之处?⛑
让我们回顾一下上面的例子:
- 我们创建一个新的客户端,
Client::new()并提出.send()我们的请求 - 我们将得到
Response回报:
pub struct Response {
status: StatusCode,
headers: HeaderMap,
url: Box<Url>,
body: Decoder,
...
}
- 本体本身就是一个解码器,可以将其转化为
Body通路.into_body()。 Body本身实现了一个Stream(如前所述)。- 现在我们可以研究 Rust 的 Futures API 并发现:我们可以通过以下方式将字节流转换为单个项目
.concat2()
...
.and_then(|res| {
res.into_body().concat2()
})
...
...
.map(|body| {
let v = body.to_vec();
// do whatever with v
})
...
从那时起,我们就回到了“正常”的 Rust 土地,可以忘记刚刚发生的事情🙃。
您可以在此GitHub 存储库中找到完整的示例。在那里,我收到一个
JSON并将其写入文件。
这就是为什么一开始处理 Future 会如此困难的原因之一。你必须考虑比 NodeJS 等框架低得多的层面。此外,它的async/await语法尚未最终确定,这会导致大量的重复代码。
这些心理步骤可以帮助您在处理期货时不会迷失方向:
- 我从这个库获得的返回类型或值是什么?
- 我如何访问
Stream此响应中的值? Stream当我通过收集所有值时,图书馆会将其变成什么.concat2()?- 我如何将这种新类型转换为 Vector 或其他 Rust std 格式,以便将其传递给同步方法?
基本上,您总是想弄清楚如何访问值流、收集它们,然后处理生成的对象。
如何执行多个Future?🎩
一般来说,您希望将您的值收集为Streams,以便对于您获得的每件物品Stream,您都可以产生一个新的Future来处理它。
Rust Futures API 有一个名为的方法,FuturesUnordered您可以使用它来添加多个Future:
use futures::stream::futures_unordered::FuturesUnordered;
use hyper::{client::ResponseFuture, Client};
fn setup_requests() -> FuturesUnordered<ResponseFuture> {
let mut list_of_futures = FuturesUnordered::new();
let client = Client::new();
let first = client.get(URL);
list_of_futures.push(first);
let second = client.get(URL);
list_of_futures.push(second);
list_of_futures
}
在此示例中,我们将hyper其用于 HTTP 调用。其余代码可在 Github 上找到。
如果您使用,语法会略有不同reqwest。在这里,您.join()有多个请求并将其作为“一个 Future”返回。
fn fetch() -> impl Future<Item=(), Error=()> {
let client = Client::new();
let json = |mut res : Response | {
res.json::<STRUCT_TYPE>()
};
let request1 =
client
.get(URL)
.send()
.and_then(json);
let request2 =
client
.get(URL)
.send()
.and_then(json);
request1.join(request2)
.map(|(res1, res2)|{
println!("{:?}", res1);
println!("{:?}", res2);
})
.map_err(|err| {
println!("stdout error: {}", err);
})
}
完整代码也可以在GitHub上找到。
期货的未来是什么?🎩 + ⛑
Future 将在 1.37 版本中正式加入 Rust 稳定版,大约在六月左右。此外,语法和运行时也发生了变化,这将减少您编写的代码量,以便将这些值流转换Future为同步 Rust 格式。
你也可以使用Runtimecrate,它可以帮你省去几乎所有的样板代码。不过,完成上述过程可以帮助你更深入地理解 Futures。
概括
如果您执行异步操作,例如从操作系统获取文件或向远程服务器发出 HTTP 请求,那么 Futures 可让您以非阻塞方式处理返回值。
如果是同步操作,则必须阻塞正在运行该操作的线程,并等待结果返回后才能继续执行。为了以异步方式执行此操作,我们有一个运行时,它会自行创建线程并接收 Future。Future当操作系统将值返回给运行时时,它会将值填充到 Future 中。
一旦Future满足,运行时就会设置Async::Ready并且.and_then()部分代码将得到执行。
