在 NodeJS 中处理 CPU 密集型作业
封面照片由Fidel Fernando在Unsplash上拍摄
最近,我得解决我的一台 NodeJS 服务器的一个问题。我需要解析并处理一个 Excel 表格中的数据,这非常耗费 CPU 资源,并且阻塞了 NodeJS 的事件循环,导致服务器卡死,直到处理完成。不过我并不意外;我预料到了这种情况。虽然有很多方法可以解决这个问题,但我打算使用 NodeJS 的 worker_threads 模块来解决这个问题。在开始之前,我们先快速了解一下线程相关的知识。本文使用的代码可以在这里找到。
NodeJS 和线程
线程是进程内的执行路径。
JavaScript 是一种单线程编程语言,这意味着一次只能执行一组指令。NodeJS 应用程序实际上并非单线程,但我们无法像 Java 那样创建线程。NodeJS 会并行运行某些任务(例如 I/O 操作),但其他 CPU 操作则在一个线程上运行。
这对我们意味着什么?
如果您收到的所有请求仅需要 I/O 密集型操作(如数据库读取、写入等),那么您的服务器将正常工作。但是,如果您碰巧有一个请求需要 CPU 密集型操作,例如解析文档或运行非常长的循环(就像我在处理 Excel 表时所做的那样),您的服务器将无法处理其他请求,因为唯一处理请求的线程将被卡住。
什么是“worker_threads”模块?
worker_threads 模块支持使用并行执行 JavaScript 的线程。
这使我们能够构建多线程 NodeJS 应用程序,这正是我们现在所需要的。
好的...那么,我们要如何使用它来解决我们的问题呢?
让我们暂时假装自己是高级软件工程师,并开始编写某种规范!
规格
其理念是将 CPU 密集型任务分配给另一个线程。任务一收到,就会立即被存储在队列中等待处理。工作池(一组工作线程)会定期从该队列中请求任务进行处理。任务完成后,主线程会收到通知,结果会存储在数据库中。您可以对结果进行任何操作,但在本例中,我指示系统向创建该任务的用户发送一封包含结果链接的电子邮件。
如果进程结束时队列中仍有作业,会发生什么情况?
好吧,队列也应该保存到数据库中。当应用启动时,它应该从数据库中加载所有未完成的作业,并将它们加入队列进行处理。我们会将所有请求在放入队列之前保存到数据库中,这样队列中的所有作业也会保存在数据库中。
当工作线程由于某种原因而停止并且作业未完成时会发生什么?
我们可以在工作线程上设置退出事件处理程序。这意味着,如果我们跟踪每个线程正在执行的操作,我们就能发现某个工作线程留下了未完成的任务,并尝试将其重新分配给其他工作线程!一个 Map 就足以保存我们的任务分配。每个工作线程都需要一个唯一的 ID,我们可以将其用作 Map 的键。uuid包将为我们的工作者提供唯一的 ID。
如果处理作业时发生错误会发生什么?
工作状态字段应该考虑到这一点。我建议使用以下状态:pending
、和。您还可以创建一个包含有关工作的有用信息的字段。processing
completed
failed
message
现在我们知道该做什么了。闲话少叙,开始写代码吧!
这里我会使用 Typescript,但 JavaScript 的对应代码应该差别不大。我也经常使用 Observable,但它并没有什么特别之处。我不太清楚 Observable 到底是什么,但对我来说(以及这里使用它的方式),它只是一种触发事件并监听事件的机制。
作业处理器
这家伙的工作非常简单。
- 接受一项工作,将其保存到数据库然后将其排入队列。
async registerJob(job: any) {
// save job to DB before it is added to queue
const _id = await services.Job.create({
...job,
status: 'pending'
});
this.queue.enqueue({ ...job, _id });
}
- 初始化工作池并监听来自池的消息。
- 当工作人员请求工作时,从队列中取出一个任务并将其传递给该工作人员。将任务分配存储在 Map 中,并将任务状态更新到
processing
数据库中。 - 当一个工人宣布一项工作完成时,更新数据库、分配图并为其寻找另一项工作。
async processJobs() {
const workers = new WorkerPool(this.nWorkers);
workers.init();
workers.on('message', async ({ id, message, status, data }) => {
if (message === WorkerMessage.job_complete) {
const job = this.assignedJobs.get(id);
this.assignedJobs.set(id, null);
// update job status
services.Job.updateOne(
{ status, data },
{ _id: job._id }
);
}
const newJob: any = await this.queue.dequeue();
workers.send(id, newJob);
this.assignedJobs.set(id, newJob);
// update job status
services.Job.updateOne(
{ status: 'processing' },
{ _id: newJob._id }
);
});
workers.on('exit', (id) => {
const ongoingJob = this.assignedJobs.get(id);
if (!ongoingJob) return;
// Re-queue the job that wasn't finished
this.queue.enqueue(ongoingJob);
});
}
队列
这里也没有什么特别的,只是一个异步队列的实现,客户端可以等待直到有新项目。
// ... taken from Queue.ts
enqueue(item: T) {
this.items.push(item);
this.observable.push(QueueEvents.enqueue);
}
async dequeue() {
if (this.items.length > 0) {
const currentItem = this.items[0];
this.items = this.items.filter((_, index) => index !== 0);
this.observable.push(QueueEvents.dequeue);
return currentItem;
}
return new Promise((resolve) => {
const unsubscribe = this.observable.subscribe(async (message) => {
if (message !== QueueEvents.enqueue) return;
resolve(await this.dequeue());
unsubscribe();
});
})
}
}
工人池
初始化所需数量的工人,为他们分配 ID 并管理客户端和工人之间的通信。
// ... taken from WorkerPool.ts
private createWorker() {
const worker = new Worker(`${__dirname}/worker.js`);
const id = v4();
this.workers.set(id, worker);
worker.on("message", (value) => {
this.observable.push({
event: "message",
data: { id, ...value }
});
});
worker.on("exit", () => {
this.observable.push({ event: "exit" });
this.workers.delete(id);
// Create another worker to replace the closing worker
this.createWorker();
})
}
send(id: string, data: any) {
const worker = this.workers.get(id);
worker?.postMessage(data);
}
on(evt: string, handler: Function) {
this.observable.subscribe((value) => {
const { event, data } = value;
if (evt === event) {
handler(data);
}
});
}
}
工人
这个进程负责处理 CPU 密集型作业。它启动时会发出任务请求。一旦收到任务,它就会立即处理,并通知主进程该任务已完成并返回结果。无需再次发送任务请求,因为主进程会自动尝试寻找其他任务执行。
import { workerData, parentPort } from "worker_threads";
import { WorkerMessage } from "./WorkerMessage";
parentPort.on('message', async (job) => {
const { data } = job;
try {
// process job here
parentPort.postMessage({
message: WorkerMessage.job_complete,
status: 'completed',
data: { ...data, resultId }
});
} catch (error) {
parentPort.postMessage({
message: WorkerMessage.job_complete,
status: 'failed',
data: { ...data, error: error.message }
});
}
});
parentPort.postMessage({ message: WorkerMessage.request_job });
启动时
剩下的就是调用Job Processor
。
import { jobProcessor } from "./JobProcessor";
jobProcessor.loadOutstandingJobs();
jobProcessor.processJobs();
结论
代码量很大,但大部分都很简单。我喜欢这些代码简洁易用,我们设法解决了问题,但这个解决方案远非完美。我想到了一些你可能需要其他方案的场景。
-
您的工作可能占用过多的 CPU 资源,因此您需要将每个请求分配给多个工作人员。
-
你可能设置了很多服务器实例,但你不希望每个实例都运行各自的工作线程。或许你需要一个中央工作线程池供所有服务器实例使用。
-
…我还能想到更多……如果你还有更多,请在评论中留言
另外,如果您认为我遗漏了什么,或者我的工作存在缺陷,请在评论中告诉我。完整代码可在GitHub上查看。
谢谢你!
文章来源:https://dev.to/eunovo/processing-cpu-intense-jobs-in-nodejs-5epd