RxJS 模式:效率和性能

2025-06-11

RxJS 模式:效率和性能

您可以利用所有 RxJS 运算符和技术来避免不必要的计算并使您的代码更简洁、更快速

RxJS 是一个库,它有助于更​​轻松地编写异步或基于回调的代码,如其主页上所述。

但是您是否知道,RxJS 还可以借助其强大的运算符帮助您的应用程序更加高效、性能更好?

在本文中,我想分享最有用的运算符和技术,以帮助避免不必要的重复计算,从而使您的应用程序更快、更高效,而我们所需要的只是一些 RxJS 魔法。

注意:下面的例子是简化的,可能不是完整的片段。

提示:使用正确的代码共享工具来优化团队合作

使用Bit共享、安装和协作单个 React 组件。无需再浪费时间配置软件包、管理多个仓库或维护繁琐的单一仓库。

带有 Bit 的组件:团队之间轻松共享项目
Bit 组件:轻松跨团队共享 团队
共享可重用代码组件 · Bit
*轻松在项目和应用程序之间共享可重用组件,加快团队构建速度。协作开发……*bit.dev

过滤

过滤操作符允许我们从流中过滤掉那些我们想要忽略的事件,从而避免将它们发送给可观察对象的订阅者。如果我们在管道中足够快地过滤事件,就能避免将它们传递给其他操作符以及订阅回调。

当然,如果管道进行大量计算或 HTTP 请求,这一点尤其重要。

这些运算符主要用于逻辑而不是性能原因,尽管它们仍然有助于避免计算不必要的任务。

编写流时你可以问自己的问题是:我应该传递所有项目还是可以在管道中的某个点跳过它们?

筛选

避免不必要计算(除其他外)的最简单的操作符是过滤器。

如果您已经熟悉 Array.prototype.filter 方法,那么您可能已经知道它的用法:我们将谓词作为参数传递给运算符,如果它对正在流式传输的事件返回 true,则该事件将通过管道传递,否则,它将被丢弃。

    const numbers$ = of(1, 2, 3, 4, 5);
    const predicate = (n) => n <= 2;

    numbers$
        .pipe(
            filter(predicate)
         )
        .subscribe(console.log);
    // will log 1,2
Enter fullscreen mode Exit fullscreen mode

distinctUntilChanged

另一种类型的过滤运算符是 distinctUntilChanged。

该运算符会将当前值与源 Observable 的先前值进行比较,如果两者不同,则将该项目传递出去。简而言之,它的工作原理类似于过滤器,但会比较先前值和当前值。

为了解释此运算符的工作原理,我们可以使用它来非常常见的场景,即从文本输入接收输入并跳过所有值与前一个事件相同的事件。

    const textChanges$ = fromEvent(textElement, 'input');

    textChanges$
        .pipe(
            filter(Boolean),
            distinctUntilChanged()
        )
        .subscribe(console.log);
Enter fullscreen mode Exit fullscreen mode

但这仅适用于原始值。如果您想传递更强大的相等谓词,则可以提供一个函数并手动比较前一个值和当前值。

    const stream$ = /* some Rx stream with objects */
    const isEqual = require('lodash/isEqual');

    changes$
        .pipe(
            distinctUntilChanged(isEqual)
        )
        .subscribe(/**/);
Enter fullscreen mode Exit fullscreen mode

防抖和节流

去抖动和节流是用于在单次发射中批量处理时间窗口内发射的事件的技术。

尽管这两种技术以不同的方式实现了类似的目的,但它们有时会互换使用和引用。

油门时间

操作符 throttleTime 用于仅发出以毫秒为单位指定的时间窗口内收到的第一个项目,然后再次等待整个时间窗口的时间,然后才能发出后续事件。

    const textChanges$ = fromEvent(textElement, 'input');

    textChanges$
        .pipe(
            filter(Boolean),
            distinctUntilChanged(),
            throttleTime(1000)
        )
        .subscribe(console.log);
Enter fullscreen mode Exit fullscreen mode

让我们用一个简单的视觉表示来解释这一点:

time in ms : 0---500---1000---1500---2000
events     : _a_____b__________c__d___e__
Enter fullscreen mode Exit fullscreen mode

触发了哪些事件?a 和 c!第一个事件 a 在时间范围 0 到 1000 内收集,第二个事件 b 被跳过,因为它在窗口内触发。然后触发了 c,d 和 e 被过滤掉了。

去抖动时间

