深入理解 RxJS Subjects 深入理解 RxJS Subjects Subject 是一种特殊的可观察对象,允许将值多播给多个观察者。Subjects 类似于 EventEmitters。

2025-06-07

深入了解 RxJS Subjects

深入了解 RxJS Subjects

Subject 是一种特殊类型的 Observable,允许将值多播给多个 Observers。

主题就像 EventEmitters。

深入了解 RxJS Subjects

本文最初Giancarlo Buomprisco在Bits and Pieces上发表

在本文中,我想探讨 RxJS 对 Subjects 的实现这一主题,Subjects 是一种越来越受到社区关注和喜爱的实用程序。

过去,我曾以多种方式使用过 Subjects,但有时并不完全理解它们的内部结构以及它们与 Observables 的主要区别。

本文将涵盖以下内容:

  • 什么是主题?

  • 多播和单播

  • 其他类型的 Subject:AsyncSubject、ReplaySubject 和 BehaviorSubject

什么是主题?

让我们从一个简单的问题开始:什么是 Subject?
根据 Rx 官网的介绍:

Subject 是一种特殊类型的 Observable,允许将值多播给多个 Observers。

主题就像 EventEmitters。

如果这还不清楚,请坚持下去,在文章结束时,您将更清楚地了解什么是主题以及如何使用它们。

Rx 文档中的定义最初让我印象深刻:事实上,我一直认为 Subjects 纯粹是一种使用流来拉取和推送值的方式。结果发现,即使我每天都用它们大约 5 年,我也没能真正理解它们。

提示:使用Bit轻松跨项目重用 Angular/React/Vue 组件

使用Bit在不同项目之间共享和复用 JS 模块以及 React/Angular/Vue 组件。团队协作使用共享组件,更快地共同构建应用。Bit 承担繁重的工作,让您轻松发布、安装和更新各个组件,无需任何额外开销。点击此处了解更多信息

主题

Subject 是一个内部扩展自 Observable 的类。Subject 既是 Observable,又是 Observer,它允许将值多播给多个 Observer,这与 Observable 不同,Observable 中的每个订阅者都拥有一个独立的 Observable 执行。

这意味着:

  • 你可以订阅一个 Subject 来从其流中提取值

  • 您可以通过调用方法 next() 将值提供给流

  • 你甚至可以将 Subject 作为 Observer 传递给 Observable:如上所述,Subject 也是一个 Observer,因此,它实现了 next、error 和 complete 方法

让我们看一个简单的例子:

    const subject$ = new Subject();

    // Pull values
    subject$.subscribe(
      console.log, 
      null, 
      () => console.log('Complete!')
    );

    // Push values
    subject$.next('Hello World');

    // Use Subject as an Observer
    const numbers$ = of(1, 2, 3);
    numbers$.subscribe(subject$);

    /* Output below */

    // Hello Word
    // 1
    // 2
    // 3
    // Complete!
Enter fullscreen mode Exit fullscreen mode

主体的内部

在内部,每个 Subject 都会维护一个观察者注册表(以数组的形式)。Subject 的内部工作原理如下:

  • 每当有新的观察者订阅时,Subject 都会将观察者存储在观察者数组中

  • 当发出新项目时(即调用了 next() 方法),Subject 会循环遍历观察者,并向每个观察者发出相同的值(多播)。当出现错误或完成时也会发生同样的情况。

  • 当一个 Subject 完成时,所有观察者将自动取消订阅

  • 相反,当 Subject 取消订阅时,订阅仍然有效。观察者数组会被清空,但不会取消订阅。如果你尝试从已取消订阅的 Subject 发出值,实际上会抛出错误。最好的做法是,当你需要销毁 Subject 及其观察者时,完成它。

  • 当其中一个观察者取消订阅时,它将从注册表中删除

多播

将 Subject 作为 Observer 传递可以将 Observable 的行为从单播转换为多播。事实上,使用 Subject 是实现 Observable 多播的唯一方法,这意味着它们将与多个 Observer共享相同的执行。

等等:共享执行到底是什么意思?让我们看两个例子来更好地理解这个概念。

让我们以可观察间隔为例:我们想要创建一个每 1000 毫秒(1 秒)发射一次的可观察对象,并且我们希望与所有订阅者共享执行,无论他们何时订阅。


    const subject$ = new Subject<number>();

    const observer = {
      next: console.log
    };

    const observable$ = interval(1000);

    // subscribe after 1 second
    setTimeout(() => {
      console.log("Subscribing first observer");    
      subject$.subscribe(observer);
    }, 1000);

    // subscribe after 2 seconds
    setTimeout(() => {
      console.log("Subscribing second observer");
      subject$.subscribe(observer);
    }, 2000);

    // subscribe using subject$ as an observer
    observable$.subscribe(subject$);
Enter fullscreen mode Exit fullscreen mode

让我们总结一下上面的代码片段

  • 我们创建一个名为 subject$ 的主体和一个观察者,该观察者在每次发射后简单地记录当前值

  • 我们创建一个每秒发出一次的可观察对象(使用间隔)

  • 我们分别在 1 秒和 2 秒后订阅

  • 最后,我们使用主体作为观察者,并订阅间隔可观察对象

让我们看看输出:

如上图所示,即使第二个 Observable 在 1 秒后订阅,发送给两个观察者的值也完全相同。事实上,它们共享同一个源 Observable。

