RxJS 从头开始​​:可观察对象简介、可观察对象和订阅者、拆解逻辑、订阅、结论

2025-05-25

从头开始学习 RxJS:可观察对象

介绍

可观察对象

可观察对象和订阅者

拆卸逻辑

订阅

结论

从小起,我就一直对事物的底层工作原理充满好奇。同时,我也一直热衷于从底层开始学习和构建,以深入了解新事物。

就编程而言,我坚信学习库/框架背后的基本思想是建立扎实知识体系以及“超越”知识体系的最佳途径。因此,我写了这篇文章!

在本文中,我们将从零开始逐步实现 RxJS。我们将遵循与真实 RxJS 代码库相同的架构,但去除所有优化和非必要功能。

我们将从构建库的核心抽象开始:Observable。在此过程中,我们还将创建ObserversSubscribersSubscriptions,而在下一集中,我们将实现可管道操作符。

介绍

为什么 RxJS 如此流行?答案就在文档里:

RxJS 是一个使用可观察序列组成异步和基于事件的程序的库。

该库使这些任务更加简单、更具声明性且易于理解。为了实现这一目标,RxJS 提供了三个主要优势:

  • 清晰的控制流:管道运算符可帮助您轻松控制事件如何流经您的 Observable
  • 函数纯度:使用纯函数“安全地”生成和处理值的能力
  • 值转换:你可以根据需要转换通过 Observable 传递的值

让我们通过一个玩具示例来阐明为什么 RxJS 如此强大(相信我,您可以使用 RxJS 做更多的事情)。

// without RxJS: IMPERATIVE
let count = 0
const rate = 1000
let lastClick = Date.now() - rate;
document.addEventListener('click', event => {
  if (Date.now() - lastClick >= rate) {
    count += event.clientX;
    console.log(count);
    lastClick = Date.now();
  }
})


// with RxJS: DECLARATIVE
fromEvent(document, 'click')
  .pipe(
    throttleTime(1000),
    map(event => event.clientX),
    scan((count, clientX) => count + clientX, 0)
  )
  .subscribe(console.log)
Enter fullscreen mode Exit fullscreen mode

一切似乎都围绕着 Observable 构建,事实也确实如此。那么,什么是Observable呢?我们来解释一下这个概念。

可观察对象

可观察对象是多个值的惰性推送集合。

让我们将这个定义分解成几个部分。

术语“拉”“推”描述了生产者如何与消费者进行通信。函数是“拉”的实体,实际上是调用者(开发者)明确地向函数请求数据。相反,在“推”系统中,生产者本身负责将数据发送给消费者,而消费者并不知道事件何时会被发出。

Observable 是一个类似于 Promises 的推送系统。实际上,它们可以被“监听”,并通过包装特定的数据源(例如 DOM 事件、间隔、Promise、同步数据等等)来负责发出值。

Observables 类似于 Promises,但有一些区别:

  • 它们可以多次“解析”并发出多个值
  • 他们有一种subscribe方法,而不是一种then方法
  • 他们可能会在完成之前取消订阅

总结一下,“惰性集合”是什么意思?Observable 是惰性集合,因为它们直到被 subscribed 才会真正发出数据。很简单!

这是关于同步和拉取实体(函数、生成器)与异步和推送实体(承诺和可观察对象)之间的差异的解释性图像。

替代文本

函数需要同步传入一个值,而生成器需要传入多个值。Promise 异步发出(解析为)一个值,可以使用其方法(then、catch、finally)进行监听。Observable 也可以被订阅(监听),但它们可以随时间推移发出多个值。

理论讲得够多了,让我们开始实践吧。现在是时候从 Observable 开始深入研究 RxJS 的实现了。

可观察对象和订阅者

Observable 抽象被实现为一个类。

class Observable {
  constructor(initFunc) {
    this.initFunc = initFunc;
  }
  subscribe(observer) {
    return this.initFunc(observer);
  }
}
Enter fullscreen mode Exit fullscreen mode

Observable 类的构造函数需要一个函数作为参数。该函数负责生成并向消费者(称为观察者)发送值,但它不会立即被调用,而是存储在类实例中。当我们订阅 Observable 时,会initFunc以观察者作为参数调用 Observable 函数。

目前,我们只需要知道Observer只是一个包含三个方法的对象就足够了,每个方法最终都会在 Observable 发出数据时被调用。当一切正常、发生错误、Observable 完成或耗尽时,都会调用该方法nexterrcomplete

附注:我们setInterval在 init 函数内部使用,但我们构建的逻辑将适用于每种类型的 Observable。

让我们尝试一下原始的 Observable。

