从头开始学习 RxJS:可观察对象
介绍
可观察对象
可观察对象和订阅者
拆卸逻辑
订阅
结论
从小起,我就一直对事物的底层工作原理充满好奇。同时,我也一直热衷于从底层开始学习和构建,以深入了解新事物。
就编程而言,我坚信学习库/框架背后的基本思想是建立扎实知识体系以及“超越”知识体系的最佳途径。因此,我写了这篇文章!
在本文中,我们将从零开始逐步实现 RxJS。我们将遵循与真实 RxJS 代码库相同的架构,但去除所有优化和非必要功能。
我们将从构建库的核心抽象开始:Observable。在此过程中,我们还将创建Observers、Subscribers和Subscriptions,而在下一集中,我们将实现可管道操作符。
介绍
为什么 RxJS 如此流行?答案就在文档里:
RxJS 是一个使用可观察序列组成异步和基于事件的程序的库。
该库使这些任务更加简单、更具声明性且易于理解。为了实现这一目标,RxJS 提供了三个主要优势:
- 清晰的控制流:管道运算符可帮助您轻松控制事件如何流经您的 Observable
- 函数纯度:使用纯函数“安全地”生成和处理值的能力
- 值转换:你可以根据需要转换通过 Observable 传递的值
让我们通过一个玩具示例来阐明为什么 RxJS 如此强大(相信我,您可以使用 RxJS 做更多的事情)。
// without RxJS: IMPERATIVE
let count = 0
const rate = 1000
let lastClick = Date.now() - rate;
document.addEventListener('click', event => {
if (Date.now() - lastClick >= rate) {
count += event.clientX;
console.log(count);
lastClick = Date.now();
}
})
// with RxJS: DECLARATIVE
fromEvent(document, 'click')
.pipe(
throttleTime(1000),
map(event => event.clientX),
scan((count, clientX) => count + clientX, 0)
)
.subscribe(console.log)
一切似乎都围绕着 Observable 构建,事实也确实如此。那么,什么是Observable呢?我们来解释一下这个概念。
可观察对象
可观察对象是多个值的惰性推送集合。
让我们将这个定义分解成几个部分。
术语“拉”和“推”描述了生产者如何与消费者进行通信。函数是“拉”的实体,实际上是调用者(开发者)明确地向函数请求数据。相反,在“推”系统中,生产者本身负责将数据发送给消费者,而消费者并不知道事件何时会被发出。
Observable 是一个类似于 Promises 的推送系统。实际上,它们可以被“监听”,并通过包装特定的数据源(例如 DOM 事件、间隔、Promise、同步数据等等)来负责发出值。
Observables 类似于 Promises,但有一些区别:
- 它们可以多次“解析”并发出多个值
- 他们有一种
subscribe
方法,而不是一种then
方法 - 他们可能会在完成之前取消订阅
总结一下,“惰性集合”是什么意思?Observable 是惰性集合,因为它们直到被 subscribed 才会真正发出数据。很简单!
这是关于同步和拉取实体(函数、生成器)与异步和推送实体(承诺和可观察对象)之间的差异的解释性图像。
函数需要同步传入一个值,而生成器需要传入多个值。Promise 异步发出(解析为)一个值,可以使用其方法(then、catch、finally)进行监听。Observable 也可以被订阅(监听),但它们可以随时间推移发出多个值。
理论讲得够多了,让我们开始实践吧。现在是时候从 Observable 开始深入研究 RxJS 的实现了。
可观察对象和订阅者
Observable 抽象被实现为一个类。
class Observable {
constructor(initFunc) {
this.initFunc = initFunc;
}
subscribe(observer) {
return this.initFunc(observer);
}
}
Observable 类的构造函数需要一个函数作为参数。该函数负责生成并向消费者(称为观察者)发送值,但它不会立即被调用,而是存储在类实例中。当我们订阅 Observable 时,会initFunc
以观察者作为参数调用 Observable 函数。
目前,我们只需要知道Observer只是一个包含三个方法的对象就足够了,每个方法最终都会在 Observable 发出数据时被调用。当一切正常、发生错误、Observable 完成或耗尽时,都会调用该方法next
err
complete
。
附注:我们setInterval
在 init 函数内部使用,但我们构建的逻辑将适用于每种类型的 Observable。
让我们尝试一下原始的 Observable。
const myIntervalObx = new Observable(observer => { // <- provide the initFunc argument
let counter = 0
setInterval(() => observer.next(++counter), 700)
})
myIntervalObx.subscribe({
next: (val) => console.log(val),
error: (err) => console.log(err),
complete: () => console.log('Completed!')
})
// 1 <- emission starts after subscribing
// 2
// 3
// ...
仅当订阅时,Observable 才会以 Observer 作为参数调用存储的函数,开始值发射(在本例中是一个简单的间隔)。
太棒了!但现在我们遇到了一个问题:如果我们在 init 函数内部调用next
它,complete
值仍然会被发送给观察者。我们希望在complete
调用它之后停止事件的发送。
为了明确问题,请看下面的代码片段。
const myIntervalObx = new Observable(observer => {
let counter = 0
setInterval(() => observer.next(++counter), 700)
setTimeout(() => observer.complete(), 2500)
})
myIntervalObx.subscribe({
next: (val) => console.log(val),
error: (err) => console.log(err),
complete: () => console.log('Completed!')
})
// 1
// 2
// 3
// Completed! <- observable calls 'complete'
// 4 <- values are still emitted
// 5
// ...
如上所述,2500 毫秒后 Observable 完成,但next
间隔回调内的调用仍然处于活动状态并正在运行。
为了避免这个问题,我们可以创建一个安全的观察者,名为Subscriber,方法是将 Observer 本身包装在一个类似代理的对象中。Subscriber 会检查是否已调用 complete 方法,并最终停止向被包装的 Observer 中传播事件。
// a safe wrapper around observers
class Subscriber {
constructor(observer) {
this.observer = observer;
this.closed = false;
}
next(value) {
if (!this.closed) {
this.observer.next(value);
}
}
error(err) {
if (!this.closed) {
this.closed = true;
this.observer.error(err);
}
}
complete() {
if (!this.closed) {
this.closed = true;
this.observer.complete();
}
}
}
现在我们需要改变Observable类的subscribe方法。
class Observable {
//...
subscribe(observer) {
const subscriber = new Subscriber(observer)
return this.initFunc(subscriber)
}
}
// after completion, the events will not propagate further
myIntervalObx.subscribe({ /* same as before */ })
// 1
// 2
// 3
// Completed! <- stops here
我们解决了问题!嗯,还没有完全解决。虽然事件传播停止了,但间隔仍在运行并消耗资源。让我们看看如何解决这个问题。
拆卸逻辑
让我们通过在间隔回调中添加日志来明确问题。
const myIntervalObx = new Observable(observer => {
let counter = 0
setInterval(() => {
counter++
console.log(`Still active. Current value: ${counter}`)
observer.next(counter)
}, 700)
setTimeout(() => observer.complete(), 2500)
})
myIntervalObx.subscribe({ /* ... */ })
// Still active. Current value: 1 <- from the interval callback
// 1 <- from the Observer
// Still active. Current value: 2
// 2
// Still active. Current value: 3
// 3
// Completed!
// Still active. Current value: 4 <- interval still running after ‘complete’
// Still active. Current value: 5
// ...
我们需要一种方法来清理 Observables 使用的资源。我们将从 中返回一个函数initFunc
,该函数将用于执行拆卸逻辑。我们将此函数称为“subscription”,它将由 subscribe 方法返回。
const myIntervalObx = new Observable(observer => {
let counter = 0
let id = setInterval(() => {
counter++
console.log(`Still active. Current value: ${counter}`)
observer.next(counter)
}, 700)
setTimeout(() => observer.complete(), 2500)
// return the teardown logic
return () => {
console.log('Teardown logic')
clearInterval(id)
}
})
const subscription = myIntervalObx.subscribe({ /* same as before */ })
// logs...
subscription() // <- stops the interval
我们快完成了!我相信你注意到了另一个问题:Observable 的完成和拆卸逻辑有点独立。我们喜欢这种行为吗?一点也不!
继续为已完成的 Observable 浪费资源是没有意义的,反之,在未关闭 Observable 的情况下调用拆卸逻辑也是没有意义的。因此,完成 Observable 实现的最后一步是将完成操作与拆卸逻辑同步。我们需要创建 Subscription 类来处理此任务,并改进拆卸函数的管理。
订阅
Subscription类是函数的容器,包含旧的订阅函数。每个函数都将通过调用该unsubscribe
方法来调用。以下是具体实现。
// a container for functions
class Subscription {
constructor() {
this.teardowns = [];
}
add(teardown) {
this.teardowns.push(teardown);
}
unsubscribe() {
this.teardowns.forEach(teardown => teardown())
this.teardowns = [];
}
}
为了耦合完成逻辑(observer.complete
)和拆卸逻辑(从 init 函数返回),我们必须让订阅和订阅者都能够调用这两个函数。
现在,请耐心等待!😁为了同步这两个逻辑,我们必须:
- 向订阅者构造函数提供订阅,这将向订阅容器添加观察者完成逻辑
- 允许订阅者
unsubscribe
在完成时调用 - 将拆卸逻辑(从 initFunc 返回)添加到 Subscription 容器
- 从 Observable 的 subscribe 方法返回 Subscription (与之前相同)
请记住,这里的 Subscription 对象是通过引用传递的。
现在,无论是从外部取消订阅,还是 Observable 本身的完成,都会同时执行完成逻辑和拆卸逻辑。为了更清晰地理解,请看下面重构后的 Observable 和 Subscriber 类。
class Observable {
constructor(initFunc) {
this.initFunc = initFunc;
}
subscribe(observer) {
const subscription = new Subscription()
const subscriber = new Subscriber(observer, subscription) // <- passed by reference
const teardown = this.initFunc(subscriber)
// 3. add the teardown logic to the Subscription instance
subscription.add(teardown) // <- second function inside the subscription
return subscription
}
}
// a safe wrapper around observers
class Subscriber {
constructor(observer, subscription) {
this.observer = observer;
this.closed = false;
this.subscription = subscription
// 1. add an Observer completion logic to the Subscription container
this.subscription.add(() => this.closed = true) // <- first function inside the subscription
}
next(value) {
if (!this.closed) {
this.observer.next(value);
}
}
error(err) {
if (!this.closed) {
this.closed = true;
this.observer.error(err);
// 2. enable the Subscriber to call `unsubscribe` on completion
this.subscription.unsubscribe() // <- unsubscribe on error
}
}
complete() {
if (!this.closed) {
this.closed = true;
this.observer.complete();
this.subscription.unsubscribe() // <- unsubscribe on completion
}
}
}
总而言之,订阅者可以unsubscribe
在完成/错误导致两个存储函数同时运行的情况下调用 unsubscribe 方法,如果调用者(开发者)从外部调用 unsubscribe 方法,情况也是如此。更准确地说,在后一种情况下,订阅者通过将标志设置this.closed
为 true 来关闭,但它实际上并没有调用complete
Observer 的方法。原始 RxJS 库也是如此。
我们已经将拆卸逻辑与 Observable 的完成同步了。现在我们真的完成了!😁
RxJS 的所有其他部分都将是此逻辑的扩展,正如您将在下一篇文章中看到的可管道运算符!
结论
我们通过创建 Observable 类实现了 RxJS 版本的第一部分。负责生成和发送值的逻辑通过 init 函数提供给 Observable。在实际场景中,RxJS 提供了创建操作符,方便我们在各种同步和异步情况下轻松生成 Observable 。
下面的示例都返回一个 Observable。
// CREATION OPERATORs
const fromEvent = (eventTarget, eventType) => {
return new Observable(observer => {
const eventHandler = e => observer.next(e)
eventTarget.addEventListener(eventType, eventHandler)
return () => {
eventTarget.removeEventListener(eventType, eventHandler)
}
})
}
const interval = (period) => {
return new Observable(observer => {
let counter = 0
const id = setInterval(() => observer.next(++counter), period)
return () => {
clearInterval(id)
}
})
}
RxJS 还有很多功能。管道操作符是我们库中即将集成的下一个重要功能。这些操作符允许我们连接多个 Observable,从而轻松处理复杂的数据流。
响应式编程是一个复杂的主题,但我认为扎实理解其基本原理才是学习 RxJS 的正确方法。希望本文对你有所帮助!
第二部分见!😁
附言:英语不是我的母语,所以错误难免。欢迎大家评论指正!
文章来源:https://dev.to/mr_bertoli/rxjs-from-scratch-observables-hl6