在 NodeJS 中处理 CPU 密集型作业

2025-05-28

在 NodeJS 中处理 CPU 密集型作业

封面照片由Fidel FernandoUnsplash上拍摄

最近,我得解决我的一台 NodeJS 服务器的一个问题。我需要解析并处理一个 Excel 表格中的数据,这非常耗费 CPU 资源,并且阻塞了 NodeJS 的事件循环,导致服务器卡死,直到处理完成。不过我并不意外;我预料到了这种情况。虽然有很多方法可以解决这个问题,但我打算使用 NodeJS 的 worker_threads 模块来解决这个问题。在开始之前,我们先快速了解一下线程相关的知识。本文使用的代码可以在这里找到。

NodeJS 和线程

线程是进程内的执行路径。

来源geeksforgeeks

JavaScript 是一种单线程编程语言,这意味着一次只能执行一组指令。NodeJS 应用程序实际上并非单线程,但我们无法像 Java 那样创建线程。NodeJS 会并行运行某些任务(例如 I/O 操作),但其他 CPU 操作则在一个线程上运行。

这对我们意味着什么?

如果您收到的所有请求仅需要 I/O 密集型操作(如数据库读取、写入等),那么您的服务器将正常工作。但是,如果您碰巧有一个请求需要 CPU 密集型操作,例如解析文档或运行非常长的循环(就像我在处理 Excel 表时所做的那样),您的服务器将无法处理其他请求,因为唯一处理请求的线程将被卡住。

什么是“worker_threads”模块?

worker_threads 模块支持使用并行执行 JavaScript 的线程。

来源NodeJS v14 文档

这使我们能够构建多线程 NodeJS 应用程序,这正是我们现在所需要的。

好的...那么,我们要如何使用它来解决我们的问题呢?

让我们暂时假装自己是高级软件工程师,并开始编写某种规范!

规格

其理念是将 CPU 密集型任务分配给另一个线程。任务一收到,就会立即被存储在队列中等待处理。工作池(一组工作线程)会定期从该队列中请求任务进行处理。任务完成后,主线程会收到通知,结果会存储在数据库中。您可以对结果进行任何操作,但在本例中,我指示系统向创建该任务的用户发送一封包含结果链接的电子邮件。

如果进程结束时队列中仍有作业,会发生什么情况?

好吧,队列也应该保存到数据库中。当应用启动时,它应该从数据库中加载所有未完成的作业,并将它们加入队列进行处理。我们会将所有请求在放入队列之前保存到数据库中,这样队列中的所有作业也会保存在数据库中。

当工作线程由于某种原因而停止并且作业未完成时会发生什么?

我们可以在工作线程上设置退出事件处理程序。这意味着,如果我们跟踪每个线程正在执行的操作,我们就能发现某个工作线程留下了未完成的任务,并尝试将其重新分配给其他工作线程!一个 Map 就足以保存我们的任务分配。每个工作线程都需要一个唯一的 ID,我们可以将其用作 Map 的键。uuid将为我们的工作者提供唯一的 ID。

如果处理作业时发生错误会发生什么?

工作状态字段应该考虑到这一点。我建议使用以下状态:pending。您还可以创建一个包含有关工作的有用信息的字段。processingcompletedfailedmessage

现在我们知道该做什么了。闲话少叙,开始写代码吧!

这里我会使用 Typescript,但 JavaScript 的对应代码应该差别不大。我也经常使用 Observable,但它并没有什么特别之处。我不太清楚 Observable 到底是什么,但对我来说(以及这里使用它的方式),它只是一种触发事件并监听事件的机制。

帮助我们更好地理解设计的图像

作业处理器

这家伙的工作非常简单。

  • 接受一项工作,将其保存到数据库然后将其排入队列。
    async registerJob(job: any) {
        // save job to DB before it is added to queue
        const _id = await services.Job.create({
            ...job,
            status: 'pending'
        });
        this.queue.enqueue({ ...job, _id });
    }
Enter fullscreen mode Exit fullscreen mode
  • 初始化工作池并监听来自池的消息。
  • 当工作人员请求工作时,从队列中取出一个任务并将其传递给该工作人员。将任务分配存储在 Map 中,并将任务状态更新到processing数据库中。
  • 当一个工人宣布一项工作完成时,更新数据库、分配图并为其寻找另一项工作。