const myIntervalObx = new Observable(observer => {  // <- provide the initFunc argument
  let counter = 0
  setInterval(() => observer.next(++counter), 700)
})

myIntervalObx.subscribe({
  next: (val) => console.log(val),
  error: (err) => console.log(err),
  complete: () => console.log('Completed!')
})
// 1    <- emission starts after subscribing
// 2
// 3
// ...
Enter fullscreen mode Exit fullscreen mode

仅当订阅时,Observable 才会以 Observer 作为参数调用存储的函数,开始值发射(在本例中是一个简单的间隔)。

太棒了!但现在我们遇到了一个问题:如果我们在 init 函数内部调用next它,complete值仍然会被发送给观察者。我们希望在complete调用它之后停止事件的发送。

为了明确问题,请看下面的代码片段。

const myIntervalObx = new Observable(observer => {
  let counter = 0
  setInterval(() => observer.next(++counter), 700)
  setTimeout(() => observer.complete(), 2500)
})

myIntervalObx.subscribe({
  next: (val) => console.log(val),
  error: (err) => console.log(err),
  complete: () => console.log('Completed!')
})
// 1
// 2
// 3
// Completed!   <- observable calls 'complete'
// 4            <- values are still emitted
// 5
// ...
Enter fullscreen mode Exit fullscreen mode

如上所述,2500 毫秒后 Observable 完成,但next间隔回调内的调用仍然处于活动状态并正在运行。

为了避免这个问题,我们可以创建一个安全的观察者,名为Subscriber,方法是将 Observer 本身包装在一个类似代理的对象中。Subscriber 会检查是否已调用 complete 方法,并最终停止向被包装的 Observer 中传播事件。

