如何在 NodeJS 中处理海量数据 dev.to 文章 - 如何在 NodeJS 中处理海量数据

2025-06-09

如何在 NodeJS 中处理海量数据

dev.to 文章 - 如何在 NodeJS 中处理海量数据

前言

如果您像我一样,喜欢使用 NodeJS 来处理各种不同的任务,例如处理 HTTP 请求、开发 CLI 工具、物联网等等。您可能遇到过需要同时发送多个 HTTP 请求的情况,如果没有遇到过,也不用担心,因为总有一天您会遇到的。利用 JavaScript 的异步特性,大多数经验丰富的开发人员都能同时发送几个 HTTP 请求。但是,当您需要发送数百万个 HTTP 请求时会发生什么?这个问题甚至可能困扰经验丰富的 JavaScript 开发人员,因为它涉及到大多数人不常遇到的问题——处理大数据。

您可能已经猜到,如果您尝试异步发送 100 万个 HTTP 请求,那么您的程序将会崩溃,而且您的猜测是正确的。事实上,您的程序很可能在 100 万个 HTTP 请求之前就崩溃了。异步并不意味着它可以处理无限量的数据。在本文的其余部分,我希望向您展示如何以有效的方式处理任何大小的数据,而不会导致您耗尽系统资源。我们将使用NodeJS Streams,这是我们的秘密武器,所以如果您需要流指南,那么这是我最喜欢的文章。与那篇文章不同的是,我不打算深入研究流的工作原理,除非从高层次,相反,我的目标是为您提供一个使用流处理大数据的实际示例。

直接到完成的代码

如果您很着急或者不想阅读,那么这里是我们将要构建的已完成的 Github 存储库。

dev.to 文章 - 如何在 NodeJS 中处理海量数据




我们将要构建什么

  1. 我们将从文件中读取 Github 用户名列表
  2. 我们希望针对每个 Github 用户名调用 GitHub API 并获取其仓库列表。我们只处理 12 个用户名的较小列表,因为我不希望一大群读者滥用 Github API,而且无论数据量大小,这个概念都是一样的。
  3. 将这些数据写入我们的数据库,但为了避免此步骤的设置复杂性,我们只将数据写入文件。
  4. 最后,我们将进行重构以提高性能。

我们将使用 NodeJS Streams 完成所有这些操作,如果操作正确,它将具有背压的概念,这有助于我们以不会耗尽内存的方式使用 NodeJS 资源。

1. 读取 Github 用户名文件

您可以在存储库的示例文件中找到该文件

src/main.js

const fs = require('fs')
const path = require('path')
const CsvParser = require('csv-parse')

const readGithubUsernamesStream = fs.createReadStream(path.resolve(__dirname, '../github-usernames.csv'))
const csvParser = new CsvParser({})

let githubUsernames = ''
readGithubUsernamesStream
  .pipe(csvParser)
  .on('data', (data) => githubUsernames += data)
  .on('end', () => console.log(githubUsernames))

// Outputs - itmayziii,dhershman1,HetaRZinzuvadia,joeswislocki,justinvoelkel,mandarm2593,mfrost503,wmontgomery,kentcdodds,gaearon,btholt,paulirish,ryanflorence

Enter fullscreen mode Exit fullscreen mode

2. 从 Github 获取仓库列表

NodeJS 为我们提供了createReadStream来将我们的文件作为流读取,这很好,但现在我们需要自己的流来获取用户名列表,读取它,并将其转换为 github 结果。

在此步骤中,我们将使用axios发出 HTTP 请求

src/transform-username-to-github-repos.js

const axios = require('axios')
const stream = require('stream')

module.exports = class TransformUsernameToGithubRepos extends stream.Transform {
  constructor (options = {}) {
    super({ ...options, objectMode: true })
  }

  _transform (chunk, encoding, callback) {
    const username = chunk
    this.getGithubRepositoriesForUser(username)
      .then((response) => {
        let repositories = []
        if (response.data) {
          repositories = response.data.map((repository) => repository.name)
        }

        this.push(JSON.stringify({
          username,
          repositories
        }))
        callback()
      })
      .catch(callback)
  }

  getGithubRepositoriesForUser (username) {
    return axios.get(`https://api.github.com/users/${username}/repos`, {
      headers: {
        Authorization: `Token ${process.env.GITHUB_ACCESS_TOKEN}`
      }
    })
  }
}

Enter fullscreen mode Exit fullscreen mode

并修改我们的src/main.js

const fs = require('fs')
const path = require('path')
const CsvParser = require('csv-parse')
const TransformUsernameToGithubRepos = require('./transform-username-to-github-repos')

const readGithubUsernamesStream = fs.createReadStream(path.resolve(__dirname, '../github-usernames.csv'))
const csvParser = new CsvParser({ columns: false })
const transformUsernameToGithubRepos = new TransformUsernameToGithubRepos()

let githubUserRepositories = []
readGithubUsernamesStream
  .pipe(csvParser)
  .pipe(transformUsernameToGithubRepos)
  .on('data', (data) => githubUserRepositories.push(data))
  .on('end', () => console.log(githubUserRepositories))

Enter fullscreen mode Exit fullscreen mode

我们在这里修改了很多东西,现在开始分解。我们创建了一个包含_transform方法的转换流。当我们将 CSV 文件传输到这个转换流时,这个_transform方法会被调用。调用该_tranform方法并将用户名传递给它后,我们会获取用户名,并向 GitHub 请求该用户的所有仓库。然后,我们将结果发送到流中的下一个方法this.push(...)。我们还没有 Steam 管道中的下一步,所以我们开始监听data收集数据并登录 main.js 的事件。