另一个显示多播实用性的常见示例是订阅执行 HTTP 请求的可观察对象,这种情况在 Angular 等框架中经常发生:通过多播可观察对象,您可以避免执行多个请求并与多个订阅者共享执行,这些订阅者将接收相同的值。

异步主题

我个人认为 AsyncSubject 是最不为人所知的 Subject 类型,仅仅是因为我从来没有真正需要它,或者更有可能的是我不知道我可能需要它。

简而言之,AsyncSubject 将:

  • 仅在完成后发出

  • 仅发出收到的最新值


    const asyncSubject$ = new AsyncSubject();

    asyncSubject$.next(1);
    asyncSubject$.next(2);
    asyncSubject$.next(3);

    asyncSubject$.subscribe(console.log);

    // ... nothing happening!

    asyncSubject$.complete();

    // 3
Enter fullscreen mode Exit fullscreen mode

如您所见,即使我们订阅了,在我们调用方法完成之前也不会发生任何事情。

重播主题

在介绍ReplaySubject之前,我们先来看一个普通Subject的常见使用情况:

  • 我们创建一个主题

  • 在我们的应用程序中的某个地方,我们开始向主题推送值,但还没有订阅者

  • 在某个时刻,第一个观察者订阅

  • 我们期望观察者发出先前通过主体推送的值(全部?还是仅最后一个?)

  • ……什么都没发生!事实上,主体没有记忆


    const subject$ = new Subject();

    // somewhere else in our app
    subject.next(/* value */);

    // somewhere in our app
    subject$.subscribe(/* do something */);

    // nothing happening
Enter fullscreen mode Exit fullscreen mode

这是 ReplaySubject 可以帮助我们的情况之一:事实上,Subject 会记录发出的值,并在订阅时将发出的所有值推送给观察者。

让我们回到上面的问题:ReplaySubject 会重播所有发射还是仅重播最新的发射?

默认情况下,主题会重放所有发出的项目,但我们可以提供一个名为 bufferSize 的参数。此参数定义了 ReplaySubject 应在其内存中保留的发出项目数量:


    const subject$ = new ReplaySubject(1);

    subject$.next(1);
    subject$.next(2);
    subject$.next(3);

    subject$.subscribe(console.log);

    // Output
    // 3
Enter fullscreen mode Exit fullscreen mode

还有第二个参数可以传递给 ReplaySubject,以定义旧值应在内存中存储多长时间。


    const subject$ = new ReplaySubject(100,*250);

    setTimeout(() => subject$.next(1), 50);
    setTimeout(() => subject$.next(2), 100);
    setTimeout(() => subject$.next(3), 150);
    setTimeout(() => subject$.next(4), 200);
    setTimeout(() => subject$.next(5), 250);

    setTimeout(() => {
      subject$.subscribe(v => console.log('SUBCRIPTION A', v));
    }, 200);

    setTimeout(() => {
      subject$.subscribe(v => console.log('SUBCRIPTION B', v));
    }, 400);
Enter fullscreen mode Exit fullscreen mode
  • 我们创建一个 ReplaySubject,其 bufferSize 为 100,windowTime 为 250

  • 我们每 50 毫秒发出 5 个值

  • 我们在 200 毫秒后第一次订阅,在 400 毫秒后第二次订阅

让我们分析一下输出:


    SUBCRIPTION A 1
    SUBCRIPTION A 2
    SUBCRIPTION A 3
    SUBCRIPTION A 4
    SUBCRIPTION A 5
    SUBCRIPTION B 4
    SUBCRIPTION B 5
Enter fullscreen mode Exit fullscreen mode

订阅 A 能够重播所有项目,但订阅 B 只能重播项目 4 和 5,因为它们是在指定的窗口时间内发出的唯一项目。

行为主体

BehaviorSubject 可能是最为人熟知的 Subject 子类。这种 Subject 代表的是“当前值”。

有趣的是,Combine 框架将其命名为 CurrentValueSubject

与 ReplaySubject 类似,每当观察者订阅它时,它也会重播当前值。

为了使用 BehaviorSubject,我们需要在实例化时提供一个强制的初始值。

    const subject$ = new BehaviorSubject(0); // 0 is the initial value

    subject$.next(1);

    setTimeout(() => {
      subject$.subscribe(console.log);
    }, 200);

    // 1
Enter fullscreen mode Exit fullscreen mode

每当发出新值时,BehaviorSubject 都会将该值存储在属性值中,该属性值也可以公开访问。

最后的话

Rx Subject 是相当强大的工具,但正如软件工程中任何强大的工具一样,它们也很容易被滥用。单播和多播的概念是一个显著的区别,在使用 Rx 时需要牢记。

了解 Subjects 内部如何工作不仅有利于避免常见的陷阱和错误,还能了解何时需要它们、何时不需要它们。

如果您需要任何澄清,或者您认为某些内容不清楚或错误,请发表评论!

希望你喜欢这篇文章!如果喜欢,请在MediumTwitter或我的网站上关注我,获取更多关于软件开发、前端、RxJS、Typescript 等的文章!

文章来源:https://dev.to/gc_psk/rxjs-subjects-in-deep-49ni
PREV
面向 Angular 开发者的 Svelte
NEXT
我是如何成为一名没有大学学位却能赚到六位数薪水的软件工程师的