使用 RxJS 在 JavaScript 中进行反应式编程。

2025-05-26

使用 RxJS 在 JavaScript 中进行反应式编程。

RxJS 是一个用于转换、组合和查询异步数据流的 JavaScript 库。RxJS 既可以在浏览器中使用,也可以在 Node.js 的服务器端使用。

我接受了一项挑战,要用一种通俗易懂的方式向开发者解释 RxJS。学习 RxJS 最难的部分是“以响应式的方式思考”

将 RxJS 视为处理异步事件的“LoDash”。

那么,反应式编程到底是什么?

响应式编程

响应式编程是一种编写代码的编程范式,主要关注异步数据流。它只是一种构建软件应用程序的另类方式,能够对发生的变化做出“反应”,而不是像传统的软件编写方式那样,显式地编写代码(又称“命令式”编程)来处理这些变化。

溪流

溪流

流是按时间顺序排列的一系列正在进行的事件。它可以是任何内容,例如用户输入、按钮点击或数据结构。您可以监听流并做出相应的响应。您可以使用函数来组合、过滤或映射流。

Stream 在其时间轴上会发出三件事:一个值、一个错误和一个完成信号。我们必须捕获这个异步事件并相应地执行函数。

承诺和可观察对象都是为了解决异步问题而构建的(以避免“回调地狱”)。

现代 Web 应用程序中的异步操作类型

  • DOM 事件-(鼠标事件、触摸事件、键盘事件、表单事件等)
  • 动画 - (CSS 过渡和动画、requestAnimationFrame 等)
  • AJAX
  • WebSockets
  • SSE - 服务器发送事件
  • 替代输入(语音、操纵杆等)

如果你还是感到困惑,别担心,通常情况下,目前为止还没什么意义。让我们一步一步来。

可观察的

可观察的

  • Observable 只是一个函数,但有一些特殊之处。它接收一个“观察者”(一个拥有“next”、“error”和“complete”方法的对象)作为参数,并返回取消逻辑。
  • 可观察对象为应用程序中的发布者和订阅者之间传递消息提供支持。
  • 与其他事件处理、异步编程和处理多个值的技术相比,可观察对象具有显著的优势。
  • Observable 是惰性的。它直到你订阅它才会开始生成数据。
  • subscribe()返回一个订阅,消费者可以调用该订阅unsubscribe()来取消订阅并关闭生产者。
  • RxJS 提供了许多可用于创建新可观察对象的函数。这些函数可以简化从事件、计时器、Promise 等对象创建可观察对象的过程。例如:
    const button = document.querySelector("button");
    const observer = {
      next: function(value) {
        console.log(value);
      },
      error: function(err) {
        console.error(err);
      },
      complete: function() {
        console.log("Completed");
      }
    };

    // Create an Observable from event
    const observable = Rx.Observable.fromEvent(button, "click");
    // Subscribe to begin listening for async result
    observable.subscribe(observer);
Enter fullscreen mode Exit fullscreen mode

订阅

订阅

  • 只有当有人订阅 Observable 实例时,它才会开始发布值。您可以通过调用subscribe()该实例的方法并传递一个observer对象来接收通知,从而进行订阅。
  • Subscription 有一个重要的方法,unsubscribe()它不需要任何参数,只处理订阅所持有的资源。
    const button = document.querySelector("button");
    const observable = Rx.Observable.fromEvent(button, "click");
    const subscription = observable.subscribe(event => console.log(event));
    // Later:
    // This cancels the ongoing Observable execution which
    // was started by calling subscribe with an Observer.
    subscription.unsubscribe();
Enter fullscreen mode Exit fullscreen mode

观察者

观察者

  • 一个observer对象字面量,包含next()error()complete()函数。在上面的例子中,观察者就是我们传递给.subscribe()方法的对象字面量。
  • 当 Observable 产生值时,它会通过调用方法通知观察者.next()何时成功捕获新值以及.error()何时发生错误。
  • 当我们订阅一个 Observable 时,它​​会不断向观察者传递值,直到信号完成。
  • 观察员的例子。
    // observer is just object literal with next(), error() and complete() functions
    // Howerver, next() function is required, remaining error() and complete() functions are optional 
    const observer = {
      next: function(value) {
        console.log(value);
      },
      error: function(err) {
        console.error(err);
      },
      complete: function() {
        console.log("Completed");
      }
    };
Enter fullscreen mode Exit fullscreen mode

