深入了解 RxJS Subjects
深入了解 RxJS Subjects
Subject 是一种特殊类型的 Observable,允许将值多播给多个 Observers。
主题就像 EventEmitters。
深入了解 RxJS Subjects
本文最初由Giancarlo Buomprisco在Bits and Pieces上发表
在本文中,我想探讨 RxJS 对 Subjects 的实现这一主题,Subjects 是一种越来越受到社区关注和喜爱的实用程序。
过去,我曾以多种方式使用过 Subjects,但有时并不完全理解它们的内部结构以及它们与 Observables 的主要区别。
本文将涵盖以下内容:
-
什么是主题?
-
多播和单播
-
其他类型的 Subject:AsyncSubject、ReplaySubject 和 BehaviorSubject
什么是主题?
让我们从一个简单的问题开始:什么是 Subject?
根据 Rx 官网的介绍:
Subject 是一种特殊类型的 Observable,允许将值多播给多个 Observers。
主题就像 EventEmitters。
如果这还不清楚,请坚持下去,在文章结束时,您将更清楚地了解什么是主题以及如何使用它们。
Rx 文档中的定义最初让我印象深刻:事实上,我一直认为 Subjects 纯粹是一种使用流来拉取和推送值的方式。结果发现,即使我每天都用它们大约 5 年,我也没能真正理解它们。
提示:使用Bit轻松跨项目重用 Angular/React/Vue 组件
使用Bit在不同项目之间共享和复用 JS 模块以及 React/Angular/Vue 组件。团队协作使用共享组件,更快地共同构建应用。Bit 承担繁重的工作,让您轻松发布、安装和更新各个组件,无需任何额外开销。点击此处了解更多信息。
主题
Subject 是一个内部扩展自 Observable 的类。Subject 既是 Observable,又是 Observer,它允许将值多播给多个 Observer,这与 Observable 不同,Observable 中的每个订阅者都拥有一个独立的 Observable 执行。
这意味着:
-
你可以订阅一个 Subject 来从其流中提取值
-
您可以通过调用方法 next() 将值提供给流
-
你甚至可以将 Subject 作为 Observer 传递给 Observable:如上所述,Subject 也是一个 Observer,因此,它实现了 next、error 和 complete 方法
让我们看一个简单的例子:
const subject$ = new Subject();
// Pull values
subject$.subscribe(
console.log,
null,
() => console.log('Complete!')
);
// Push values
subject$.next('Hello World');
// Use Subject as an Observer
const numbers$ = of(1, 2, 3);
numbers$.subscribe(subject$);
/* Output below */
// Hello Word
// 1
// 2
// 3
// Complete!
主体的内部
在内部,每个 Subject 都会维护一个观察者注册表(以数组的形式)。Subject 的内部工作原理如下:
-
每当有新的观察者订阅时,Subject 都会将观察者存储在观察者数组中
-
当发出新项目时(即调用了 next() 方法),Subject 会循环遍历观察者,并向每个观察者发出相同的值(多播)。当出现错误或完成时也会发生同样的情况。
-
当一个 Subject 完成时,所有观察者将自动取消订阅
-
相反,当 Subject 取消订阅时,订阅仍然有效。观察者数组会被清空,但不会取消订阅。如果你尝试从已取消订阅的 Subject 发出值,实际上会抛出错误。最好的做法是,当你需要销毁 Subject 及其观察者时,完成它。
-
当其中一个观察者取消订阅时,它将从注册表中删除
多播
将 Subject 作为 Observer 传递可以将 Observable 的行为从单播转换为多播。事实上,使用 Subject 是实现 Observable 多播的唯一方法,这意味着它们将与多个 Observer共享相同的执行。
等等:共享执行到底是什么意思?让我们看两个例子来更好地理解这个概念。
让我们以可观察间隔为例:我们想要创建一个每 1000 毫秒(1 秒)发射一次的可观察对象,并且我们希望与所有订阅者共享执行,无论他们何时订阅。
const subject$ = new Subject<number>();
const observer = {
next: console.log
};
const observable$ = interval(1000);
// subscribe after 1 second
setTimeout(() => {
console.log("Subscribing first observer");
subject$.subscribe(observer);
}, 1000);
// subscribe after 2 seconds
setTimeout(() => {
console.log("Subscribing second observer");
subject$.subscribe(observer);
}, 2000);
// subscribe using subject$ as an observer
observable$.subscribe(subject$);
让我们总结一下上面的代码片段
-
我们创建一个名为 subject$ 的主体和一个观察者,该观察者在每次发射后简单地记录当前值
-
我们创建一个每秒发出一次的可观察对象(使用间隔)
-
我们分别在 1 秒和 2 秒后订阅
-
最后,我们使用主体作为观察者,并订阅间隔可观察对象
让我们看看输出:

