使用 Angular 进行 RxJS 订阅管理

2025-06-09

使用 Angular 进行 RxJS 订阅管理

可观察对象的订阅是使用 RxJS 的基础。每次订阅时,我们都会创建一个对象Subscription,并将其保存在内存中。如果不处理,订阅将保留在内存中,并可能导致内存泄漏

在本文中,我将介绍管理订阅的各种方法以及如何选择合适的方法。RxJS 订阅管理是开发人员常犯的错误之一,部分原因是 RxJS 的学习曲线,部分原因是订阅机制的不透明性,但这对于有效使用 RxJS 至关重要。

简而言之,管理 RxJS 订阅就是知道何时取消订阅。每当有订阅(调用.subscribe())时,开发人员都应该知道或意识到它何时会被取消订阅,无论是在第 n 次发送之后,还是在组件被销毁时。

我将介绍 6 种管理订阅的方法以及它们的使用时机,不包括 RxJS 本身之外的任何外部库/插件。它们分别是async管道、first运算符、take运算符、takeWhile运算符、takeUntil运算符,以及最后的.unsubscribe()调用。具体使用哪种方法取决于具体情况,例如,管道函数(即副作用)是否复杂?订阅应该保持活动状态的频率或时间?


异步管道

我们始终应该尝试的第一个方法是asyncpipe 方法。有了asyncpipe,我们无需手动处理取消订阅,订阅和取消订阅都会在 中为您处理Pipe。组件销毁后,它会立即取消对可观察对象的订阅。在内部,它还会为您处理变更检测。有了asyncpipe,您的 .ts 文件中的代码会少很多。代码越少,bug 就越少。

async pipe used in app.component.html
<p>{{ data$ | async }}</p>
Enter fullscreen mode Exit fullscreen mode

然而,pipe 方法有一个需要注意的地方async:每次 Angular 中的 UI 重新渲染时,任何async通过管道传输的可观察对象都会被触发,从而导致其间的任何函数、操作或副作用都会被触发,触发次数与 UI 重新渲染次数相同。如果在管道之间执行了开销高的操作,则会占用大量资源。请记住这一点,并将开销大的操作移至不同的可观察对象流,并在组件的类 (.ts) 文件中处理它们。

另外,async管道实际上仅适用于需要在模板中打印数据的情况。不过,在情况允许的情况下,它应该是我们管理可观察对象订阅的首选方法。

参考:AsyncPipe


第一操作员

尽管名称如此,但first运算符是我们考虑的第二种方法。使用first运算符,只要有一个数据发出,你的可观察对象订阅就会被取消订阅。我们可以传入一个函数作为谓词/验证器,以确保通过此运算符发出的数据是我们想要的值。当我们非常确定只需要从订阅中发出一个数据时,就可以使用此运算符。

const data$ = from([0, 1, 2, 3]);

// without requirement
// unsubscribed with one emission
data$.pipe(
  first()
).subscribe();

// output: 0

// with guard / validator function
// ensures only truthy value can pass through
// will only unsubscribe after one truthy value
data$.pipe(
  first(value => !!value)
).subscribe();

// output: 1
Enter fullscreen mode Exit fullscreen mode

参考:第一


take 运算符

与运算符类似firsttake运算符接受有限数量的发射,不同之处在于它可以接收多个发射。另一个区别是,first如果流在发射值之前完成,则会发出错误,而运算符则take不会。当您知道只需要有限数量的发射时,请使用此运算符。您也可以考虑添加一个filter运算符来防止发射,以确保您收到的第 n 个发射对您有价值。

const data$ = of(0, 1, 2, 3);

// takes 3 truthy value
data$.pipe(
  filter(x => !!x)
  take(3)
).subscribe();

// output: 1, 2, 3
Enter fullscreen mode Exit fullscreen mode

參考文獻:


takeWhile 运算符

takeWhile当条件成立时,运算符将保持订阅有效。该运算符将接受谓词/验证函数来判断条件的真假。

const data$ = of(1, 1, 1, 2, 4, 8);