async processJobs() {
        const workers = new WorkerPool(this.nWorkers);
        workers.init();

        workers.on('message', async ({ id, message, status, data }) => {
            if (message === WorkerMessage.job_complete) {
                const job = this.assignedJobs.get(id);
                this.assignedJobs.set(id, null);
                // update job status
                services.Job.updateOne(
                    { status, data },
                    { _id: job._id }
                );
            }

            const newJob: any = await this.queue.dequeue();
            workers.send(id, newJob);
            this.assignedJobs.set(id, newJob);
            // update job status
            services.Job.updateOne(
                { status: 'processing' },
                { _id: newJob._id }
            );
        });

        workers.on('exit', (id) => {
            const ongoingJob = this.assignedJobs.get(id);
            if (!ongoingJob) return;
            // Re-queue the job that wasn't finished
            this.queue.enqueue(ongoingJob);
        });
    }
Enter fullscreen mode Exit fullscreen mode

队列

这里也没有什么特别的,只是一个异步队列的实现,客户端可以等待直到有新项目。

// ... taken from Queue.ts
    enqueue(item: T) {
        this.items.push(item);
        this.observable.push(QueueEvents.enqueue);
    }

    async dequeue() {
        if (this.items.length > 0) {
            const currentItem = this.items[0];
            this.items = this.items.filter((_, index) => index !== 0);
            this.observable.push(QueueEvents.dequeue);
            return currentItem;
        }

        return new Promise((resolve) => {
            const unsubscribe = this.observable.subscribe(async (message) => {
                if (message !== QueueEvents.enqueue) return;
                resolve(await this.dequeue());
                unsubscribe();
            });
        })
    }
}
Enter fullscreen mode Exit fullscreen mode

工人池

初始化所需数量的工人,为他们分配 ID 并管理客户端和工人之间的通信。

// ... taken from WorkerPool.ts
    private createWorker() {
        const worker = new Worker(`${__dirname}/worker.js`);
        const id = v4();
        this.workers.set(id, worker);
        worker.on("message", (value) => {
            this.observable.push({
                event: "message",
                data: { id, ...value }
            });
        });
        worker.on("exit", () => {
            this.observable.push({ event: "exit" });
            this.workers.delete(id);
            // Create another worker to replace the closing worker
            this.createWorker();
        })
    }

    send(id: string, data: any) {
        const worker = this.workers.get(id);
        worker?.postMessage(data);
    }

    on(evt: string, handler: Function) {
        this.observable.subscribe((value) => {
            const { event, data } = value;
            if (evt === event) {
                handler(data);
            }
        });
    }
}
Enter fullscreen mode Exit fullscreen mode

工人

这个进程负责处理 CPU 密集型作业。它启动时会发出任务请求。一旦收到任务,它就会立即处理,并通知主进程该任务已完成并返回结果。无需再次发送任务请求,因为主进程会自动尝试寻找其他任务执行。

import { workerData, parentPort } from "worker_threads";
import { WorkerMessage } from "./WorkerMessage";

parentPort.on('message', async (job) => {
    const { data } = job;
    try {
        // process job here
        parentPort.postMessage({
            message: WorkerMessage.job_complete,
            status: 'completed',
            data: { ...data, resultId }
        });
    } catch (error) {
        parentPort.postMessage({
            message: WorkerMessage.job_complete,
            status: 'failed',
            data: { ...data, error: error.message }
        });
    }
});

parentPort.postMessage({ message: WorkerMessage.request_job });
Enter fullscreen mode Exit fullscreen mode

启动时

剩下的就是调用Job Processor

import { jobProcessor } from "./JobProcessor";

jobProcessor.loadOutstandingJobs();
jobProcessor.processJobs();
Enter fullscreen mode Exit fullscreen mode

结论

代码量很大,但大部分都很简单。我喜欢这些代码简洁易用,我们设法解决了问题,但这个解决方案远非完美。我想到了一些你可能需要其他方案的场景。

  • 您的工作可能占用过多的 CPU 资源,因此您需要将每个请求分配给多个工作人员。

  • 你可能设置了很多服务器实例,但你不希望每个实例都运行各自的工作线程。或许你需要一个中央工作线程池供所有服务器实例使用。

  • …我还能想到更多……如果你还有更多,请在评论中留言

另外,如果您认为我遗漏了什么,或者我的工作存在缺陷,请在评论中告诉我。完整代码可在GitHub上查看。

谢谢你!

文章来源:https://dev.to/eunovo/processing-cpu-intense-jobs-in-nodejs-5epd
PREV
使用 ExpressoTS 的第一个项目
NEXT
终极 Vue.js(2021)备忘单