与 throttleTime 相反,debounceTime 操作符用于仅发送在指定的时间窗口(以毫秒为单位)内收到的最新数据。与节流类似,debounce 操作符也会等待时间窗口结束后才能发送新事件。

    const textChanges$ = fromEvent(textElement, 'input');

    textChanges$
        .pipe(
            filter(Boolean),
            distinctUntilChanged(),
            debounceTime(1000)
        )
        .subscribe(console.log);
Enter fullscreen mode Exit fullscreen mode

让我们重复一下所使用的相同表示throttleTime来理解差异:

    time in ms : 0---500---1000---1500---2000
    events     : _a_____b__________c__d___e__
Enter fullscreen mode Exit fullscreen mode

在这种情况下,只有 b 和 e 发射。

节流和去抖有何帮助?

这些操作符用于延迟和批量执行一定时间范围内重复的事件。

它们在各种情况下都能提供帮助,我们可以避免执行无用的命令或昂贵的操作,例如 HTTP 请求。

想象一下,用户输入的每次更改都会触发对服务器的请求:如果我们不进行去抖动,我们不仅会给服务带来垃圾信息,还会降低用户体验。根据我的经验,每 250 到 500 毫秒进行一次去抖动是确保流畅用户体验的最佳时间。

取消

取消订阅是一项重要但经常被忽视的任务,我在审查 PR 时经常看到这一点。

取消不仅对于减少无用的计算和避免内存泄漏很重要,而且更重要的是,可以防止应用程序中可能出现的错误。

取消订阅

取消订阅的最简单、最必要的方式就是简单地调用每个订阅对象都应该实现的取消订阅方法。

    const inputs$ = fromEvent(element, 'input');
    const subscription = inputs.subscribe(/*some work*/);

    subscription.unsubscribe();
Enter fullscreen mode Exit fullscreen mode

虽然这是一个非常有效且可行的取消订阅流的示例,但它通常不被认为是最佳实践。事实上,Rx 提供了强大的操作符,可以帮助我们实现同样的效果,但方式更具声明性和响应性。

takeUntil

我最喜欢的取消订阅流的方式是 takeUntil。这个操作符允许你在输入流触发事件时取消订阅该流。哦,这看起来太复杂了,但实际上并非如此。

让我们看一个例子:

  • 我们有两个可观察对象,分别以 1000 毫秒(1 秒)和 100 毫秒的间隔发出

  • 每 100 毫秒发出一次消息的流将在另一个流发出消息时取消订阅,这将每 1 秒发生一次

    // emit every 1 second
    const slow$ = interval(1000);

    // emit every 100 ms
    const fast$ = interval(100).pipe(
        takeUntil(slow$)
    );

    fast$.subscribe({
      next(n) {
        console.log(n);
      },
      complete() {
        console.log('I am unsubscribed!');
      }
    });

    slow$.subscribe();
Enter fullscreen mode Exit fullscreen mode

这将产生以下输出:

0
1
2
3
4
5
6
7
8
I am unsubscribed!
Enter fullscreen mode Exit fullscreen mode

takeWhile

这个运算符对于根据流自身的值取消订阅非常有用。我需要用到这个运算符的场景之一是,当某些计时器达到一定迭代次数时,就停止它们。例如,倒计时器。

在下面的例子中,我想在计时器迭代 5 次后停止它。—

  • takeWhile 运算符接受一个谓词函数,其参数是流的当前值

  • 如果谓词为真,它将继续发出值;如果谓词为假,则它将取消订阅流

    const stream$ = interval(1000).pipe(
      takeWhile(n => n < 5)
    );

    stream$.subscribe({
      next(n) {
        console.log(n)
      },
      complete() {
        console.log('I am unsubscribed!')
      }
    });
Enter fullscreen mode Exit fullscreen mode

这将产生以下输出:

0
1
2
3
4
I am unsubscribed!
Enter fullscreen mode Exit fullscreen mode

切换映射

操作符 switchMap 通常用于展平可观察流。

您可能知道,它具有一种特殊的行为:在每次发射时,它不会维护多个内部可观察对象,而是会完成前一个内部可观察对象,然后发射新的内部可观察对象。

也就是说,如果我们有一个正在执行的 HTTP 请求,当另一个请求发出时,它将被取消。当然,根据你使用的 Observable 类型,会有不同的拆卸效果。

在下面的示例中,我创建了一个简单的代码片段,当用户在表单中输入值时,它会发出请求。该请求将查询 Github 的 API 以获取存储库并将其呈现在屏幕上。

