使用 Vanilla JavaScript 介绍 RxJS 概念
最近, egghead.io和Andre Staltz一起主持的一场网络研讨会给了我启发,我想分享一下我的学习心得。在那场网络研讨会之前,我对 RxJS 并不熟悉,而且那是我第一次接触观察者模式。在深入分析之前,观察者模式对我来说就像魔法一样。
JavaScript 有多个使用回调函数的 API,它们都执行几乎相同的操作,但略有不同。
流
stream.on('data', data => {
   console.log(data)
})
stream.on('end', () => {
   console.log("Finished")
})
stream.on('error', err => {
   console.error(err)
})
承诺
somePromise()
  .then(data => console.log(data))
  .catch(err => console.error(err))
事件监听器
document.addEventListener('click', event => {
  console.log(event.clientX)
})
你看到的大致模式是,有一个对象,对象内部有一个接受函数(换句话说,回调)的方法。它们都在解决同一个问题,只是方式不同,这导致你需要费力记住每个 API 的具体语法。这就是 RxJS 的用武之地。RxJS 将所有这些统一在一个通用的抽象之下。
那么,可观察对象究竟是什么?它是一种抽象,就像数组、函数或对象都是抽象一样。Promise 可以解析(resolve)或拒绝(reject),并返回一个值。可观察对象能够随时间发出值。你可以消费来自服务器的数据流,或者监听 DOM 事件。
💀 可观察的骨架
const observable = {
  subscribe: observer => {
  },
  pipe: operator => {
  },
}
可观察对象只是包含subscribe和pipe方法的对象。等等,这到底是怎么回事?什么是观察者,什么是操作符?观察者只是包含next、error和 的回调方法的对象complete。该subscribe方法会消费观察者并将值传递给它。因此,可观察对象充当生产者,而观察者是它的消费者。
👀 观察者
const observer = {
  next: x => {
    console.log(x)
  },
  error: err => {
    console.log(err)
  },
  complete: () => {
    console.log("done")
  }
}
在该方法中,subscribe您将某种形式的数据传递给观察者的方法。
订阅方法
const observable = {
  subscribe: observer => {
    document.addEventListener("click", event => {
      observer.next(event.clientX)
    })
  },
  pipe: operator => {
  },
}
这里我们只是监听文档中任意位置的点击。如果我们运行这段代码并调用observable.subscribe(observer),我们就会在控制台中看到点击的 x 坐标。那么这个pipe方法呢?pipe它使用一个运算符并返回一个函数,然后使用可观察对象调用返回的函数。
管道方法
const observable = {
  subscribe: observer => {
    document.addEventListener("click", event => {
      observer.next(event.clientX)
    })
  },
  pipe: operator => {
    return operator(this)
  },
}
很酷,但什么是运算符呢?运算符用于转换数据。数组有运算符,例如map. map,它让你退一步,对数组中的所有内容运行某个函数。你可以有一个数组,然后还有一个数组,它是第一个数组的映射版本。
让我们map为可观察对象编写一个函数。
🗺️ 地图操作符
const map = f => {
  return observable => {
    subscribe: observer => {
      observable.subscribe({
        next: x => {
          observer.next(f(x))
        },
        error: err => {
          console.error(err)
        },
        complete: () => {
          console.log("finished")
        }
      })
    },
    pipe: operator => {
      return operator(this)
    },
  }
}
这里发生了很多事情,让我们来分析一下。
const map = f => {
  return observable => {
这里我们传入一个函数,并返回一个需要可观察变量的函数。还记得我们的pipe方法吗?
pipe: operator => {
  return operator(this)
},
要在可观察对象上运行运算符,需要将其传递到pipe。pipe将把它调用的可观察对象传递到我们的运算符返回的函数中。
subscribe: observer => {
  observable.subscribe({
接下来,我们定义subscribe要返回的可观察对象的方法。该方法需要一个观察者,将来在返回的可观察对象上调用该方法时(可以通过其他操作符或显式调用)会接收该方法。然后,使用观察者对象.subscribe进行调用。observable.subscribe
{
  next: x => {
    observer.next(f(x))
  },
  error: err => {
    console.error(err)
  },
  complete: () => {
    console.log("finished")
  }
}
在观察者的方法中,你可以看到,我们通过最初传入的函数和一个传入的值,调用了next未来的观察者。让我们在可观察对象上运行我们的 new 运算符!nextmapxnextmap
observable
  .pipe(map(e => e.clientX))
  .pipe(map(x => x - 1000))
  .subscribe(observer)
必须使用final 修饰符subscribe,否则这些操作符中的任何操作都不会执行,因为它们都被包装在了观察者的subscribe方法中。这些subscribe方法会调用subscribe链中的前一个观察者,但观察者链必须从某个地方开始。
那么让我们来观察一下运行时会发生什么。
- 第一个管道在可观察对象上被调用,map并被柯里化this
- map被调用- e => e.clientX并返回一个函数
- 该函数使用原始函数调用observable,并返回一个可观察值- 我们将其称为 observable2
 
- pipe被调用- observable2并- map咖喱- this
- map被调用- x => x - 1000并返回一个函数
- 该函数被调用observable2并返回一个可观察对象- 我们将其称为 observable3
 
- .subscribe- observable3被传入观察者调用
- .subscribe- observable2被操作员的观察者传入调用
- .subscribe在原始可观察对象上调用,并传入操作符的观察者
- 点击事件发生clientX时100
- observer2.next(100)被叫
- observer3.next(100)被叫
- observer.next(-900)被调用并记录- -900到控制台。
- 完毕!
您可以看到这里发生的链条。当您调用时subscribe,您正在请求信息,每个环节都会向链条中的前一个环节请求信息,直到到达数据并next调用其观察者的方法。然后,该数据沿着链条向上回溯,沿途进行转换,直到到达最终的观察者。
以下是完整代码。
const observable = {
  subscribe: observer => {
    document.addEventListener("click", event => {
      observer.next(event.clientX)
    })
  },
  pipe: operator => {
    return operator(this)
  }
}
const observer = {
  next: x => {
    console.log(x)
  },
  error: err => {
    console.log(err)
  },
  complete: () => {
    console.log("done")
  }
}
const map = f => {
  return observable => {
    subscribe: observer => {
      observable.subscribe({
        next: x => {
          observer.next(f(x))
        },
        error: err => {
          console.error(err)
        },
        complete: () => {
          console.log("finished")
        }
      })
    },
    pipe: operator => {
      return operator(this)
    },
  }
}
 后端开发教程 - Java、Spring Boot 实战 - msg200.com
            后端开发教程 - Java、Spring Boot 实战 - msg200.com