如何在 Node.js 中使用 ZeroMQ 发布/订阅模式

2025-06-05

如何在 Node.js 中使用 ZeroMQ 发布/订阅模式

概述

发布/订阅是一种模式,其中发布者无需编程即可将消息(有效负载)发送给特定的接收者。这些消息由发布者发送到特定的频道,接收者可以订阅一个或多个频道来消费这些消息。

假设您有一个单体式后端,但您想为其添加一项新功能,例如发送电子邮件。您可以让该后端不再负责发送电子邮件,而是将其设置为发布者,将电子邮件发送到某个渠道,供另一个负责发送电子邮件(例如新闻通讯)的后端(接收者)使用。

今天的例子

这个过程的实现非常简单,这就是为什么在今天的例子中我决定创建一个简单的 Api,以便它将接收我们的请求主体并将其发送到特定通道以供接收器使用并记录它。

让我们开始编码

你可能已经了解了,我们将有两个后端。其中一个后端我们称之为server,它将作为我们的消息发送器。另一个后端称为worker,它将是我们的小型微服务。

首先,让我们安装依赖项:

npm install fastify zeromq --save
Enter fullscreen mode Exit fullscreen mode

现在让我们创建一个简单的 API:

// @/server.js
const Fastify = require("fastify");

const app = Fastify();

app.post("/", (request, reply) => {
  return reply.send({ ...request.body });
});

const main = async () => {
  try {
    await app.listen(3000);
  } catch (err) {
    console.error(err);
    process.exit(1);
  }
};
main();
Enter fullscreen mode Exit fullscreen mode

现在我们可以导入 zeromq 并创建它的一个实例。然后,我们将创建 Publisher 类型的 ZeroMQ 套接字,并通过我们定义的地址接受连接。不过,这是异步的,必须在应用程序启动后立即执行。如下所示:

// @/server.js
const Fastify = require("fastify");
const zmq = require("zeromq");

const app = Fastify();
const sock = new zmq.Publisher();

app.post("/", async (request, reply) => {
  return reply.send({ ...request.body });
});

const main = async () => {
  try {
    await sock.bind("tcp://*:7890");
    await app.listen(3000);
  } catch (err) {
    console.error(err);
    process.exit(1);
  }
};
main();
Enter fullscreen mode Exit fullscreen mode

现在,当从请求主体发送数据时,我们必须使用sock.send()函数。我们将传递一个数组参数,该数组需要两个元素。

第一个元素是我们要发送消息的频道,第二个元素是相应的消息。这样,我们将把响应主体中的数据作为消息发送,但首先我们必须将 JSON 转换为字符串。

// @/server.js
const Fastify = require("fastify");
const zmq = require("zeromq");

const app = Fastify();
const sock = new zmq.Publisher();

app.post("/", async (request, reply) => {
  await sock.send(["dev.to", JSON.stringify({ ...request.body })]);
  return reply.send("Sent to the subscriber/worker.");
});

const main = async () => {
  try {
    await sock.bind("tcp://*:7890");
    await app.listen(3000);
  } catch (err) {
    console.error(err);
    process.exit(1);
  }
};
main();
Enter fullscreen mode Exit fullscreen mode

现在我们可以开始处理我们的工作进程了。现在让我们导入 zeromq 并创建它的一个实例。然后,我们将创建 Subscriber 类型的 ZeroMQ 套接字,并通过我们之前定义的地址接受连接。

// @/worker.js
const zmq = require("zeromq");

const sock = new zmq.Subscriber();

const main = async () => {
  try {
    sock.connect("tcp://localhost:7890");
    // ...
  } catch (err) {
    console.error(err);
    process.exit(1);
  }
};
main();
Enter fullscreen mode Exit fullscreen mode

现在,我们已经创建了客户端实例并建立了连接,我们可以订阅我们的频道以从中接收消息。

// @/worker.js
const zmq = require("zeromq");

const sock = new zmq.Subscriber();

const main = async () => {
  try {
    sock.connect("tcp://localhost:7890");
    sock.subscribe("dev.to");
    // ...
  } catch (err) {
    console.error(err);
    process.exit(1);
  }
};
main();
Enter fullscreen mode Exit fullscreen mode

接下来,我们创建一个 for 循环,以便记录在特定频道中发布的每条消息。我们需要从套接字中获取两个信息:主题(即消息来自的频道)和相应的消息。别忘了将字符串消息解析回 JSON。

// @/worker.js
const zmq = require("zeromq");

const sock = new zmq.Subscriber();

const main = async () => {
  try {
    sock.connect("tcp://localhost:7890");
    sock.subscribe("dev.to");
    for await (const [topic, msg] of sock) {
      console.log("Received message from " + topic + " channel and this is the content:");
      console.log(JSON.parse(msg));
    }
  } catch (err) {
    console.error(err);
    process.exit(1);
  }
};
main();
Enter fullscreen mode Exit fullscreen mode

现在,当使用类似 Postman 的工具测试我们的 Api 时,您可以在请求正文中发送具有所需属性的 json 对象。

测试 API

然后你的终端上应该会出现类似这样的内容:

终端日志

结论

一如既往,希望你觉得这篇文章有趣。如果你发现本文有任何错误,请在评论区指出。🧑🏻‍💻

祝你度过美好的一天!🥸✌️

文章来源:https://dev.to/franciscomendes10866/how-to-use-zeromq-pub-sub-pattern-in-node-js-2i62
PREV
使用 Node.js 进行图像压缩
NEXT
如何使用 TypeORM 种子数据库让我们开始编码