运算符

操作员

  • 操作符是建立在可观察对象基础上的函数,用于实现对集合的复杂操作。
  • Operator 本质上是一个纯函数,它以一个 Observable 作为输入,并生成另一个 Observable 作为输出。
  • 有用于不同目的的操作符,它们可以分为创建、转换、过滤、组合、多播、错误处理、实用程序等。
  • 运算符会将每个值从一个运算符传递到下一个运算符,然后再处理集合中的下一个值。这与数组运算符(map 和 filter)不同,后者会在每一步处理整个数组。
  • 例如,
    const observable = Rx.Observable.of(1, 2, 3).map(value => value * value);

    observable.subscribe(x => console.log(x));
    // Output:
    // 1
    // 4
    // 9
Enter fullscreen mode Exit fullscreen mode
  • RxJS 提供了许多运算符,但只有少数几个是常用的。如需查看运算符列表和使用示例,请访问RxJS API 文档

常用运算符列表

主题

主题

  • RxJS Subject 是一种特殊类型的 Observable ,它允许将值多播给多个 Observer。普通的 Observable 是单播的(每个订阅的 Observer 都拥有一个独立的 Observable 执行),而Subject 是多播的
  • RxJS 中的主题是一种特殊的混合体,可以同时充当可观察对象和观察者。
  • 在下面的例子中,我们有两个观察者附加到一个主题,并且我们向主题提供一些值:
    const subject = new Rx.Subject();

    subject.subscribe({
      next: v => console.log("observerA: " + v)
    });
    subject.subscribe({
      next: v => console.log("observerB: " + v)
    });

    subject.next(1);
    subject.next(2);

    // output
    // observerA: 1
    // observerB: 1
    // observerA: 2
    // observerB: 2
Enter fullscreen mode Exit fullscreen mode

Observable 与 Promise

Observable 与 Promise

为了更好地理解,我们将比较 ES6 Promise API 与 Observable 库 RxJS。我们将了解 Promise 和 Observable 的相似之处和不同之处,以及为什么在某些情况下我们应该使用 Observable 而不是 Promise。

单一值与多个值

  • 如果您通过 Promise 发出请求并等待响应,则可以确保同一请求不会有多个响应。您可以创建一个 Promise,它会以某个值作为解析结果。
  • Promise 始终通过传递给 resolve 函数的第一个值来解决,并忽略对它的进一步调用。
  • 相反,Observables 允许您解析多个值,直到我们调用observer.complete()函数。
  • Promise 和 Observable 的示例。

    // creating demoPromise using ES6 Promise API
    const demoPromise = new Promise((resolve, reject) => {
      asyncOperation((err, value) => {
        if (err) {
          reject(err); // error occured. We will catch error inside chain .catch()
        } else {
          resolve(value); // value received. we will get value inside .then() chain method
        }
      });
    });
    
    // creating a demoObservable using Rxjs.Observable API
    const demoObservable = Rx.Observable.create(observer => {
      asyncOperation((err, value) => {
        if (err) {
          observer.error(err); // instead of reject(err)
        } else {
          observer.next(value); // instead of resolve(value)
          observer.complete(); // optional. once your async task finished then call observer.complete()
        }
      });
    });
    

渴望与懒惰

  • 承诺在设计上是热切的,这意味着承诺将在承诺构造函数被调用后立即开始执行您赋予它的任何任务。
  • Observable 是惰性的。只有当有人真正订阅 Observable 时,Observable 的构造函数才会被调用,这意味着在你订阅它之前什么都不会发生。
  • 例如,
    // demoPromise started emmiting values but still we have not call .then() method on promise
    const demoPromise = new Promise((resolve, reject) => {
      setTimeout(() => {
        console.log('emmit value');
        resolve(100);
      }, 3000);
    });

    // demoObservable not started emmiting values unitll we subscribe to it.
    const demoObservable = new Observable(observer => {
      setInterval(() => {
        if (err) {
          observer.error('DemoError throw'); // instead of reject(err)
        } else {
          observer.next('value'); // instead of resolve(value)
          observer.complete(); // optional. once your async task finished then call observer.complete()
        }
      });
    });
Enter fullscreen mode Exit fullscreen mode

