Node 工作者用例
(或者提高 Node 服务器性能的方法)
作者:Godwin Ekuma✏️
过去,Node.js 往往不适合构建需要CPU密集型计算的应用程序。这是因为它采用非阻塞、事件驱动的 I/O 架构。随着 Node.js 中线程工作线程的出现,它现在能够用于 CPU 密集型应用程序。在本文中,我们将探讨 Node.js 应用程序中工作线程的一些用例。
在继续讨论 Node.js 中线程工作者的用例之前,让我们快速比较一下 Node 中的 I/O 密集型和 CPU 密集型。
Node.js 中的 I/O 密集型与 CPU 密集型
I/O 限制
如果增加某种资源可以提高程序的性能,则称该程序受该资源约束。提高 I/O 子系统(例如内存、硬盘速度或网络连接)的速度可以提高 I/O 约束程序的性能。这在 Node.js 应用程序中很常见,因为事件循环通常会花费时间等待网络、文件系统甚至数据库 I/O 完成操作,然后才能继续执行代码或返回响应。提高硬盘速度和/或网络连接通常可以提高应用程序或程序的整体性能。
CPU 密集型
如果某个程序的处理时间会随着 CPU 频率的增加而减少,则该程序属于 CPU 密集型程序。例如,一个计算文件哈希值的程序在 2.2GHz 处理器上处理速度会更快,而在 1.2GHz 处理器上处理速度会更慢。
对于 CPU 密集型应用程序来说,大部分时间都花在 CPU 计算上。在 Node.js 中,CPU 密集型应用程序会阻塞事件,并导致其他请求被搁置。
Node.js 黄金法则
不要阻塞事件循环,保持其运行并避免任何可能阻塞线程的同步网络调用或无限循环。
Node 运行在单线程事件循环中,使用非阻塞 I/O 调用,这使得它能够同时支持数万个计算任务,例如处理多个传入的 HTTP 请求。只要每个客户端在特定时刻的计算任务量较小,这种机制就能很好地运行,并且速度很快。但是,如果执行 CPU 密集型计算,您的并发 Node.js 服务器就会突然停止运行。由于每次只处理一个请求,其他传入请求将处于等待状态。
Node.js 中已经采用了一些策略来处理 CPU 密集型任务。例如,多个进程(例如cluster API)可以确保 CPU 得到最佳利用,而子进程则会生成新的进程来处理阻塞任务。
这些策略的优势在于事件循环不会被阻塞,并且允许进程分离,因此如果一个进程出现问题,不会影响其他进程。然而,由于子进程彼此隔离运行,它们无法共享内存,并且数据通信必须通过 JSON 进行,这需要对数据进行序列化和反序列化。
Node.js 中 CPU 密集型计算的最佳解决方案是在同一个进程中运行多个 Node.js 实例,这样可以共享内存,并且无需通过 JSON 传递数据。这正是 Node.js 中的工作线程所做的事情。
现实世界中 CPU 密集型任务可以通过线程工作者完成
我们将在 Node.js 应用程序中探讨线程工作器的一些用例。我们不会讨论线程工作器 API,因为我们只讨论 Node 应用程序中线程工作器的一些用例。如果您不熟悉线程工作器,可以访问这篇文章,了解如何使用线程工作器 API。
图像调整大小
假设您正在构建一个应用程序,允许用户上传个人资料图片,然后为应用程序中的各种用例生成多种尺寸(例如:100 x 100 和 64 x 64)的图片。调整图片大小的过程会占用大量 CPU 资源,而调整为两种不同的尺寸也会增加 CPU 调整图片大小所花费的时间。您可以将调整图片大小的任务外包给一个单独的线程,让主线程处理其他轻量级任务。
// worker.js
const { parentPort, workerData } = require("worker_threads");
const sharp = require("sharp");
async function resize() {
const outputPath = "public/images/" + Date.now() + ".png";
const { image, size } = workerData;
await sharp(image)
.resize(size, size, { fit: "cover" })
.toFile(outputPath);
parentPort.postMessage(outputPath);
}
resize()
// mainThread.js
const { Worker } = require("worker_threads");
module.exports = function imageResizer(image, size) {
return new Promise((resolve, reject) => {
const worker = new Worker(__dirname + "/worker.js", {
workerData: { image, size }
});
worker.on("message", resolve);
worker.on("error", reject);
worker.on("exit", code => {
if (code !== 0)
reject(new Error(`Worker stopped with exit code ${code}`));
});
});
};
主线程有一个方法,用于创建一个线程来调整每个图像的大小。它使用属性将大小和图像传递给线程工作器。工作器使用sharpworkerData
调整图像大小并将其发送回主线程。
视频压缩
视频压缩是另一项 CPU 密集型任务,可以外包给线程工作器 (Thread Worker)。大多数视频流应用程序通常会包含单个视频的多个版本,并根据用户的网络连接情况向用户显示。线程工作器可以完成将视频压缩成各种大小的任务。
ffmpeg-fluet
是 Node.js 应用程序中常用的视频处理模块。它依赖于ffmpeg
一个完整的跨平台解决方案,用于录制、转换和流式传输音频和视频。
由于每次需要使用新线程时创建工作线程会产生开销,因此建议您创建一个工作线程池,以便在需要时使用,而不是动态创建工作线程。为了创建工作线程池,我们使用 NPM 模块node-worker-threads-pool
,它使用 Node 的 worker_threads 模块创建工作线程池。
// worker.js
const { parentPort, workerData } = require("worker_threads");
const ffmpeg = require("fluent-ffmpeg");
function resizeVideo({ inputPath, size, parentPort }) {
const outputPath = "public/videos/" + Date.now() + size + ".mp4";
ffmpeg(inputPath)
.audioCodec("libmp3lame")
.videoCodec("libx264")
.size(size)
.on("error", function(err) {
console.log("An error occurred: " + err.message);
})
.on("end", function() {
parentPort.postMessage(outputPath);
})
.save(outputPath);
}
parentPort.on("message", param => {
resizeVideo({ ...param, parentPort });
});
// mainThread.js
const { StaticPool } = require("node-worker-threads-pool");
const filePath = __dirname + "/worker.js";
const pool = new StaticPool({
size: 4,
task: filePath,
workerData: "workerData!"
});
const videoSizes = ["1920x1080", "1280x720", "854x480", "640x360"];
module.exports = async function compressVideo(inputPath) {
const compressedVideos = [];
videoSizes.forEach(async size => {
const video = await pool.exec({ inputPath, size });
compressedVideos.push(video);
});
};
文件完整性
假设你需要将文件存储在云存储上。你需要确保存储的文件不被任何第三方篡改。为此,你可以使用加密哈希算法计算文件的哈希值。将这些哈希值及其存储位置保存在数据库中。下载文件时,需要再次计算哈希值以查看它们是否匹配。计算哈希值的过程需要占用大量 CPU 资源,可以在一个线程工作器中完成:
// hashing.js
const {
Worker, isMainThread, parentPort, workerData
} = require('worker_threads');
const crypto = require("crypto");
const fs = require("fs");
if (isMainThread) {
module.exports = async function hashFile(filePath) {
return new Promise((resolve, reject) => {
const worker = new Worker(__filename);
worker.on('message', resolve);
worker.on('error', reject);
worker.on('exit', (code) => {
if (code !== 0)
reject(new Error(`Worker stopped with exit code ${code}`));
});
worker.postMessage(filePath)
});
};
} else {
const algorithm = "sha1";
const shasum = crypto.createHash(algorithm);
const stream = fs.ReadStream(filePath);
stream.on("data", function(data) {
shasum.update(data);
});
stream.on("end", function() {
const hash = shasum.digest("hex");
parentPort.postMessage(hash);
});
}
请注意,我们将工作线程代码和主线程代码放在同一个文件中。thread worker 属性帮助我们确定当前线程并运行适合每个线程的代码。主线程创建一个新的 worker 并监听来自该 worker 的事件。工作线程使用名为 的 Node.js加密方法isMainThread
计算数据流的哈希值。createHash
结论
当我们想要通过释放事件循环来提升性能时,Node.js 线程工作线程是一个不错的选择。需要注意的是,工作线程对于执行 CPU 密集型 JavaScript 操作非常有用。不要将它们用于 I/O 操作,因为 Node.js 内置的异步执行机制已经比工作线程更高效地处理 I/O 操作。
编者注:觉得这篇文章有什么问题?您可以在这里找到正确版本。
插件:LogRocket,一个用于 Web 应用的 DVR
LogRocket是一款前端日志工具,可让您重播问题,就像它们发生在您自己的浏览器中一样。您无需猜测错误发生的原因,也无需要求用户提供屏幕截图和日志转储,LogRocket 允许您重播会话以快速了解问题所在。它可与任何应用程序完美兼容,无论使用哪种框架,并且提供插件来记录来自 Redux、Vuex 和 @ngrx/store 的更多上下文。
除了记录 Redux 操作和状态之外,LogRocket 还记录控制台日志、JavaScript 错误、堆栈跟踪、带有标头 + 正文的网络请求/响应、浏览器元数据以及自定义日志。它还会对 DOM 进行插桩,以记录页面上的 HTML 和 CSS,即使是最复杂的单页应用程序,也能重现像素完美的视频。
免费试用。
Node 工作者用例一文首先出现在LogRocket 博客上。
鏂囩珷鏉ユ簮锛�https://dev.to/bnevilleoneill/use-cases-for-node-workers-o1n