使用 Vanilla JavaScript 介绍 RxJS 概念
最近, egghead.io和Andre Staltz一起主持的一场网络研讨会给了我启发,我想分享一下我的学习心得。在那场网络研讨会之前,我对 RxJS 并不熟悉,而且那是我第一次接触观察者模式。在深入分析之前,观察者模式对我来说就像魔法一样。
JavaScript 有多个使用回调函数的 API,它们都执行几乎相同的操作,但略有不同。
流
stream.on('data', data => {
console.log(data)
})
stream.on('end', () => {
console.log("Finished")
})
stream.on('error', err => {
console.error(err)
})
承诺
somePromise()
.then(data => console.log(data))
.catch(err => console.error(err))
事件监听器
document.addEventListener('click', event => {
console.log(event.clientX)
})
你看到的大致模式是,有一个对象,对象内部有一个接受函数(换句话说,回调)的方法。它们都在解决同一个问题,只是方式不同,这导致你需要费力记住每个 API 的具体语法。这就是 RxJS 的用武之地。RxJS 将所有这些统一在一个通用的抽象之下。
那么,可观察对象究竟是什么?它是一种抽象,就像数组、函数或对象都是抽象一样。Promise 可以解析(resolve)或拒绝(reject),并返回一个值。可观察对象能够随时间发出值。你可以消费来自服务器的数据流,或者监听 DOM 事件。
💀 可观察的骨架
const observable = {
subscribe: observer => {
},
pipe: operator => {
},
}
可观察对象只是包含subscribe
和pipe
方法的对象。等等,这到底是怎么回事?什么是观察者,什么是操作符?观察者只是包含next
、error
和 的回调方法的对象complete
。该subscribe
方法会消费观察者并将值传递给它。因此,可观察对象充当生产者,而观察者是它的消费者。
👀 观察者
const observer = {
next: x => {
console.log(x)
},
error: err => {
console.log(err)
},
complete: () => {
console.log("done")
}
}
在该方法中,subscribe
您将某种形式的数据传递给观察者的方法。
订阅方法
const observable = {
subscribe: observer => {
document.addEventListener("click", event => {
observer.next(event.clientX)
})
},
pipe: operator => {
},
}
这里我们只是监听文档中任意位置的点击。如果我们运行这段代码并调用observable.subscribe(observer)
,我们就会在控制台中看到点击的 x 坐标。那么这个pipe
方法呢?pipe
它使用一个运算符并返回一个函数,然后使用可观察对象调用返回的函数。
管道方法
const observable = {
subscribe: observer => {
document.addEventListener("click", event => {
observer.next(event.clientX)
})
},
pipe: operator => {
return operator(this)
},
}
很酷,但什么是运算符呢?运算符用于转换数据。数组有运算符,例如map
. map
,它让你退一步,对数组中的所有内容运行某个函数。你可以有一个数组,然后还有一个数组,它是第一个数组的映射版本。
让我们map
为可观察对象编写一个函数。
🗺️ 地图操作符
const map = f => {
return observable => {
subscribe: observer => {
observable.subscribe({
next: x => {
observer.next(f(x))
},
error: err => {
console.error(err)
},
complete: () => {
console.log("finished")
}
})
},
pipe: operator => {
return operator(this)
},
}
}
这里发生了很多事情,让我们来分析一下。
const map = f => {
return observable => {
这里我们传入一个函数,并返回一个需要可观察变量的函数。还记得我们的pipe
方法吗?
pipe: operator => {
return operator(this)
},
要在可观察对象上运行运算符,需要将其传递到pipe
。pipe
将把它调用的可观察对象传递到我们的运算符返回的函数中。
subscribe: observer => {
observable.subscribe({
接下来,我们定义subscribe
要返回的可观察对象的方法。该方法需要一个观察者,将来在返回的可观察对象上调用该方法时(可以通过其他操作符或显式调用)会接收该方法。然后,使用观察者对象.subscribe
进行调用。observable.subscribe
{
next: x => {
observer.next(f(x))
},
error: err => {
console.error(err)
},
complete: () => {
console.log("finished")
}
}
在观察者的方法中,你可以看到,我们通过最初传入的函数和一个传入的值,调用了next
未来的观察者。让我们在可观察对象上运行我们的 new 运算符!next
map
x
next
map
observable
.pipe(map(e => e.clientX))
.pipe(map(x => x - 1000))
.subscribe(observer)
必须使用final 修饰符subscribe
,否则这些操作符中的任何操作都不会执行,因为它们都被包装在了观察者的subscribe
方法中。这些subscribe
方法会调用subscribe
链中的前一个观察者,但观察者链必须从某个地方开始。
那么让我们来观察一下运行时会发生什么。
- 第一个管道在可观察对象上被调用,
map
并被柯里化this
map
被调用e => e.clientX
并返回一个函数- 该函数使用原始函数调用
observable
,并返回一个可观察值- 我们将其称为 observable2
pipe
被调用observable2
并map
咖喱this
map
被调用x => x - 1000
并返回一个函数- 该函数被调用
observable2
并返回一个可观察对象- 我们将其称为 observable3
.subscribe
observable3
被传入观察者调用.subscribe
observable2
被操作员的观察者传入调用.subscribe
在原始可观察对象上调用,并传入操作符的观察者- 点击事件发生
clientX
时100
observer2.next(100)
被叫observer3.next(100)
被叫observer.next(-900)
被调用并记录-900
到控制台。- 完毕!
您可以看到这里发生的链条。当您调用时subscribe
,您正在请求信息,每个环节都会向链条中的前一个环节请求信息,直到到达数据并next
调用其观察者的方法。然后,该数据沿着链条向上回溯,沿途进行转换,直到到达最终的观察者。
以下是完整代码。
const observable = {
subscribe: observer => {
document.addEventListener("click", event => {
observer.next(event.clientX)
})
},
pipe: operator => {
return operator(this)
}
}
const observer = {
next: x => {
console.log(x)
},
error: err => {
console.log(err)
},
complete: () => {
console.log("done")
}
}
const map = f => {
return observable => {
subscribe: observer => {
observable.subscribe({
next: x => {
observer.next(f(x))
},
error: err => {
console.error(err)
},
complete: () => {
console.log("finished")
}
})
},
pipe: operator => {
return operator(this)
},
}
}