使用 NodeJS 中的多线程同时上传多个文件
NodeJS 以单线程而闻名,但事实并非如此,因为只有事件循环由单线程处理。NodeJS 为我们提供了两种实现多线程的方法,worker_threads
和child_process
。
worker_threads
由一个进程控制并具有共享内存,这使得它们之间的通信更加容易。child_process
是从主线程生成的进程,当我们需要与操作系统直接通信时它很有用,但我们需要更多的内存来创建它们。
应用程序:我们要创建的应用程序的目的是加载所有文件夹的内容并将每个文件上传到 Google 云端存储,但最有趣的部分是我们将决定有多少个线程应该执行此操作,从而加快上传过程。
主要原则:
- 工作线程:用于创建线程的 NodeJS 模块。
- 流:如果您不了解 NodeJS 流,我建议您查看我在此处创建的教程。
- 文件系统:NodeJS 为我们提供了一种访问操作系统和操作文件和文件夹的简单方法。
重现步骤:
- 加载文件夹的内容
- 创建线程
- 创建上传工作器
- 分配上传线程
要求:
-
我们将使用 NodeJS
16.16
版本 -
您需要拥有一个 Google 帐户才能访问 Google 云服务。
云存储服务
我们需要安装云存储库来处理谷歌云服务。
npm install @google-cloud/storage
如果您需要帮助在您的 Google 帐户上配置您的云存储服务,许多优秀的教程可以为您提供帮助,但这不是本教程的目的。
让我们创建第一个文件cloudStorageFileService.js
来处理我们的存储。
cloudStorageFileService.js
const { Storage } = require('@google-cloud/storage')
const path = require('path')
const serviceKey = path.join(__dirname, '../gkeys.json')
class CloudStorageFileService {
// (1)
constructor() {
this.storage = new Storage({
projectId: 'my-project-id',
keyFilename: serviceKey
})
}
// (2)
async uploadFile(bucketName, destFileName) {
return await this.storage
.bucket(bucketName)
.file(destFileName)
.createWriteStream()
}
}
module.exports = CloudStorageFileService
从上面的代码部分:
-
使用云存储服务的基本配置,作为项目 ID 和带有您的 Google Cloud 凭据的路径。
-
Google Cloud Storage 为我们提供了用于上传文件的可写流。
线程控制器
线程控制器将处理线程分配,我们希望为每个文件提供一个线程,并分别上传它们。
threadController.js
const {
Worker
} = require('node:worker_threads');
const { readdir } = require('fs/promises')
const path = require('path')
class ThreadController {
// (1)
constructor(threadsNumber) {
this.files = []
this.threadsNumber = threadsNumber
this.count = 0
}
// (2)
async loadFiles() {
this.files = await readdir(path.join(__dirname, '/content'))
}
// (3)
async uploadThread(filePath) {
return new Promise((resolve, reject) => {
const worker = new Worker('./fileUploadWorker.js', {
workerData: {
file: filePath
}
});
worker.once('error', reject);
worker.on('exit', (code) => {
resolve(filePath)
});
})
}
// (4)
async execute() {
const init = performance.now()
await this.loadFiles()
let promises = []
while (this.count < this.files.length) {
for (let i = this.count; i < this.count + this.threadsNumber; i++) {
if (this.files[i]) {
promises.push(this.uploadThread(this.files[i]))
}
}
const result = await Promise.all(promises)
promises = []
this.count += this.threadsNumber
console.log(result)
}
const end = performance.now()
console.log(end - init)
}
}
module.exports = ThreadController
从上面的代码部分:
-
初始化我们的三个主要参数,我们想要的线程数、我们想要上传的文件以及创建线程的计数器。
-
将包含的所有文件加载到我们要处理的文件夹中。
-
这里我们向工作线程发送一条带有正确文件路径的消息,并等待该线程使用
Worker
对象完成其进程。 -
将所有文件一起运行。现在我们为每个文件分配一个线程,直到没有文件需要处理为止。例如,如果有 5 个文件,我们分配 3 个线程进行处理,那么第一次运行会处理前 3 个文件,第二次运行会处理剩下的 2 个文件。此外,我还设置了一个性能计,用于测试不同线程数下的性能表现。
文件上传工作者
上传工作器是线程的代码表示,在这里我们将放置我们想要线程执行的所有操作。
fileUploadWorker.js
const {
isMainThread, parentPort, workerData
} = require('node:worker_threads');
const path = require('path')
const { pipeline } = require('stream/promises')
const { createReadStream } = require('fs')
const CloudStorageFileService = require('./cloudStorageFileService');
class FileUploadWorker {
// (1)
constructor() {
this.storage = new CloudStorageFileService()
this.filePath = path.join(__dirname, '/content/', workerData.file)
this.fileName = workerData.file
}
// (2)
async upload() {
if (!isMainThread) {
await pipeline(createReadStream(this.filePath), await this.storage.uploadFile('myfileuploads', this.fileName))
}
}
}
// (3)
;
(async () => {
const fileUploader = new FileUploadWorker()
await fileUploader.upload()
})()
从上面的代码部分:
-
在构造函数中,我们需要初始化存储服务,或者将其作为参数接收。此外,我们还需要通过 从父线程获取文件路径
workerData
。 -
这里我们检查当前是在动态创建的线程中,还是在 NodeJS 主线程中。如果不是在主线程中,我们就从文件创建一个可读流 (Readable Stream) 对象并上传。
-
这个匿名函数负责执行我们创建的线程。
执行一切
为了测试我们的应用程序,我将设置 9 个线程,每个线程对应一个文件夹。您可以尝试其他值来测试性能。
fileUploadWorker.js
const ThreadController = require('./threadController');
const controller = new ThreadController(9)
;
(async () => {
await controller.execute()
})()
总结
- NodeJS 不是单线程的。
- 当您需要处理繁重的工作并且不想使 NodeJS 主线程崩溃时,线程非常方便。
- 我们还可以使用多线程来执行批处理作业。
您可以在此处查看完整代码
链接:https://dev.to/wesleymreng7/uploading-multiple-files-at-the-same-time-using-multithreading-in-nodejs-3ib4