如何在 NodeJS 中处理海量数据
dev.to 文章 - 如何在 NodeJS 中处理海量数据
前言
如果您像我一样,喜欢使用 NodeJS 来处理各种不同的任务,例如处理 HTTP 请求、开发 CLI 工具、物联网等等。您可能遇到过需要同时发送多个 HTTP 请求的情况,如果没有遇到过,也不用担心,因为总有一天您会遇到的。利用 JavaScript 的异步特性,大多数经验丰富的开发人员都能同时发送几个 HTTP 请求。但是,当您需要发送数百万个 HTTP 请求时会发生什么?这个问题甚至可能困扰经验丰富的 JavaScript 开发人员,因为它涉及到大多数人不常遇到的问题——处理大数据。
您可能已经猜到,如果您尝试异步发送 100 万个 HTTP 请求,那么您的程序将会崩溃,而且您的猜测是正确的。事实上,您的程序很可能在 100 万个 HTTP 请求之前就崩溃了。异步并不意味着它可以处理无限量的数据。在本文的其余部分,我希望向您展示如何以有效的方式处理任何大小的数据,而不会导致您耗尽系统资源。我们将使用NodeJS Streams,这是我们的秘密武器,所以如果您需要流指南,那么这是我最喜欢的文章。与那篇文章不同的是,我不打算深入研究流的工作原理,除非从高层次,相反,我的目标是为您提供一个使用流处理大数据的实际示例。
直接到完成的代码
如果您很着急或者不想阅读,那么这里是我们将要构建的已完成的 Github 存储库。
我们将要构建什么
- 我们将从文件中读取 Github 用户名列表
- 我们希望针对每个 Github 用户名调用 GitHub API 并获取其仓库列表。我们只处理 12 个用户名的较小列表,因为我不希望一大群读者滥用 Github API,而且无论数据量大小,这个概念都是一样的。
- 将这些数据写入我们的数据库,但为了避免此步骤的设置复杂性,我们只将数据写入文件。
- 最后,我们将进行重构以提高性能。
我们将使用 NodeJS Streams 完成所有这些操作,如果操作正确,它将具有背压的概念,这有助于我们以不会耗尽内存的方式使用 NodeJS 资源。
1. 读取 Github 用户名文件
您可以在存储库的示例文件中找到该文件
src/main.js
const fs = require('fs')
const path = require('path')
const CsvParser = require('csv-parse')
const readGithubUsernamesStream = fs.createReadStream(path.resolve(__dirname, '../github-usernames.csv'))
const csvParser = new CsvParser({})
let githubUsernames = ''
readGithubUsernamesStream
.pipe(csvParser)
.on('data', (data) => githubUsernames += data)
.on('end', () => console.log(githubUsernames))
// Outputs - itmayziii,dhershman1,HetaRZinzuvadia,joeswislocki,justinvoelkel,mandarm2593,mfrost503,wmontgomery,kentcdodds,gaearon,btholt,paulirish,ryanflorence
2. 从 Github 获取仓库列表
NodeJS 为我们提供了createReadStream来将我们的文件作为流读取,这很好,但现在我们需要自己的流来获取用户名列表,读取它,并将其转换为 github 结果。
在此步骤中,我们将使用axios发出 HTTP 请求
src/transform-username-to-github-repos.js
const axios = require('axios')
const stream = require('stream')
module.exports = class TransformUsernameToGithubRepos extends stream.Transform {
constructor (options = {}) {
super({ ...options, objectMode: true })
}
_transform (chunk, encoding, callback) {
const username = chunk
this.getGithubRepositoriesForUser(username)
.then((response) => {
let repositories = []
if (response.data) {
repositories = response.data.map((repository) => repository.name)
}
this.push(JSON.stringify({
username,
repositories
}))
callback()
})
.catch(callback)
}
getGithubRepositoriesForUser (username) {
return axios.get(`https://api.github.com/users/${username}/repos`, {
headers: {
Authorization: `Token ${process.env.GITHUB_ACCESS_TOKEN}`
}
})
}
}
并修改我们的src/main.js
const fs = require('fs')
const path = require('path')
const CsvParser = require('csv-parse')
const TransformUsernameToGithubRepos = require('./transform-username-to-github-repos')
const readGithubUsernamesStream = fs.createReadStream(path.resolve(__dirname, '../github-usernames.csv'))
const csvParser = new CsvParser({ columns: false })
const transformUsernameToGithubRepos = new TransformUsernameToGithubRepos()
let githubUserRepositories = []
readGithubUsernamesStream
.pipe(csvParser)
.pipe(transformUsernameToGithubRepos)
.on('data', (data) => githubUserRepositories.push(data))
.on('end', () => console.log(githubUserRepositories))
我们在这里修改了很多东西,现在开始分解。我们创建了一个包含_transform
方法的转换流。当我们将 CSV 文件传输到这个转换流时,这个_transform
方法会被调用。调用该_tranform
方法并将用户名传递给它后,我们会获取用户名,并向 GitHub 请求该用户的所有仓库。然后,我们将结果发送到流中的下一个方法this.push(...)
。我们还没有 Steam 管道中的下一步,所以我们开始监听data
收集数据并登录 main.js 的事件。
3. 将用户存储库写入文件
src/main.js
const fs = require('fs')
const path = require('path')
const CsvParser = require('csv-parse')
const TransformUsernameToGithubRepos = require('./transform-username-to-github-repos')
const readGithubUsernamesStream = fs.createReadStream(path.resolve(__dirname, '../github-usernames.csv'))
const csvParser = new CsvParser({ columns: false })
const transformUsernameToGithubRepos = new TransformUsernameToGithubRepos()
const writeStream = fs.createWriteStream(path.resolve(__dirname, '../github-user-repositories.txt'))
let githubUserRepositories = []
readGithubUsernamesStream
.pipe(csvParser)
.pipe(transformUsernameToGithubRepos)
.pipe(writeStream)
.on('end', () => process.exit())
这是一个简单的步骤,我们只需创建一个写入流将内容写入 txt 文件。
4. 重构
虽然我们目前的做法有效,但远非理想。如果你仔细观察代码,就会发现它效率极低。
- 它每次只能处理一个 HTTP 请求,虽然我们无法同时处理 100 万个 HTTP 请求,但这并不意味着我们无法处理 100 个。在本例中,为了演示目的,我们将每个管道演练限制为 5 个。
- 代码的错误处理能力较差
让我们继续修复这些问题,首先从每个管道演练多个 HTTP 请求开始
src/transform-username-to-github-repos.js
const axios = require('axios')
const stream = require('stream')
module.exports = class TransformUsernameToGithubRepos extends stream.Transform {
constructor (options = {}) {
super({ ...options, objectMode: true })
this.requests = []
}
_transform (chunk, encoding, callback) {
const username = chunk[0]
const githubRequest = this.getGithubRepositoriesForUser(username)
this.requests.push(this.prepareGithubRequest(username, githubRequest))
if (this.requests.length < 5) {
return callback()
}
this.processRequests(callback)
}
_flush (callback) {
this.processRequests(callback)
}
getGithubRepositoriesForUser (username) {
return axios.get(`https://api.github.com/users/${username}/repos`, {
headers: {
Authorization: `Token ${process.env.GITHUB_ACCESS_TOKEN}`
}
})
}
prepareGithubRequest (username, githubRequest) {
return githubRequest
.then((response) => {
let repositories = []
if (response.data) {
repositories = response.data.map((repository) => repository.name)
}
return {
username,
repositories
}
})
}
processRequests (callback) {
return Promise.all(this.requests)
.then((responses) => {
this.requests = []
this.push(responses.reduce((accumulator, currentValue) => {
return accumulator + JSON.stringify(currentValue)
}, ''))
callback()
})
.catch(callback)
}
}
再次强调,我们做了很多工作,所以让我们回顾一下发生了什么。我们修改了_tranform
方法,调用 Github API,然后将 Promise 推送到一个数组中,如果累计的 Promise 总数小于 5,则继续执行。实际上,在告诉转换函数将数据推送到流中之前,我们调用了 Github 5 次,具体操作可以在方法中找到processRequests
。我们成功地将管道改为每次处理 5 个请求,而不是 1 个,这带来了巨大的性能提升。
想象一下,如果我们要处理 100 万条记录,而不是 5 条,而是 100 条,那么我们会同时发送 100 个 HTTP 请求,等待所有请求都解析完毕后再发送 100 个。这是一种非常高效且节省资源的处理大量数据的方法。
但我们还没有完成,我们仍然需要更好的错误处理,为此我们将利用原生 NodeJS管道功能。
管道 - 一种模块方法,用于在流之间传输转发错误并进行正确清理,并在管道完成时提供回调。
src/main.js
const fs = require('fs')
const path = require('path')
const CsvParser = require('csv-parse')
const TransformUsernameToGithubRepos = require('./transform-username-to-github-repos')
const stream = require('stream')
const readGithubUsernamesStream = fs.createReadStream(path.resolve(__dirname, '../github-usernames.csv'))
const csvParser = new CsvParser({ columns: false })
const transformUsernameToGithubRepos = new TransformUsernameToGithubRepos()
const writeStream = fs.createWriteStream(path.resolve(__dirname, '../github-user-repositories.txt'))
stream.pipeline(
readGithubUsernamesStream,
csvParser,
transformUsernameToGithubRepos,
writeStream,
(error) => {
if (error) {
console.error('error ', error)
return process.exit(1)
}
process.exit()
}
)
结论
NodeJS 流使我们能够有效地构建一个管道,数据从某个点开始,流经该管道直至结束。使用背压(backpressuring),只需实现 NodeJS 内置的流即可,这样我们就能在处理海量数据时高效地利用计算机资源。我知道这类方法有效,因为我曾用它处理过来自 CSV 文件的超过 1000 万条记录,调用 API 获取更多数据,然后将结果存储在数据库中,就像我们在本文中所做的那样。流本身就很有效,但如果您真的想加快速度,我建议考虑将子进程与流结合使用,以实现最高效率。
封面照片来源 - Jonathan Kemper on unsplash
鏂囩珷鏉ユ簮锛�https://dev.to/itmayziii/how-to-process-epic-amounts-of-data-in-nodejs-16hl