响应式编程🌫️ - 使用 RxJS 揭秘
如果您正在寻找 RxJS 快速入门,那么本文不适合您!
在这里,我将讨论响应式编程,目的是使用 RxJS 作为示例,阐明其不合理的虚幻世界。
我将讲解响应式编程的核心概念,并将它们与 RxJS 联系起来,以及它们在实践中的运作方式。希望读完本文后,你能对 RxJS 有更深入的理解,并能够快速掌握任何 Rx 实现并开始编写代码。
陈述
RxJS 是一个使用可观察流进行异步编程的 API
。
要理解这意味着什么,我们需要定义异步编程和可观察流的含义。
最好的起点只能是反应式编程本身!
响应式编程
反应式编程(不要与函数式反应式编程混淆!)是异步编程的一个子集,是一种利用新信息的可用性来推动逻辑向前发展的范例,而不是通过执行线程来驱动控制流。
异步编程是一种并行编程方法,其中工作单元独立于主应用程序线程运行。通常,这是通过消息系统实现的,在消息系统中,竞争共享资源的执行线程无需通过阻塞等待(阻止执行线程在当前工作完成之前执行其他工作),因此可以在资源被占用时执行其他有用的工作。这个概念对于响应式编程至关重要,因为它允许编写非阻塞代码。以下是该过程的可视化:
同步阻塞通信(左)资源效率低下,容易出现瓶颈。反应式方法(右)降低了风险,节省了宝贵的资源,并且对硬件/基础设施的需求更少。
消息与事件
响应式编程通常是事件驱动的。事件只是无向消息。从本质上讲,它们(无论出于何种目的)都是事件的扩展。
反应式编程库的应用程序接口(API)通常是:
- 基于回调:匿名、有副作用的回调附加到事件源,并在事件通过数据流链时被调用。
- 声明式:通过功能组合,通常使用成熟的组合器,如 map、filter、fold 等。
反应系统
反应式宣言将反应式系统定义为:
- 响应性:响应系统专注于提供快速且一致的响应时间。
- 弹性:弹性系统能够在问题发生时进行处理,并在出现故障时保持响应。
- 弹性:弹性系统在变化的工作负载下保持响应,因此具有扩展能力。
- 消息驱动:消息驱动系统依靠异步消息传递来建立,以确保更改在组件之间无中断地传播。
反应式编程和反应式系统
这两者有什么关系?总结一下:
- 响应式编程是一种管理系统组件内部逻辑和数据流转换的技术。它能够提高代码的清晰度、性能和资源效率。
- 反应式系统是一套架构原则。它强调分布式通信,并为我们提供了解决分布式系统弹性和弹性的工具。
反应式编程应该作为构建反应式系统的工具之一。
在实践中定义范式
好吧,那么,响应式编程到底是什么?市面上有很多定义……有些我觉得连作者自己都搞不懂。用@andrestaltz的话说——“少废话”
反应式编程是使用异步数据流进行的编程。
美观、简洁,最重要的是易于解释!事实上,这个定义几乎和我之前关于 RxJS 的表述相同。这是因为 RxJS 属于响应式编程范式。
从现在开始,我们假设使用流进行响应式编程。在范式中还可以考虑其他类型的实现,例如:Promises/Futures和Dataflow 变量。
现在,正如承诺的那样,我将向你们展示“异步数据流”的含义。
流
响应式编程的核心思想是,一切(大多数情况下)都可以是流。流成本低廉且无处不在。
流是按时间顺序排列的持续事件序列。它只能发出三种事件:数据类型值、错误或终止信号。
记住这个定义很重要,因为无论范式如何实现,它都保持不变。
我喜欢通过想象一个带有关闭机制的水管来思考溪流,其中每个水分子(或一组水分子)都是一个发射值。
关闭机制可以通过手动旋转水龙头来触发,这代表终止信号;或者,如果管道无法正常工作,则隐式地触发,代表错误。关闭的管道不再能流出水,我们称之为已完成的流。
现在,让我们集中讨论定义的第一句话:“流是按时间顺序排列的一系列正在发生的事件。”
换句话说,随着时间的推移(程序执行),水滴(数据)会从管道(流)中被推出。我们如何捕捉这些水滴并对其采取行动?
在大多数响应式编程实现中,我们仅异步捕获这些发出的事件,通过定义被调用的函数并将三个适当的输出之一作为参数传递:
- 值发射时:每次值被推送到流中时,都会在此发射并捕获。可多次发生。
- 错误触发:当流发生错误时,错误信息将在此捕获并终止流。仅发生一次。
- 终止时:当流终止时,它将被捕获。仅发生一次。
以上就是捕获部分的内容。现在该介绍如何操作流本身了。我们通过Operators来实现。
运算符
运算符提供了一种通过转换来操作流的方法。在我们的上下文中,转换只是一个f
将一个流映射到另一个流的f: S1 → S2
函数,我们称之为运算符。
为了更直观地理解这一点,想象一下在水流的管道中放置一个或多个设备。这些设备可能装有过滤器,或者可以改变水的成分(或其他转化),从而将我们的水流转化为新的水流。
在上图中,我们初始的“未净化水”类型的流被转换为“净化水”类型的流,从而将管道末端观察到的数据从其原始形式进行了转换。
为了解释运算符及其对实际数据流的影响,我们必须深入研究 Marble Diagrams 的世界。
大理石图
在解释大理石图之前,我们需要稍微改进一下我们的术语。
重新定义一些术语
现在,由于我们将在下一章讨论 ReactiveX,所以是时候介绍一些必要的术语了。别担心,目前我只会对一些与我已经讲过的概念相对应的术语给出抽象的定义。下面的图表与之前相同,但包含了新的术语。
对于操作员图,
这些术语的简单定义是:
- 流 -> 可观察:表示随时间变化的值流的结构。
- 点击 -> 订阅者:有时也称为消费者,在可观察的结构上调用订阅过程的代码。
- 转动水龙头->订阅:为观察者打开流的方法。
- 关闭水龙头 -> 完成:将流标记为已完成的操作,意味着它已终止。
- 存储桶 -> 观察者:捕获我们所推送的值的结构,使我们能够对其采取行动。
- 设备 -> 操作员:转换流的功能。
我们稍后会讨论更精确的定义,因为它们几乎是阅读任何 RX 文档所必需的,而且不会让人头疼。所以,如果你还不太明白这些定义的含义,也不用担心。
但是,从现在开始我们将使用这个新术语,因此我建议您记住“映射”这个术语。
大理石图 - 可观察对象
好的,现在是绘制真正的大理石图的时间了!
学习响应式编程可能是一项艰巨的任务,因此 Rx 团队提出了弹珠图的概念,以帮助直观地理解可观察对象及其操作符。这些图非常直观,通常出现在任何 Rx 操作符文档中。它们让您无需阅读太多其他内容就能轻松理解操作符。这比厚厚的、充满术语的文字更实用!我会尽力解释如何阅读它们:
好吧……我的错,哈哈,抱歉!我们一步一步来吧。
弹珠图描述的是可观察量。可观察量是随时间变化的值流。所以,我们需要一个时间轴!
现在我们有了时间轴,我们需要表示可观察对象的输出。回想一下我们之前的定义,可观察对象只能输出一个值、一个终止信号或一个错误。
让我们从简单的开始,终止信号:
以类似的方式,我们有错误输出:
最后,让我们表示一下我们发出的值:
时间轴上可以有多个值,只要它们后面没有终止或错误输出,因为这些值将取消对可观察对象的订阅。
完成了,是不是很简单?接下来进入下一部分:弹珠图里的运算符!
弹珠图——运算符
如前所述,操作符是用来转换可观察对象的函数。这意味着它们以一个或多个可观察对象作为输入,并输出一个新的可观察对象。我们可以用弹珠图来表示它们,如下所示:
中间的代码块是我们的运算符函数,它接受一个可观察变量并返回另一个。因此,我们的函数通过对输入的可观察变量取模 2 来过滤输入的可观察变量,判断被推送的值是否为偶数,如果是,则允许该值通过,本质上就是对流进行过滤。
如前所述,运算符可以有多个可观察对象作为输入,例如以下运算符的情况switchMap
该switchMap
运算符非常流行,并且有很多实际应用。它通常用于在输入流之间实现丢弃操作,这在实际应用中可以节省大量的麻烦和计算量。
总而言之,每次Input Observable 1
发出一个值时,Input Observable 2
都会发出其所有值,除非在 完成Input Observable 1
之前 发出一个新值Input Observable 2
。如果你查看输出的可观察对象,你会注意到只有两个 30。这是因为在 发出值 5Input Observable 2
之前 无法完成。Input Observable 1
你很容易证实这一点,因为 3 和 5 之间的间隔远小于 的轴的大小Input Observable 2
,这表明只有时间发出前两个值。
实践 - RxJS
RxJS 概述
RxJS 是一个扩展ReactiveX 的库,用于通过 JavaScript 使用可观察序列编写异步和基于事件的程序。它提供了一种核心类型 Observable、一些附属类型(Observer、Schedulers、Subjects)以及一些操作符(map、filter、reduce、every 等),从而可以轻松操作可观察流,并显著减少解决异步问题所需的代码量。
优点与缺点
优势
- 增长非常迅速。
- 仅 RxJs 每周的下载量就有 2500 万次。
- 提供非常高质量的异步 API。
- 轻量级且内存优化。
- 轻松处理错误。
- 使大多数应用程序中的异步编程更快。
缺点
- 学习曲线相对较陡。
- 意味着一种函数式编程风格(数据不变性)。
- 测试/调试可以是一个学习过程。
RxJS 词汇表
在 RxJS 中,一些可以说已建立的定义是:
实体
- 可观察的:表示可调用的未来值或事件集合的想法。
- 观察者 (Observer):是一组回调函数的集合,它知道如何监听由可观察对象 (Observable) 传递的值。
- 订阅:表示可观察对象的执行,主要用于取消执行。
- 运算符:是纯函数,支持以函数式编程风格使用诸如 map、filter、concat、reduce 等操作来处理集合。
- Subject:相当于EventEmitter,是将值或事件多播给多个Observers的唯一方式。
- 调度程序:是控制并发的集中调度程序,允许我们在 setTimeout 或 requestAnimationFrame 或其他程序上进行计算时进行协调。
- 生产者:订阅可观察对象的代码。它会接收下一个值、错误或完成的通知。
- 消费者:任何系统或事物,都是从可观察的订阅中推送给消费者的价值来源。
概念
- 单播:指一个生产者只被一个消费者观察到的行为。当一个可观察对象只连接一个生产者和一个消费者时,它就是“单播”的。单播并不一定意味着“冷”。
- 多播:一个生产者被多个消费者观察的行为。
- 冷:如果一个可观察对象在订阅期间为每个新订阅创建一个新的生产者,则该可观察对象为“冷”。因此,“冷”可观察对象始终是单播的,即一个生产者被一个消费者观察到。冷可观察对象可以变为热可观察对象,但反之则不行。
- 热:当可观察对象的生产者是在订阅操作的上下文之外创建的,则该可观察对象是“热”的。这意味着“热”可观察对象几乎总是多播的。如果“热”可观察对象被设计为一次只允许一个订阅,那么从技术上讲,它仍然是单播的。然而,RxJS 中没有直接的机制来实现这一点,这种情况不太可能发生。为了便于讨论,所有“热”可观察对象都可以假设为多播的。热可观察对象不能设置为冷的。
- 推送:Observable 是一种基于推送的类型。这意味着,消费者无需调用函数或其他操作来获取值,而是在生产者生成值后,消费者会通过已注册的 next 处理程序立即接收值。
- 拉取:拉取系统与推取系统相反。在拉取系统或系统中,消费者必须手动请求生产者生成的每个值,这很可能是在生产者实际完成操作很久之后。这类系统的例子包括函数和迭代器。
可观察对象和订阅
现在我们应该同意,Observable 仅仅是惰性推送多个值的结构体。订阅则代表着可支配资源(通常是 Observable 的执行)的结果结构体。
以下是我们在 RxJS 中对它们进行编码的方式:
import { Observable } from 'rxjs';
/* Instantiate an observable */
const observable = new Observable(subscriber => {
subscriber.next(1); // pushes a value
subscriber.next(2); // pushes another value synchronously
setTimeout(() => {
subscriber.next(3); // pushes last value after a wait of 1s
subscriber.complete(); // terminates observable stream
}, 1000);
});
/* Subscribing to an observable */
console.log('just before subscribe');
const subscription = observable.subscribe({
// The three possible output captures:
next(x) { console.log('got value ' + x); },
error(err) { console.error('something wrong occurred: ' + err); },
complete() { console.log('done'); }
}); // creates subscription object
console.log('just after subscribe');
/* Unsubscribing to an observable using subscription */
setTimeout(() => {
subscription.unsubscribe();
}, 500);
// Logs:
// just before subscribe
// got value 1
// got value 2
// just after subscribe
请注意,我们永远不会看到记录的值 3,因为我们在通过传递给的闭包函数发出它之前取消了订阅setTimeout
。
然而,这并不意味着该值没有被发出,它确实发出了,只是因为我们停止了订阅,所以看不到它。流并没有通过取消订阅的操作终止。
热与冷的可观察对象
当某些代码调用subscribe()函数时,冷可观察对象就会开始生成数据。
一个冷的可观察对象:
import { Observable } from "rxjs";
// Creating a cold observable
const observable = Observable.create((observer) => {
observer.next(Math.random()); // We explicitly push the value to the stream
});
// Subscription 1
observable.subscribe((data) => {
console.log(data); // 0.24957144215097515 (random number)
});
// Subscription 2
observable.subscribe((data) => {
console.log(data); // 0.004617340049055896 (random number)
});
即使没有订阅者对数据感兴趣,热可观察对象也会产生数据。
一个热门的可观察对象:
import { Observable } from "rxjs";
// Coming from an event which is constantly emmit values
const observable = Observable.fromEvent(document, 'click');
// Subscription 1
observable.subscribe((event) => {
console.log(event.clientX); // x position of click
});
// Subscription 2
observable.subscribe((event) => {
console.log(event.clientY); // y position of click
});
Promises 与 Observables
主要区别在于:
- Promise 是渴望的。Observable 是惰性的。
- Promise 是单值输出。Observable 是多值流。
- Promise 没有取消或操作 API。Observable 有。
RxJS 与 Promises 的 stackblitz 示例:https://stackblitz.com/edit/classicjs-vs-rxjs
可观察对象可以是承诺
虽然可观察对象不是Promise/A+规范的扩展,但 RxJS 仍然提供了将可观察对象转换为真正的 Promise 的方法。示例如下:
import { Observable } from "rxjs";
// Return a basic observable
const simpleObservable = val => Observable.of(val).delay(5000);
// Convert basic observable to promise
const example = sample('First Example')
.toPromise() // Now its a promise
.then(result => {
console.log('From Promise:', result); // After 500ms, output 'First Example'
});
使用 RxJS 的toPromise
方法,任何可观察对象都可以转换为 Promise。需要注意的是,由于它返回的是真正的 JS Promise,所以toPromise
它不是一个 pipable 运算符,因为它不返回可观察对象。
观察者
实际上,观察者 (Observer) 是可观察对象 (Observable) 传递的值的消费者。观察者只是一组回调函数,分别对应可观察对象传递的每种通知类型:next
、error
和complete
。以下是一个典型的观察者对象的示例:
const observer = {
next: x => console.log('Observer got a next value: ' + x),
error: err => console.error('Observer got an error: ' + err),
complete: () => console.log('Observer got a complete notification'),
};
// To use it, pass it to a subscribe
observable.subscribe(observer);
对于观察者来说,确实就是这样!
运算符
尽管 Observable 是 RxJS 的基础,但其最有用的还是它的操作符。之前我们学习的操作符是用来转换流的函数。这里没什么变化,只是术语不同!
RxJS 拥有非常丰富的操作符库。我们只会涉及几个简单的操作符来涵盖我们已经讨论过的内容:
import { from } from "rxjs";
import { filter } from "rxjs/operators";
from([1, 2, 3, 4, 5]).pipe(
filter((x) => (x % 2) === 0)
).subscribe(console.log); // [2, 4]
如果您还记得我们之前的过滤器示例,那么这应该相当容易理解!
管道
管道只是一系列按顺序执行的操作符。显而易见,但人们却常常忘记,每个管道操作符都必须返回一个可观察对象。
与前面相同的示例,但使用链接运算符:
import { from } from "rxjs";
import { filter, take, map } from "rxjs/operators";
from([1, 2, 3, 4, 5]).pipe(
filter((x) => (x % 2) === 0),
take(1),
map((firstValue) => "The first even number was " + firstValue)
).subscribe(console.log);
还有大量操作符,它们可以执行各种类型的操作,例如:创建、过滤、组合、错误处理、转换、多播等等。我鼓励你尝试每个类别中的一些操作符。这就是 RxJS 的强大之处,它已经为你做好了很多准备!
主题
Subject 类似于 Observable,但可以向多个 Observers 进行多播。Subject 类似于 EventEmitters:它们维护着多个监听器的注册表。实际上,Subject 的一部分实际上是一个 Observable,你可以获取该 Observable 的引用。
思考一个主题最简单的方式就是从字面上理解:
- 主体 = 观察者 + 可观察对象
例子:
import { Subject, from } from 'rxjs';
const subject = new Subject<number>();
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
subject.next(1);
subject.next(2);
// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
const observable = from([1, 2, 3]);
observable.subscribe(subject); // You can subscribe providing a Subject
// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3
在我看来,Subject 的最佳用例是当引用它的代码本身就是生成可观察数据的代码时。您可以轻松让消费者订阅 Subject,然后调用.next()
函数将数据推送到管道中。但要谨慎使用它们,因为大多数问题只需数据转换和可观察对象即可解决。
调度器
最后,调度器!它们可能看起来很难理解,但从表面上看相当简单,我们了解一下就足够了。本质上,调度器控制着可观察对象的任务顺序。调度器只有几个,而且短期内不会改变,具体如下:
你可以通过一些操作符(通常是创建类)将调度器作为参数传递给可观察对象来使用调度器。最基本的例子是强制同步可观察对象异步运行:
import { Observable, asyncScheduler } from 'rxjs';
import { observeOn } from 'rxjs/operators';
const observable = new Observable((observer) => {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
}).pipe(
observeOn(asyncScheduler)
);
console.log('just before subscribe');
observable.subscribe({
next(x) {
console.log('got value ' + x)
},
error(err) {
console.error('something wrong occurred: ' + err);
},
complete() {
console.log('done');
}
});
console.log('just after subscribe');
// Logs
// just before subscribe
// just after subscribe
// got value 1
// got value 2
// got value 3
// done
value...
注意,订阅后通知是如何被立即发送的。这是因为observeOn(asyncScheduler)
在新的 Observable 和最终的 Observer 之间引入了一个代理观察者。
其他调度程序可用于不同的时间。我们完成了!
精彩的 RxJS 资源
- RxJS 可视化工具: https://rxviz.com/
- 即时大理石图: https://thinkrx.io/
- 带有大理石图的文档: https://rxmarbles.com/
- 操作员决策树: https://rxjs.dev/operator-decision-tree
参考
- https://gist.github.com/staltz/868e7e9bc2a7b8c1f754
- https://www.reactivemanifesto.org/
- https://en.wikipedia.org/wiki/Reactive_programming
- https://www.zachgollwitzer.com/posts/2020/rxjs-marble-diagram/
- https://medium.com/@bencabanes/marble-testing-observable-introduction-1f5ad39231c
- https://www.lightbend.com/white-papers-and-reports/reactive-programming-versus-reactive-systems