发布于 2025-12-08 0 阅读
0

解释:Rust 中用于 Web 开发内容的 Future

Rust 中 Futures 在 Web 开发中的应用

内容

正如我在上一篇文章中所解释的,Future 是异步环境的一部分。我们需要语法、类型和运行时才能以非阻塞方式运行代码。Future 是 Rust 异步故事中的类型部分。

内容

  1. 两种观点
  2. 未来究竟是什么?
  3. 何时使用期货?
  4. 为什么使用 Futures 可以节省时间?
  5. 如何使用期货?
  6. 期货有何不同或困难之处?
  7. 如何执行多个 Future?
  8. 期货的未来如何?
  9. 概括

两种观点

如果你之前用过 NodeJS,Rust 中的 Future 就没什么意义了。在 NodeJS 中,一切都是异步发生的。因此,为了能够说“嘿,我真的需要等待这个 GET HTTP 调用的响应”,你需要添加.then()一个,这样你就可以确保在HTTP 调用完成后Promise只执行 里面的代码。.then()

在 Rust 中,默认情况下所有内容都是阻塞和同步的,因此您可能会问自己:“为什么要费心处理复杂性,这正是我最初想要的!”

Rust 是一种系统编程语言。因此,要用 Rust 编写应用程序,你必须始终戴着两顶帽子。“系统帽”(⛑) 和“程序员帽”(🎩)。系统帽(⛑) 让你从机器的角度思考什么才是真正最好的,而程序员帽(🎩) 则负责语法和软件的编写方式。

如果您来自 NodeJS,Systems Hat 由 Google 的 V8 运行时负责处理,因此您可以专注于语法。在 Rust 中,您可以获得 crate 的帮助,尽管您需要自己做出某些决定。

系统知识帽 (Systems Hat) 是我们想要使用 Future 的原因。因此,你需要在应用程序中将 Future 作为一种类型来处理,然后确保使用运行时来实际执行它们。如果你正在使用 Future(例如,当你使用的 crate 返回一个 时Future),你必须了解数据的来源。在这种情况下,程序员知识帽 (Programmer Hat)系统知识帽 (Systems Hat) 是必需的。

到底是什么Future?🎩

Rust 中的AFuture实际上是trait,如果你想实现它,它看起来像这样:

trait Future {
    type Item;
    type Error;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
}
Enter fullscreen mode Exit fullscreen mode

方法poll很重要。它将在运行时被调用,并返回Async::ReadyAsync::NotReady

如果您想从远程位置或文件系统获取不同的杂志期刊,您可以创建并返回自己的杂志Future期刊:

struct Magazine {
    issues: Vec<u8>
}
Enter fullscreen mode Exit fullscreen mode

以及它的特征implFuture

impl Future for Magazine {
    // here we return a single byte
    type Item = u8;
    type Error = io::Error;