不可取消 vs 可取消

  • 新 Promise 用户最常问的问题之一就是如何取消 Promise。ES6 Promise 尚不支持取消。事实上,取消在客户端编程中确实是一个重要的场景。
  • 使用第三方库,例如bluebirdaxios它们提供承诺取消功能。
  • unsubscribe()Observable 支持通过调用Observable 上的方法来取消异步任务。
  • 当你订阅一个 Observable 时,你会得到一个 Subscription,它代表正在进行的执行。只需调用unsubscribe()即可取消执行。
  • 可取消可观察对象的示例
    const observable = Rx.Observable.from([10, 20, 30]);
    const subscription = observable.subscribe(x => console.log(x));
    // Later:
    subscription.unsubscribe(); // its will stop ongoing execution 
Enter fullscreen mode Exit fullscreen mode

实际例子

根据值创建可观察对象

  const observable = Rx.Observable.of("foo", 98, false, ["john", "doe"], {
    age: 19,
    gender: "male"
  });

  observable.subscribe(val => console.log(val));
Enter fullscreen mode Exit fullscreen mode

从值流创建可观察对象

  const observable = Rx.Observable.create( observer => {
    observer.next('Hello');
    observer.next('Its monday morning!!');
  });

  observable.subscribe(value => console.log(value));
  // output:
  // Hello
  // It's monday morning
Enter fullscreen mode Exit fullscreen mode

DOM 事件中的可观察对象

    const button = document.querySelector('button');
    const observable = Rx.Observable.fromEvent(button, 'click');
    observable.subscribe(event => console.log(event));
Enter fullscreen mode Exit fullscreen mode

来自 Promise 的可观察对象

  const promise = new Promise((resolve, reject) => {
    asyncOperation((err, value) => {
      if (err) {
        reject(err);
      } else {
        resolve(value);
      }
    });
  });

  const Observable = Rx.Observable.fromPromise(promise);

  Observable.subscribe(value => console.log(value));
Enter fullscreen mode Exit fullscreen mode

通过 Timer 方法观察

  const timer = Rx.Observable.timer(3000);

  timer.subscribe(() => console.log("timeout!!"));
Enter fullscreen mode Exit fullscreen mode

从区间可观察

  const interval = Rx.Observable.interval(3000);

  interval.subscribe(tick => console.log(`${tick} tick`));
Enter fullscreen mode Exit fullscreen mode

Map 运算符

  const observable = Rx.Observable.from(2, 4, 6, 8);

  observable.map(value => value * value).subscribe(result => console.log(result));
Enter fullscreen mode Exit fullscreen mode

Do 运算符

    const dogs = Rx.Observable.of("Buddy", "Charlie", "Cooper", "Rocky");

    // do operator used for debugging purpose
    dogs
      .do(dog => console.log(dog))
      .filter(dog => dog === "Cooper")
      .do(dog => console.log(dog))
      .subscribe(dog => console.log(dog));
Enter fullscreen mode Exit fullscreen mode

防抖和节流

  • 去抖动 - 等待 X 时间,然后给我最后一个值。
  • 节流阀 - 给我第一个值,然后等待 X 时间。
    const input = document.querySelector("input");
    const observable = Rx.Observable.fromEvent(input, "keyup");

    observable.debounceTime(3000).subscribe(event => console.log(event));

    observable.throttleTime(1000).subscribe(event => console.log(event));
Enter fullscreen mode Exit fullscreen mode

bufferTime - 将过去的值收集为数组,并定期发出这些数组。

      const clicks = Rx.Observable.fromEvent(document, "click");
      const buffered = clicks.bufferTime(1000);
      buffered.subscribe(x => console.log(x));
Enter fullscreen mode Exit fullscreen mode

结论

Promise 最适合 AJAX 操作,因为 Observable 在处理异步任务方面非常强大。Observable 提供了一系列操作符,用于创建、转换、过滤和多播异步事件。听起来很棒,不是吗?:D

结束语

感谢阅读。希望你喜欢这篇文章,欢迎点赞、评论或分享给你的朋友。想更深入地了解 RxJS,请查看提供的参考链接。

参考

  1. RxJS 官方网站
  2. 你一直错过的响应式编程简介
  3. 学习RxJS
  4. 什么是 RxJS?
  5. RxJS 快速入门(含 20 个实例)
  6. Angular 官方网站
  7. RxJS:可观察对象、观察者和操作符简介
  8. Promises 与 Observables
文章来源:https://dev.to/sagar/reactive-programming-in-javascript-with-rxjs-4jom
PREV
开发人员工具
NEXT
如何使用 Next.js 构建博客