RxJS switchMap、concatMap、mergeMap、exhaustMap

2025-06-08

RxJS switchMap、concatMap、mergeMap、exhaustMap

本文最初@pietrucha发布在Angular Academy上。

在 RxJS 在前端开发中流行起来之前,我们都是用 Promise 来处理 AJAX 请求的。Promise 易于使用和理解,但在某些更复杂的场景中,却显得力不从心。在本文中,我将解释如何在四种不同的场景中,通过四种不同的扁平化策略—— mergeMapconcatMap switchMapexhaustMap,高效地使用高阶可观察流。

介绍

在浏览器中执行 HTTP 请求本质上是异步的。这意味着我们可以使用 RxJS Observables 对其进行建模。在 Angular 中,我们有一个HttpClient服务,其中包含与 HTTP 操作(get、post、put 等)对应的方法。这些方法返回我们可以订阅的 Observables。但 HTTP 操作的执行通常发生在另一个事件发生之后,例如点击事件。我们也可以将此类浏览器点击事件建模为可观察流,因为这些事件可能在未来的任何时间出现多次。因此,现在我们有两个想要按顺序使用的流 -点击事件应该触发 HTTP 调用(在示例中,我使用save()方法)。简单的方法是订阅点击事件,并在订阅函数内部订阅save()方法。

fromEvent(saveBtn, 'click')
  .subscribe(click => {
    save().subscribe(result => {
      // handle result
    })
   });
Enter fullscreen mode Exit fullscreen mode

上面的代码虽然能正常工作,但首先,它包含嵌套订阅,这很容易让人联想到回调地狱,而且代码看起来不太简洁。其次,它不允许我们使用扁平化策略,而扁平化策略在我们想要处理后续点击事件发生在操作流发出最终结果之前的 save()情况时非常有用。想象一下,当用户第二次点击按钮时,HTTP 请求尚未返回结果。会发生什么?我们应该等待第一个 HTTP 请求完成后再启动第二个 HTTP 请求吗?还是应该放弃第一个 HTTP 查询并立即执行第二个 HTTP 查询?又或者,当还有待处理的 HTTP 调用时,我们根本不允许后续 HTTP 调用?如你所见,有很多方法可以处理这种棘手的情况。通过使用适当的扁平化操作(我们将在下一章中讨论),我们可以轻松实现适合自己的解决方案。

高阶可观测量

我们可以将第一个代码片段重写为如下代码片段。在这里,我们不是立即订阅点击map流,而是将其放入方法的调用中save()。由于该save()方法返回 Observable 本身,因此我们创建了一个高阶可观察对象 (Observable)。这种可观察对象通常由两个流组成。在我们的例子中,有一个外部流,用于发出点击事件;还有一个内部流,用于发出方法的结果save()

fromEvent(saveBtn, 'click')
  .pipe(map(click => save()))
  .subscribe(result => {
    // result is a stream!
  });
Enter fullscreen mode Exit fullscreen mode

高阶可观察对象是发出可观察对象本身的事件的可观察对象;换句话说,它是可观察对象的可观察对象。

上面代码片段最有意思的部分是订阅。由于我们将点击事件映射到了另一个流中,订阅的结果也将是一个流!我们可以通过在result第一个订阅函数内部订阅来使用 HTTP 查询的最终结果,但最终我们又会遇到嵌套订阅。现在到了精彩内容的时候了!RxJS 附带了一些特殊的操作符,它们可以将高阶可观察对象转换为一阶可观察对象,这样我们就可以只订阅一阶可观察对象,并从内部流(而不是内部流的订阅)接收事件。

展平高阶可观测量

将高阶流转换为一阶流的操作称为扁平化。当我们扁平化流时,它不再发出其内部流,而是发出来自内部流的事件。使用 RxJS 扁平化非常简单。我们要做的就是将合适的运算符应用于高阶流。下面的代码片段使用concatAll()运算符来扁平化流。因此,result订阅中的 是该方法返回的内部可观察对象的事件save()

fromEvent(saveBtn, 'click')
  .pipe(map(click => save()), concatAll())
  .subscribe(result => {
    // result is the result of save()
  });
Enter fullscreen mode Exit fullscreen mode

由于map()concatAll()经常一起使用,因此有一个等效运算符concatMap()可以让我们实现完全相同的结果。下面的代码片段展示了concatMap()运算符的用法:

fromEvent(saveBtn, 'click')
  .pipe(concatMap(click => save()))
  .subscribe(result => {
    // result is the result of save()
  });
Enter fullscreen mode Exit fullscreen mode