    // this method is getting called from the runtime. Everytime we can read
    // a byte into the buffer, we return `Async::Ready`
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        let mut buffer = [0;1];
        match self.0.poll_read(&mut buf) {
            Ok(Async::Ready(_num_bytes_read)) => Ok(Async::Ready(buffer[0])),
            Ok(Async::NotReady) => Ok(Async::NotReady),
            Err(e) => Err(e)
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

运行时将使用您请求的信息填充缓冲区,并且缓冲区满后将Future返回。Async::Ready

Future这就是Rust 中a 的基本本质。

何时使用 Futures?🎩

实现Future类型的特征就是告诉机器“嘿,这可能需要一段时间,在执行这部分代码时要考虑到这一点”。

Rust 是一门同步语言。所有操作都是逐行执行,直到每个结果处理完毕。除非你想执行一些需要更长时间才能完成的任务,否则这通常不成问题。

如果您使用 Rust 创建了一个微服务,并且需要从其他三个微服务中获取一些数据,然后合并数据并将其写入数据库,那么这就是您要考虑使用 Futures 的用例。

为什么使用 Futures 可以节省时间?⛑

在我们的测试场景中,每个 HTTP 调用可能需要 1 分钟。现在,您无需等待 3 分钟让所有调用完成,而是希望将它们并行运行,这样您仍然只需等待 1 分钟,而不是 3 分钟。

因此,您创建三种方法,Future分别返回一个方法,收集它们,然后将它们传递给运行时(tokio::run例如通过 tokio)。

如何使用 Futures?🎩 + ⛑

如果 crate 返回Future,则在 准备好后,您可以使用.and_then()来处理结果Future.map()在 上使用Future可以更改 的类型Future

在异步 Rust 的世界里,我们必须处理不同类型的数据。例如,我们处理的不是字符串和数字,而是值流。你很可能会处理流。

AStream是 Future 的扩展。Future 只会生成一个值,而 aStream则会一直生成值,只要这些值存在。

  • Stream:AStream类似于Future,是类型上可以添加的一个 trait impl。它允许你迭代返回值( 或Some(_)None
  • Sink:用于将数据连续(异步方式)写入套接字或文件系统

Stream 用于读取数据,而 Sinks 用于写入数据。我们的 Web 生态系统中有两种类型的 Stream:

  • 字节流(如 HTTP 主体或 TCP 流)
  • 消息流(如 WebSocket 帧或 UDP 数据包),其中每条消息都有固定的大小

代码示例

让我们看一个用例,你使用的 crate 返回一个Future。进行 HTTP 调用时,reqwest这是一个很好的例子。在从Future中返回时reqwest,你可以使用它.and_then()来处理结果 (🎩):

// We have to use "r#" before "async" because "async" is a reserved keyword.
use reqwest::r#async::Client;

// The return type has to be `Future<Item=(), Error=()>` to be able
// to use `tokio::run`. 
// If it has a different type, you have to use `tokio::block_on`
fn fetch_data() -> impl Future<Item=(), Error=()> {
    Client::new()
        .get(url)
        .send()
        .and_then(|res| {
            res.into_body().concat2()
        })
        .map_err(|err| println!("request error: {}", err))
        .map(|body| {
            // here you can use the body to write it to a file
            // or return it via Ok()
            // If you return it via for example Ok(users)
            // then you need to adjust the return type in impl Future<Item=TYPE
            // Examples can be found here: 
            // https://github.com/gruberb/futures_playground

            // For now, lets just turn the body into a Vector
            let v = body.to_vec();
        })
}
Enter fullscreen mode Exit fullscreen mode

Future一旦创建了返回( )的方法fetch_data()就必须将其传递给像 tokio (⛑) 这样的运行时:

tokio::run(fetch_data());
Enter fullscreen mode Exit fullscreen mode

高层概述

  1. Future您从外部箱子收到
  2. AFuture可能会返回一个Stream值,因此您必须将其形成Stream为可以同步方式使用的类型(例如 Vector 或 String)
  3. 通过方法返回整个 Future -> impl Future<Item=(), Error=()>,其中括号()是要返回的实际类型的占位符
  4. 您可以通过以下方式将方法传递给 tokio 之类的运行时tokio::run(method())
  5. run调用将启动运行时,设置所需的资源,然后将这个未来放在线程池上并开始轮询你的未来
  6. 然后它会尝试将工作传递给操作系统
  7. 每次运行时轮询你的 时Future,如果你正在等待的底层 I/O 资源Future尚未准备好,它将返回NotReady​。运行时看到此NotReady​返回值后,会让你的Future进入睡眠状态
  8. 一旦来自底层 I/O 资源的事件到来,运行时就会检查此 I/O 资源是否与您的 关联Future,然后再次开始轮询。这一次,您的Future将能够返回一个Ready带有值的 ,因为底层 I/O 资源已经提供了一个值
  9. 然后,运行时将设置的状态为Future就绪,并处理.and_then()部分代码

与 NodeJS不同Future, 是通过tokio::run而不是之前执行的。在 Node 中,只要你写入Promise, 对象就会立即返回。

Futures 有什么不同或困难之处?⛑

让我们回顾一下上面的例子:

  • 我们创建一个新的客户端,Client::new()并提出.send()我们的请求
  • 我们将得到Response回报:
pub struct Response {
    status: StatusCode,
    headers: HeaderMap,
    url: Box<Url>,
    body: Decoder,
    ...
}
Enter fullscreen mode Exit fullscreen mode
  • 本体本身就是一个解码器,可以将其转化为Body通路.into_body()
  • Body本身实现了一个Stream(如前所述)。
  • 现在我们可以研究 Rust 的 Futures API 并发现:我们可以通过以下方式将字节流转换为单个项目.concat2()
...
        .and_then(|res| {
            res.into_body().concat2()
        })
...
Enter fullscreen mode Exit fullscreen mode
  • 我们在部分中使用此单个项目.map()作为body
  • 在代码编辑器的帮助下,你会发现这body实际上是一个Chunk从库返回的Hyper
  • 我们Chunk现在可以把它变成Vector
...
        .map(|body| {
            let v = body.to_vec();
            // do whatever with v
        })
...
Enter fullscreen mode Exit fullscreen mode

从那时起,我们就回到了“正常”的 Rust 土地,可以忘记刚刚发生的事情🙃。

您可以在此GitHub 存储库中找到完整的示例。在那里,我收到一个JSON并将其写入文件。

这就是为什么一开始处理 Future 会如此困难的原因之一。你必须考虑比 NodeJS 等框架低得多的层面。此外,它的async/await语法尚未最终确定,这会导致大量的重复代码。

这些心理步骤可以帮助您在处理期货时不会迷失方向:

  1. 我从这个库获得的返回类型或值是什么?
  2. 我如何访问Stream此响应中的值?
  3. Stream当我通过收集所有值时,图书馆会将其变成什么.concat2()
  4. 我如何将这种新类型转换为 Vector 或其他 Rust std 格式,以便将其传递给同步方法?

期货如何运作

基本上,您总是想弄清楚如何访问值流、收集它们,然后处理生成的对象。

如何执行多个Future?🎩

一般来说,您希望将您的值收集为Streams,以便对于您获得的每件物品Stream,您都可以产生一个新的Future来处理它。

Rust Futures API 有一个名为的方法,FuturesUnordered您可以使用它来添加多个Future

use futures::stream::futures_unordered::FuturesUnordered;
use hyper::{client::ResponseFuture, Client};

fn setup_requests() -> FuturesUnordered<ResponseFuture> {
    let mut list_of_futures = FuturesUnordered::new();

    let client = Client::new();

    let first = client.get(URL);
    list_of_futures.push(first);

    let second = client.get(URL);
    list_of_futures.push(second);

    list_of_futures
}

Enter fullscreen mode Exit fullscreen mode

在此示例中,我们将hyper其用于 HTTP 调用。其余代码可在 Github 上找到。

如果您使用,语法会略有不同reqwest。在这里,您.join()有多个请求并将其作为“一个 Future”返回。

fn fetch() -> impl Future<Item=(), Error=()> {
    let client = Client::new();

    let json = |mut res : Response | {
        res.json::<STRUCT_TYPE>()
    };

    let request1 =
        client
            .get(URL)
            .send()
            .and_then(json);

    let request2 =
        client
            .get(URL)
            .send()
            .and_then(json);

    request1.join(request2)
        .map(|(res1, res2)|{
            println!("{:?}", res1);
            println!("{:?}", res2);
        })
        .map_err(|err| {
            println!("stdout error: {}", err);
        })
}
Enter fullscreen mode Exit fullscreen mode

完整代码也可以在GitHub上找到。

期货的未来是什么?🎩 + ⛑

Future 将在 1.37 版本中正式加入 Rust 稳定版,大约在六月左右。此外,语法和运行时也发生了变化,这将减少您编写的代码量,以便将这些值流转换Future为同步 Rust 格式。

你也可以使用Runtimecrate,它可以帮你省去几乎所有的样板代码。不过,完成上述过程可以帮助你更深入地理解 Futures。

概括

如果您执行异步操作,例如从操作系统获取文件或向远程服务器发出 HTTP 请求,那么 Futures 可让您以非阻塞方式处理返回值。

如果是同步操作,则必须阻塞正在运行该操作的线程,并等待结果返回后才能继续执行。为了以异步方式执行此操作,我们有一个运行时,它会自行创建线程并接收 Future。Future当操作系统将值返回给运行时时,它会将值填充到 Future 中。

一旦Future满足,运行时就会设置Async::Ready并且.and_then()部分代码将得到执行。

鏂囩珷鏉ユ簮锛�https://dev.to/gruberb/explained-rust-futures-for-web-development-a10