// take while value is less than 4
data$.pipe(
  takeWhile(value => value < 4)
).subscribe();

// output: 1, 1, 1, 2
Enter fullscreen mode Exit fullscreen mode

这个操作符并不常见,但在某些情况下需要它,例如,我们希望订阅在满足某个条件后立即停止。另一个例子是,我们希望在鼠标悬停在某个元素上时保持订阅状态,并在鼠标离开时立即取消订阅。

参考:takeWhile


takeUntil 运算符

takeUntil操作符接受一个可观察对象作为其通知器,该通知器将指示何时结束订阅。当通知器传入takeUntil操作符时,它将在内部订阅该可观察对象,并且一旦通知器发出一个事件,它将同时取消订阅源和通知器可观察对象。请注意,通知器可观察对象是在takeUntil内部取消订阅的,因此,如果没有其他对象订阅该通知器,则无需取消订阅或完成通知器可观察对象。

这个操作符可能是我们最常用的操作符。如果上面提到的操作符都不适合您的情况,那么这个takeWhile操作符很可能就是最适合您的。它最常用于保持订阅有效,直到事件发生,例如组件被销毁。

takeUntil to end subscription when component is destroyed
class AppComponent implements OnInit, OnDestroy {
  private readonly destroyed$ = new Subject<void>();

  ngOnInit() {
    const interval$ = interval(1000);

    interval$.pipe(
      tap(val => console.log(val)),
      takeUntil(this.destroyed$)
    ).subscribe();
  }

  ngOnDestroy() {
    this.destroyed$.next();
  }
}
Enter fullscreen mode Exit fullscreen mode

在使用takeUntil结束订阅时,请确保将其放置在操作符链的最后,以确保其覆盖中间的所有流。这将防止订阅泄漏到takeUntil操作符之后的流。

takeUntil subscription leak
const streamA$ = interval(1000);
const streamB$ = interval(50);
const notifier$ = new Subject();

streamA$.pipe(
  takeUntil(notifier$),
  switchMap(() => streamB$)
).subscribe();

notifier$.next();
Enter fullscreen mode Exit fullscreen mode

在上面的代码片段中,streamA$将在发出后结束notifier$,但streamB$不会结束,它的订阅将保持活动状态,这就是takeUntil泄漏。

参考:takeUntil


取消订阅()

最后,我们只需调用.unsubscribe()即可Subscription结束订阅。您需要先将订阅赋值给一个变量或类属性,然后在.unsubscribe()需要结束订阅时调用。

single subscription .unsubscribe()
const interval$ = interval(1000);
const subscription = interval$.subscribe();

subscription.unsubscribe();
Enter fullscreen mode Exit fullscreen mode

然而,由于我们必须将每个订阅赋值给一个变量,因此需要做更多的工作/代码,而且与上面提到的几种方法相比,它需要非常手动和命令式地完成。尤其是在有多个订阅的情况下。

batch subscriptions .unsubscribe()
const subsciptions = [];
const interval$ = interval(1000);
const subscriptionA = interval$.subscribe();
const subscriptionB = interval$.subscribe();

subscriptions.push(subscriptionA);
subscriptions.push(subscriptionB);

subscriptions.forEach(subscription => subscription.unsubscribe());
Enter fullscreen mode Exit fullscreen mode

参考:订阅


结论

至此,我介绍了 6 种管理 RxJS 订阅的方法,并按照决策层级进行了排序。学习这些方法,并根据具体情况自行决定哪种方法最适合解决你的问题。

值得一提的是,有一个名为UntilDestroy的库可以帮助你在组件销毁时取消订阅可观察对象。这takeUntil就像你写的代码更少了一样。

以上就是我要分享的全部内容。祝您编程愉快!

鏂囩珷鏉ユ簮锛�https://dev.to/j3nnning/rxjs-subscription-management-with-angular-54ek
PREV
2023 年 10 大最佳开发者礼品
NEXT
使用自定义非 root 用户运行 Docker 容器:同步主机和容器权限