concatMap()并不是 RxJS 中展平高阶流的唯一方法。在接下来的章节中,我们将了解concatMap()mergeMap()switchMap()之间的区别exhaustMap()。这些运算符都是展平运算符,但它们的适用场景截然不同。

康卡特地图

我们已经问过一个问题,关于外部流在内部流执行完成之前发出事件(例如用户点击按钮)的情况。处理这种情况的策略之一是等到内部流执行完成后再订阅下一个事件。这正是我们concatMap()将要做的。请观看下面的录制演示。

在此示例中,在模拟 HTTP 查询执行期间,“保存”按钮被第二次点击。计数数字代表该查询的执行次数。由于第二次点击事件发生在保存查询完成之前,因此第二个查询被排队等待稍后执行。这样,我们就串联了内部流执行。

concatMap()是第一个提出的高阶流展平策略。当我们的用例需要顺序性时,可以使用它。需要注意的是,浏览器端的 HTTP 查询顺序可能与服务器接收它们的顺序不同。我们可以想象这样一种情况:有两个 HTTP 查询,假设查询A和查询B,浏览器首先发送查询A,然后立即发送查询B。无法保证服务器以相同的顺序接收这些查询。由于网络状况,服务器可能首先收到查询B ,然后收到查询A。这就是为什么在浏览器端制定适当的策略如此重要。

合并地图

处理外部流在内部流执行期间发出事件的情况的另一种方法是使用运算符合并mergeMap()执行。在这种情况下,我们不必关心任何顺序,只需并发执行内部流即可。下面的录制演示说明了这种情况。我们可以看到,第二次单击“保存”按钮时,模拟的 HTTP 查询立即启动,并与第一个查询并发执行。

下面的源代码片段展示了如何应用mergeMap()运算符来实现这一结果。

fromEvent(saveBtn, 'click')
  .pipe(mergeMap(click => save()))
  .subscribe(result => {
    // result is the result of save()
  });
Enter fullscreen mode Exit fullscreen mode

SwitchMap

我们学习了将高阶流转换为一阶流的两种策略。它们都适用于不同的用例,但下一个可能是你最喜欢的 - switchMap()。当我们应用这种展平时,外部流事件的发生(即用户点击)会导致取消对当前内部流正在进行的执行的订阅。当我们只关心 HTTP 查询的最近执行时,此策略很有用。想象一下预先输入搜索的实现。用户输入搜索查询的第一个字母,HTTP 调用开始,然后用户输入查询的接下来的字母。在这种情况下,我们不关心任何先前 HTTP 请求的结果,因此switchMap()非常适合。下面的动画展示了这个展平运算符的行为。您还可以查看我关于这个主题的视频:使用 Angular Material 进行 RxJS 预先输入搜索

下面的源代码片段展示了如何应用switchMap()运算符。

fromEvent(saveBtn, 'click')
  .pipe(switchMap(click => save()))
  .subscribe(result => {
    // result is the result of save()
  });
Enter fullscreen mode Exit fullscreen mode

排气图

本文要探讨的最后一个操作符是exhaustMap()。当我们只想在内部流执行期间忽略exhaustMap()来自外部流的事件时,正确的选择是 。因此,当使用此映射策略时,如果外部事件在内部流完成之前出现,我们根本不执行映射。当我们想要最大限度地减少从浏览器发出的 HTTP 调用数量时,这可能很有用。您可以在下面的动画中注意到,在第一个模拟查询完成之前单击后续按钮不会产生任何效果。

下面的源代码片段展示了如何应用exhaustMap()运算符。

fromEvent(saveBtn, 'click')
  .pipe(exhaustMap(click => save()))
  .subscribe(result => {
    // result is the result of save()
  });
Enter fullscreen mode Exit fullscreen mode

概括

与简单的 Promises 相比,使用 Observable 进行 HTTP 请求乍一看可能有些奇怪。在本文中,我们了解到,在某些情况下,结合使用 Observable 和适当的扁平化策略,即使不是必需的,也能达到预期的效果并避免严重的 bug。

Stackblitz演示:https://stackblitz.com/edit/rxjs-higher-order-streams
GitHub源代码:https://github.com/bartosz-io/rxjs-higher-order-streams

希望您能学到一些新东西,如果您在社交媒体上与朋友们分享这篇文章,我将不胜感激 :) 如果您有任何疑问,请随时在评论区提问——我会一一回复。保重!

如果您喜欢此内容,请在Angular Academy上查看更多内容

鏂囩珷鏉ユ簮锛�https://dev.to/angular/rxjs-switchmap-concatmap-mergemap-exhaustmap-13io
PREV
Angular 中的 HttpContext 到底是什么?
NEXT
预加载所有 Angular Bundle