3. 将用户存储库写入文件

src/main.js

const fs = require('fs')
const path = require('path')
const CsvParser = require('csv-parse')
const TransformUsernameToGithubRepos = require('./transform-username-to-github-repos')

const readGithubUsernamesStream = fs.createReadStream(path.resolve(__dirname, '../github-usernames.csv'))
const csvParser = new CsvParser({ columns: false })
const transformUsernameToGithubRepos = new TransformUsernameToGithubRepos()
const writeStream = fs.createWriteStream(path.resolve(__dirname, '../github-user-repositories.txt'))

let githubUserRepositories = []
readGithubUsernamesStream
  .pipe(csvParser)
  .pipe(transformUsernameToGithubRepos)
  .pipe(writeStream)
  .on('end', () => process.exit())

Enter fullscreen mode Exit fullscreen mode

这是一个简单的步骤,我们只需创建一个写入流将内容写入 txt 文件。

4. 重构

虽然我们目前的做法有效,但远非理想。如果你仔细观察代码,就会发现它效率极低。

  • 它每次只能处理一个 HTTP 请求,虽然我们无法同时处理 100 万个 HTTP 请求,但这并不意味着我们无法处理 100 个。在本例中,为了演示目的,我们将每个管道演练限制为 5 个。
  • 代码的错误处理能力较差

让我们继续修复这些问题,首先从每个管道演练多个 HTTP 请求开始

src/transform-username-to-github-repos.js

const axios = require('axios')
const stream = require('stream')

module.exports = class TransformUsernameToGithubRepos extends stream.Transform {
  constructor (options = {}) {
    super({ ...options, objectMode: true })
    this.requests = []
  }

  _transform (chunk, encoding, callback) {
    const username = chunk[0]
    const githubRequest = this.getGithubRepositoriesForUser(username)
    this.requests.push(this.prepareGithubRequest(username, githubRequest))
    if (this.requests.length < 5) {
      return callback()
    }

    this.processRequests(callback)
  }

  _flush (callback) {
    this.processRequests(callback)
  }

  getGithubRepositoriesForUser (username) {
    return axios.get(`https://api.github.com/users/${username}/repos`, {
      headers: {
        Authorization: `Token ${process.env.GITHUB_ACCESS_TOKEN}`
      }
    })
  }

  prepareGithubRequest (username, githubRequest) {
    return githubRequest
      .then((response) => {
        let repositories = []
        if (response.data) {
          repositories = response.data.map((repository) => repository.name)
        }

        return {
          username,
          repositories
        }
      })
  }

  processRequests (callback) {
    return Promise.all(this.requests)
      .then((responses) => {
        this.requests = []

        this.push(responses.reduce((accumulator, currentValue) => {
          return accumulator + JSON.stringify(currentValue)
        }, ''))
        callback()
      })
      .catch(callback)
  }
}
Enter fullscreen mode Exit fullscreen mode

再次强调,我们做了很多工作,所以让我们回顾一下发生了什么。我们修改了_tranform方法,调用 Github API,然后将 Promise 推送到一个数组中,如果累计的 Promise 总数小于 5,则继续执行。实际上,在告诉转换函数将数据推送到流中之前,我们调用了 Github 5 次,具体操作可以在方法中找到processRequests。我们成功地将管道改为每次处理 5 个请求,而不是 1 个,这带来了巨大的性能提升。

想象一下,如果我们要处理 100 万条记录,而不是 5 条,而是 100 条,那么我们会同时发送 100 个 HTTP 请求,等待所有请求都解析完毕后再发送 100 个。这是一种非常高效且节省资源的处理大量数据的方法。

但我们还没有完成,我们仍然需要更好的错误处理,为此我们将利用原生 NodeJS管道功能。

管道 - 一种模块方法,用于在流之间传输转发错误并进行正确清理,并在管道完成时提供回调。

src/main.js

const fs = require('fs')
const path = require('path')
const CsvParser = require('csv-parse')
const TransformUsernameToGithubRepos = require('./transform-username-to-github-repos')
const stream = require('stream')

const readGithubUsernamesStream = fs.createReadStream(path.resolve(__dirname, '../github-usernames.csv'))
const csvParser = new CsvParser({ columns: false })
const transformUsernameToGithubRepos = new TransformUsernameToGithubRepos()
const writeStream = fs.createWriteStream(path.resolve(__dirname, '../github-user-repositories.txt'))

stream.pipeline(
  readGithubUsernamesStream,
  csvParser,
  transformUsernameToGithubRepos,
  writeStream,
  (error) => {
    if (error) {
      console.error('error ', error)
      return process.exit(1)
    }

    process.exit()
  }
)
Enter fullscreen mode Exit fullscreen mode

结论

NodeJS 流使我们能够有效地构建一个管道,数据从某个点开始,流经该管道直至结束。使用背压(backpressuring),只需实现 NodeJS 内置的流即可,这样我们就能在处理海量数据时高效地利用计算机资源。我知道这类方法有效,因为我曾用它处理过来自 CSV 文件的超过 1000 万条记录,调用 API 获取更多数据,然后将结果存储在数据库中,就像我们在本文中所做的那样。流本身就很有效,但如果您真的想加快速度,我建议考虑将子进程与流结合使用,以实现最高效率。

封面照片来源 - Jonathan Kemper on unsplash

鏂囩珷鏉ユ簮锛�https://dev.to/itmayziii/how-to-process-epic-amounts-of-data-in-nodejs-16hl
PREV
如何学习 JavaScript 中的闭包并了解何时使用它们
NEXT
每个开发者都应该收藏的 10 个开源项目