如上图所示,即使第二个 Observable 在 1 秒后订阅,发送给两个观察者的值也完全相同。事实上,它们共享同一个源 Observable。
另一个显示多播实用性的常见示例是订阅执行 HTTP 请求的可观察对象,这种情况在 Angular 等框架中经常发生:通过多播可观察对象,您可以避免执行多个请求并与多个订阅者共享执行,这些订阅者将接收相同的值。
异步主题
我个人认为 AsyncSubject 是最不为人所知的 Subject 类型,仅仅是因为我从来没有真正需要它,或者更有可能的是我不知道我可能需要它。
简而言之,AsyncSubject 将:
-
仅在完成后发出
-
仅发出收到的最新值
const asyncSubject$ = new AsyncSubject();
asyncSubject$.next(1);
asyncSubject$.next(2);
asyncSubject$.next(3);
asyncSubject$.subscribe(console.log);
// ... nothing happening!
asyncSubject$.complete();
// 3
如您所见,即使我们订阅了,在我们调用方法完成之前也不会发生任何事情。
重播主题
在介绍ReplaySubject之前,我们先来看一个普通Subject的常见使用情况:
-
我们创建一个主题
-
在我们的应用程序中的某个地方,我们开始向主题推送值,但还没有订阅者
-
在某个时刻,第一个观察者订阅
-
我们期望观察者发出先前通过主体推送的值(全部?还是仅最后一个?)
-
……什么都没发生!事实上,主体没有记忆
const subject$ = new Subject();
// somewhere else in our app
subject.next(/* value */);
// somewhere in our app
subject$.subscribe(/* do something */);
// nothing happening
这是 ReplaySubject 可以帮助我们的情况之一:事实上,Subject 会记录发出的值,并在订阅时将发出的所有值推送给观察者。
让我们回到上面的问题:ReplaySubject 会重播所有发射还是仅重播最新的发射?
默认情况下,主题会重放所有发出的项目,但我们可以提供一个名为 bufferSize 的参数。此参数定义了 ReplaySubject 应在其内存中保留的发出项目数量:
const subject$ = new ReplaySubject(1);
subject$.next(1);
subject$.next(2);
subject$.next(3);
subject$.subscribe(console.log);
// Output
// 3
还有第二个参数可以传递给 ReplaySubject,以定义旧值应在内存中存储多长时间。
const subject$ = new ReplaySubject(100,*250);
setTimeout(() => subject$.next(1), 50);
setTimeout(() => subject$.next(2), 100);
setTimeout(() => subject$.next(3), 150);
setTimeout(() => subject$.next(4), 200);
setTimeout(() => subject$.next(5), 250);
setTimeout(() => {
subject$.subscribe(v => console.log('SUBCRIPTION A', v));
}, 200);
setTimeout(() => {
subject$.subscribe(v => console.log('SUBCRIPTION B', v));
}, 400);
-
我们创建一个 ReplaySubject,其 bufferSize 为 100,windowTime 为 250
-
我们每 50 毫秒发出 5 个值
-
我们在 200 毫秒后第一次订阅,在 400 毫秒后第二次订阅
让我们分析一下输出:
SUBCRIPTION A 1
SUBCRIPTION A 2
SUBCRIPTION A 3
SUBCRIPTION A 4
SUBCRIPTION A 5
SUBCRIPTION B 4
SUBCRIPTION B 5
订阅 A 能够重播所有项目,但订阅 B 只能重播项目 4 和 5,因为它们是在指定的窗口时间内发出的唯一项目。
行为主体
BehaviorSubject 可能是最为人熟知的 Subject 子类。这种 Subject 代表的是“当前值”。
有趣的是,Combine 框架将其命名为 CurrentValueSubject
与 ReplaySubject 类似,每当观察者订阅它时,它也会重播当前值。
为了使用 BehaviorSubject,我们需要在实例化时提供一个强制的初始值。
const subject$ = new BehaviorSubject(0); // 0 is the initial value
subject$.next(1);
setTimeout(() => {
subject$.subscribe(console.log);
}, 200);
// 1
每当发出新值时,BehaviorSubject 都会将该值存储在属性值中,该属性值也可以公开访问。
最后的话
Rx Subject 是相当强大的工具,但正如软件工程中任何强大的工具一样,它们也很容易被滥用。单播和多播的概念是一个显著的区别,在使用 Rx 时需要牢记。
了解 Subjects 内部如何工作不仅有利于避免常见的陷阱和错误,还能了解何时需要它们、何时不需要它们。
如果您需要任何澄清,或者您认为某些内容不清楚或错误,请发表评论!
希望你喜欢这篇文章!如果喜欢,请在Medium、Twitter或我的网站上关注我,获取更多关于软件开发、前端、RxJS、Typescript 等的文章!
文章来源:https://dev.to/gc_psk/rxjs-subjects-in-deep-49ni