使用 Vanilla JavaScript 介绍 RxJS 概念

2025-05-25

使用 Vanilla JavaScript 介绍 RxJS 概念

最近, egghead.io和Andre Staltz一起主持的一场网络研讨会给了我启发,我想分享一下我的学习心得。在那场网络研讨会之前,我对 RxJS 并不熟悉,而且那是我第一次接触观察者模式。在深入分析之前,观察者模式对我来说就像魔法一样。

JavaScript 有多个使用回调函数的 API,它们都执行几乎相同的操作,但略有不同。


stream.on('data', data => {
   console.log(data)
})
stream.on('end', () => {
   console.log("Finished")
})
stream.on('error', err => {
   console.error(err)
})
Enter fullscreen mode Exit fullscreen mode

承诺

somePromise()
  .then(data => console.log(data))
  .catch(err => console.error(err))
Enter fullscreen mode Exit fullscreen mode

事件监听器

document.addEventListener('click', event => {
  console.log(event.clientX)
})
Enter fullscreen mode Exit fullscreen mode

你看到的大致模式是,有一个对象,对象内部有一个接受函数(换句话说,回调)的方法。它们都在解决同一个问题,只是方式不同,这导致你需要费力记住每个 API 的具体语法。这就是 RxJS 的用武之地。RxJS 将所​​有这些统一在一个通用的抽象之下。

那么,可观察对象究竟是什么?它是一种抽象,就像数组、函数或对象都是抽象一样。Promise 可以解析(resolve)或拒绝(reject),并返回一个值。可观察对象能够随时间发出值。你可以消费来自服务器的数据流,或者监听 DOM 事件。

💀 可观察的骨架

const observable = {
  subscribe: observer => {

  },
  pipe: operator => {

  },
}
Enter fullscreen mode Exit fullscreen mode

可观察对象只是包含subscribepipe方法的对象。等等,这到底是怎么回事?什么是观察者,什么是操作符?观察者只是包含nexterror和 的回调方法的对象complete。该subscribe方法会消费观察者并将值传递给它。因此,可观察对象充当生产者,而观察者是它的消费者。

👀 观察者

const observer = {
  next: x => {
    console.log(x)
  },
  error: err => {
    console.log(err)
  },
  complete: () => {
    console.log("done")
  }
}
Enter fullscreen mode Exit fullscreen mode

在该方法中,subscribe您将某种形式的数据传递给观察者的方法。

订阅方法

const observable = {
  subscribe: observer => {
    document.addEventListener("click", event => {
      observer.next(event.clientX)
    })
  },
  pipe: operator => {

  },
}
Enter fullscreen mode Exit fullscreen mode

这里我们只是监听文档中任意位置的点击。如果我们运行这段代码并调用observable.subscribe(observer),我们就会在控制台中看到点击的 x 坐标。那么这个pipe方法呢?pipe它使用一个运算符并返回一个函数,然后使用可观察对象调用返回的函数。

管道方法

const observable = {
  subscribe: observer => {
    document.addEventListener("click", event => {
      observer.next(event.clientX)
    })
  },
  pipe: operator => {
    return operator(this)
  },
}
Enter fullscreen mode Exit fullscreen mode

很酷,但什么是运算符呢?运算符用于转换数据。数组有运算符,例如map. map,它让你退一步,对数组中的所有内容运行某个函数。你可以有一个数组,然后还有一个数组,它是第一个数组的映射版本。

让我们map为可观察对象编写一个函数。

🗺️ 地图操作符

const map = f => {
  return observable => {
    subscribe: observer => {
      observable.subscribe({
        next: x => {
          observer.next(f(x))
        },
        error: err => {
          console.error(err)
        },
        complete: () => {
          console.log("finished")
        }
      })
    },
    pipe: operator => {
      return operator(this)
    },
  }
}
Enter fullscreen mode Exit fullscreen mode

这里发生了很多事情,让我们来分析一下。

const map = f => {
  return observable => {
Enter fullscreen mode Exit fullscreen mode

这里我们传入一个函数,并返回一个需要可观察变量的函数。还记得我们的pipe方法吗?

pipe: operator => {
  return operator(this)
},
Enter fullscreen mode Exit fullscreen mode

要在可观察对象上运行运算符,需要将其传递到pipepipe将把它调用的可观察对象传递到我们的运算符返回的函数中。

subscribe: observer => {
  observable.subscribe({
Enter fullscreen mode Exit fullscreen mode

接下来,我们定义subscribe要返回的可观察对象的方法。该方法需要一个观察者,将来在返回的可观察对象上调用该方法时(可以通过其他操作符或显式调用)会接收该方法。然后,使用观察者对象.subscribe进行调用。observable.subscribe

{
  next: x => {
    observer.next(f(x))
  },
  error: err => {
    console.error(err)
  },
  complete: () => {
    console.log("finished")
  }
}
Enter fullscreen mode Exit fullscreen mode

在观察者的方法中,你可以看到,我们通过最初传入的函数和一个传入的值,调用了next未来的观察者。让我们在可观察对象上运行我们的 new 运算符!nextmapxnextmap

observable
  .pipe(map(e => e.clientX))
  .pipe(map(x => x - 1000))
  .subscribe(observer)
Enter fullscreen mode Exit fullscreen mode

必须使用final 修饰符subscribe,否则这些操作符中的任何操作都不会执行,因为它们都被包装在了观察者的subscribe方法中。这些subscribe方法会调用subscribe链中的前一个观察者,但观察者链必须从某个地方开始。

那么让我们来观察一下运行时会发生什么。

  1. 第一个管道在可观察对象上被调用,map并被柯里化this
  2. map被调用e => e.clientX并返回一个函数
  3. 该函数使用原始函数调用observable,并返回一个可观察值
    1. 我们将其称为 observable2
  4. pipe被调用observable2map咖喱this
  5. map被调用x => x - 1000并返回一个函数
  6. 该函数被调用observable2并返回一个可观察对象
    1. 我们将其称为 observable3
  7. .subscribeobservable3被传入观察者调用
  8. .subscribeobservable2被操作员的观察者传入调用
  9. .subscribe在原始可观察对象上调用,并传入操作符的观察者
  10. 点击事件发生clientX100
  11. observer2.next(100)被叫
  12. observer3.next(100)被叫
  13. observer.next(-900)被调用并记录-900到控制台。
  14. 完毕!

您可以看到这里发生的链条。当您调用时subscribe,您正在请求信息,每个环节都会向链条中的前一个环节请求信息,直到到达数据并next调用其观察者的方法。然后,该数据沿着链条向上回溯,沿途进行转换,直到到达最终的观察者。

以下是完整代码。

const observable = {
  subscribe: observer => {
    document.addEventListener("click", event => {
      observer.next(event.clientX)
    })
  },
  pipe: operator => {
    return operator(this)
  }
}

const observer = {
  next: x => {
    console.log(x)
  },
  error: err => {
    console.log(err)
  },
  complete: () => {
    console.log("done")
  }
}

const map = f => {
  return observable => {
    subscribe: observer => {
      observable.subscribe({
        next: x => {
          observer.next(f(x))
        },
        error: err => {
          console.error(err)
        },
        complete: () => {
          console.log("finished")
        }
      })
    },
    pipe: operator => {
      return operator(this)
    },
  }
}
Enter fullscreen mode Exit fullscreen mode
文章来源:https://dev.to/creeland/intro-to-rxjs-concepts-with-vanilla-javascript-4aji
PREV
如何在网站上查找子域名(以及为什么要这样做)
NEXT
如何制作一份助你求职的 Web 开发人员简历