// a safe wrapper around observers
class Subscriber {
  constructor(observer) {
    this.observer = observer;
    this.closed = false;
  }
  next(value) {
    if (!this.closed) {
      this.observer.next(value);
    }
  }
  error(err) {
    if (!this.closed) {
      this.closed = true;
      this.observer.error(err);
    }
  }
  complete() {
    if (!this.closed) {
      this.closed = true;
      this.observer.complete();
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

现在我们需要改变Observable类的subscribe方法。

class Observable {
  //...
  subscribe(observer) {
    const subscriber = new Subscriber(observer)
    return this.initFunc(subscriber)
  }
}

// after completion, the events will not propagate further
myIntervalObx.subscribe({ /* same as before */ })
// 1
// 2
// 3
// Completed!   <- stops here
Enter fullscreen mode Exit fullscreen mode

我们解决了问题!嗯,还没有完全解决。虽然事件传播停止了,但间隔仍在运行并消耗资源。让我们看看如何解决这个问题。

拆卸逻辑

让我们通过在间隔回调中添加日志来明确问题。

const myIntervalObx = new Observable(observer => {
  let counter = 0
  setInterval(() => {
    counter++
    console.log(`Still active. Current value: ${counter}`)
    observer.next(counter)
  }, 700)
  setTimeout(() => observer.complete(), 2500)
})

myIntervalObx.subscribe({ /* ... */ })
// Still active. Current value: 1   <- from the interval callback
// 1                                <- from the Observer
// Still active. Current value: 2
// 2
// Still active. Current value: 3
// 3
// Completed!
// Still active. Current value: 4   <- interval still running after ‘complete’
// Still active. Current value: 5
// ...
Enter fullscreen mode Exit fullscreen mode

我们需要一种方法来清理 Observables 使用的资源。我们将从 中返回一个函数initFunc,该函数将用于执行拆卸逻辑。我们将此函数称为“subscription”,它将由 subscribe 方法返回。

const myIntervalObx = new Observable(observer => {
  let counter = 0
  let id = setInterval(() => {
    counter++
    console.log(`Still active. Current value: ${counter}`)
    observer.next(counter)
  }, 700)
  setTimeout(() => observer.complete(), 2500)

  // return the teardown logic
  return () => {
    console.log('Teardown logic')
    clearInterval(id)
  }
})

const subscription = myIntervalObx.subscribe({ /* same as before */ })
// logs...
subscription()  // <- stops the interval
Enter fullscreen mode Exit fullscreen mode

我们快完成了!我相信你注意到了另一个问题:Observable 的完成和拆卸逻辑有点独立。我们喜欢这种行为吗?一点也不!

继续为已完成的 Observable 浪费资源是没有意义的,反之,在未关闭 Observable 的情况下调用拆卸逻辑也是没有意义的。因此,完成 Observable 实现的最后一步是将完成操作与拆卸逻辑同步。我们需要创建 Subscription 类来处理此任务,并改进拆卸函数的管理。

订阅

Subscription类是函数的容器,包含旧的订阅函数。每个函数都将通过调用该unsubscribe方法来调用。以下是具体实现。

// a container for functions
class Subscription {
  constructor() {
    this.teardowns = [];
  }
  add(teardown) {
    this.teardowns.push(teardown);
  }
  unsubscribe() {
    this.teardowns.forEach(teardown => teardown())
    this.teardowns = [];
  }
}
Enter fullscreen mode Exit fullscreen mode

为了耦合完成逻辑(observer.complete)和拆卸逻辑(从 init 函数返回),我们必须让订阅和订阅者都能够调用这两个函数。

现在,请耐心等待!😁为了同步这两个逻辑,我们必须:

  1. 向订阅者构造函数提供订阅,这将向订阅容器添加观察者完成逻辑
  2. 允许订阅者unsubscribe在完成时调用
  3. 将拆卸逻辑(从 initFunc 返回)添加到 Subscription 容器
  4. 从 Observable 的 subscribe 方法返回 Subscription (与之前相同)

请记住,这里的 Subscription 对象是通过引用传递的。

现在,无论是从外部取消订阅,还是 Observable 本身的完成,都会同时执行完成逻辑和拆卸逻辑。为了更清晰地理解,请看下面重构后的 Observable 和 Subscriber 类。

class Observable {
  constructor(initFunc) {
    this.initFunc = initFunc;
  }
  subscribe(observer) {
    const subscription = new Subscription()
    const subscriber = new Subscriber(observer, subscription)   // <- passed by reference

    const teardown = this.initFunc(subscriber)
    // 3. add the teardown logic to the Subscription instance
    subscription.add(teardown)  // <- second function inside the subscription

    return subscription
  }
}
Enter fullscreen mode Exit fullscreen mode
// a safe wrapper around observers
class Subscriber {
  constructor(observer, subscription) {
    this.observer = observer;
    this.closed = false;
    this.subscription = subscription
    // 1. add an Observer completion logic to the Subscription container
    this.subscription.add(() => this.closed = true) // <- first function inside the subscription
  }
  next(value) {
    if (!this.closed) {
      this.observer.next(value);
    }
  }
  error(err) {
   if (!this.closed) {
      this.closed = true;
      this.observer.error(err);
      // 2. enable the Subscriber to call `unsubscribe` on completion
      this.subscription.unsubscribe()  // <- unsubscribe on error
    }
  }
  complete() {
    if (!this.closed) {
      this.closed = true;
      this.observer.complete();
      this.subscription.unsubscribe()  // <- unsubscribe on completion
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

总而言之,订阅者可以unsubscribe在完成/错误导致两个存储函数同时运行的情况下调用 unsubscribe 方法,如果调用者(开发者)从外部调用 unsubscribe 方法,情况也是如此。更准确地说,在后一种情况下,订阅者通过将标志设置this.closed为 true 来关闭,但它实际上并没有调用completeObserver 的方法。原始 RxJS 库也是如此。

我们已经将拆卸逻辑与 Observable 的完成同步了。现在我们真的完成了!😁

RxJS 的所有其他部分都将是此逻辑的扩展,正如您将在下一篇文章中看到的可管道运算符!

结论

我们通过创建 Observable 类实现了 RxJS 版本的第一部分。负责生成和发送值的逻辑通过 init 函数提供给 Observable。在实际场景中,RxJS 提供了创建操作符,方便我们在各种同步和异步情况下轻松生成 Observable 。

下面的示例都返回一个 Observable。

// CREATION OPERATORs
const fromEvent = (eventTarget, eventType) => {
  return new Observable(observer => {
    const eventHandler = e => observer.next(e)
    eventTarget.addEventListener(eventType, eventHandler)
    return () => {
      eventTarget.removeEventListener(eventType, eventHandler)
    }
  })
}

const interval = (period) => {
  return new Observable(observer => {
    let counter = 0
    const id = setInterval(() => observer.next(++counter), period)
    return () => {
      clearInterval(id)
    }
  })
}
Enter fullscreen mode Exit fullscreen mode

RxJS 还有很多功能。管道操作符是我们库中即将集成的下一个重要功能。这些操作符允许我们连接多个 Observable,从而轻松处理复杂的数据流。

响应式编程是一个复杂的主题,但我认为扎实理解其基本原理才是学习 RxJS 的正确方法。希望本文对你有所帮助!

第二部分见!😁

附言:英语不是我的母语,所以错误难免。欢迎大家评论指正!

文章来源:https://dev.to/mr_bertoli/rxjs-from-scratch-observables-hl6
PREV
每个开源项目都应该有的文件
NEXT
函数式编程的充分介绍 简介 纯函数 函数作为值 函数组合 不变性和不变方法 状态管理和副作用 结论