在文章的最后,我们会重新回顾这个例子,添加强大的缓存机制,以及代码的链接。

批处理

在您在非常短的时间内重复执行非常昂贵的操作的情况下,例如根据流的更新重新渲染 DOM 树,批处理可以帮助收集更新并立即渲染它们。

我第一次使用这种技术是在使用 Angular.js 时:每次从后端进行更新时,摘要循环都会被调用很多次,以至于占用了大量的时钟时间。

就在那时,我想:为什么不把更新批量放到一个数组里,然后每 1 或 2 秒更新一次呢?为了做到这一点,我们可以使用 buffer 或 bufferTime 操作符(或者,缓冲家族中的其他操作符)。

缓冲时间

运算符 bufferTime 是运算符 buffer 的快捷方式,它接受以毫秒为单位的时间量,并将每 n 毫秒对数组中的流进行批处理。

例如,在下面的示例中,我们模拟了一个每 500 毫秒发出一次的数据流。缓冲区设置为 2 秒。这意味着,我们将在 2 秒的窗口内收集 4 次更新。

    this.stream$ = interval(500);

    this.data$ = this.stream$.pipe(
      bufferTime(2000),
      filter((items) => items.length > 0),
      scan((acc, items) => {
        return [...acc, ...items];
      }, [])
    );

    this.data$.subscribe((items) => {
      /* expensive operation here */

      console.log('re-render!');
    });
Enter fullscreen mode Exit fullscreen mode

我们来总结一下以上内容:

  • 我们有一个发出带有数据的事件的流(在示例中,它只是一个带有数字的间隔)

  • 我们每 2 秒使用 bufferTime(2000) 对事件进行批处理

  • 我们过滤所有为空的事件;在我们的例子中,这没什么意义,因为我们总是有值,但这可能是你在实际应用程序中想要做的事情,因为有时你会收到不需要重新渲染的数据

  • 我们使用与 Array.prototype.reduce 类似的 scan 运算符。我们收集事件并将它们分组到一个数组中——如果我们要处理一个大型列表,我们需要重新渲染。

使用此技术时要注意两点:

  • 测试性能!缓冲将继续在后台工作,收集事件:仅当渲染或其他昂贵的操作是应用程序的真正瓶颈时才使用此功能

  • 由于上述原因,请记得在不需要时取消订阅该流

缓存

每个应用程序都需要一定程度的缓存来提高效率和性能。缓存最重要的应用场景之一是 HTTP 请求,我们可以利用 RxJS 轻松实现这一点。

例如,我们可能不仅想返回缓存的响应,而且还想停止任何返回相同值的正在进行的请求,当然假设这些请求是幂等的。

在下面的示例中,我们将使用缓存扩展存储库搜索应用程序。缓存的工作方式如下:

  • 我们创建一个 Map 来存储请求的名称和结果

  • 当我们即将发出请求时,我们会检查是否有缓存结果。如果没有,则继续执行请求

  • 当请求执行时,我们将可观察对象本身放入缓存中,并使用操作符 shareReplay(1) 将最新的 1 个发射存储在内存中。当可观察对象(在缓存中)再次被订阅时,它将返回其结果,而不是再次发出请求。

上面的示例经过简化,没有考虑错误处理等情况。因此,如果您以此为灵感编写代码,可能需要处理更多情况。

如果您想了解其实际效果,请访问以下Stackblitz 链接

总结

RxJS 是一款极其强大的工具。虽然学习难度可能比较高,但学会使用它绝对能丰富你的个人开发技能库。

它不仅可以轻松地使您的代码更具声明性和可读性,而且只需在流中添加几行代码就可以帮助我们提高应用程序的效率和性能。

上述技术是使用 RxJS 运算符使您的代码高效且性能卓越的详尽技术列表,但远非完整。

如果您需要任何澄清,或者您认为某些内容不清楚或有错误,请留言!我们随时欢迎您的反馈。

希望你喜欢这篇文章!如果喜欢,请在 MediumTwitter或我的网站上关注我,获取更多关于软件开发、前端、RxJS、Typescript 等的文章!

鏂囩珷鏉ユ簮锛�https://dev.to/gc_psk/rxjs-patterns-efficiency-and-performance-2i6m
PREV
如果你正在读这篇文章,请为 Forem 做贡献
NEXT
Typescript 🟦 CRUD API:Next.js、Tailwind、tRPC、Prisma Postgres、Docker