面向 Nodejs 开发人员的 RabbitMQ 简介
先决条件
对于本教程,你需要了解 Node.js 的一些背景知识,并在你的机器上安装 docker
队列
在计算机科学中,有一个“队列”的概念。队列是一组消息,旨在从一个发送者传递到一个或多个接收者。消息可以根据设计按序或无序传递。处理这些事务的计算机程序称为消息代理。RabbitMQ 是最流行的消息代理之一,它运行在高级消息队列协议 (AMQP) 之上。AMQP 协议由四个主要组件组成:发布者、交换器、队列和消费者。
出版商
消息由发布者发布到交换机,发布者还负责设置消息的属性,我们稍后会介绍。
交易所
交换器负责将消息路由到一个或多个队列,稍后我们将介绍队列。rabbitmq 中有 4 种不同类型的交换器。
1.
直接 2.扇出
3.主题
4.标题
在本教程中,我们将仅介绍两种:直接,稍后我将在 Fanout 交换上做另一个教程。
直接交换器 (Direct Exchange) 负责根据路由键 (Routing Key) 将消息路由到队列。声明队列时,可以使用路由键将其“绑定”到交换器 (Exchange),我们将在后面讨论此主题。直接队列适用于在工作器 (Worker) 之间分配任务。 扇出交换器 (Fanout Exchange) 会将消息发送到所有通过路由键绑定到该交换器的队列。当消息到达时,交换器会将该消息的副本发送到所有队列。扇出交换器 (Fanout Exchange) 适用于将消息广播到分布式系统中的多个节点。
队列
队列负责存储消息并将其传递给消费者。使用队列之前,需要先声明它。队列需要绑定到交换机才能开始接收消息。绑定是交换机用来将消息路由到一个或多个队列的一组规则。
消费者
消费者是拼图的最后一块,他们需要订阅队列以便开始接收消息,当消费者接收并处理消息时,它需要“确认”该消息才能获取另一条消息。
安装 rabbitMQ
我们将使用 docker 来安装 rabbitmq 及其管理 UI。
docker run --rm -it --hostname my-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management
上述命令将安装 RabbitMQ,并将两个端口绑定到您的本地端口:5672 和 15672。
您可以使用 15672 端口访问 RabbitMQ 管理门户:http://localhost:15672,默认用户名和密码为 guest/guest。
我们需要在端口 5672 上使用 amqplib 库与 RabbitMQ 服务器通信。现在,让我们创建一个直接交换器和一个队列。
const rabbit = require('amqplib');
const QUEUE_NAME = 'square';
const EXCHANGE_TYPE = 'direct';
const EXCHANGE_NAME = 'main';
const KEY = 'myKey';
const number = '5'
connection = rabbit.connect('amqp://localhost');
connection.then(async (conn)=>{
const channel = await conn.createChannel();
await channel.assertExchange(EXCHANGE_NAME, EXCHANGE_TYPE);
await channel.assertQueue(QUEUE_NAME);
channel.bindQueue(QUEUE_NAME, EXCHANGE_NAME, KEY);
})
发生了很多事,让我们来分析一下
在第 1 行,我导入了 amqplib 库;然后在第 7 行,我创建了一个与 RabbitMQ 的连接
;第 9 行,我在连接中创建了一个通道。你需要先创建通道,然后才能开始与 RabbitMQ 交互。在第 10 行,我使用 assertExchage 方法创建了一个交换器。此方法接受两个参数:交换器名称和交换器类型;
在第 11 行,我使用 assertQueue 方法创建了一个名为 square 的队列。在第 12 行,我使用 bindQueue 方法将 main 绑定到 square,路由键为 myKey。
运行上述代码后,您可以导航到管理界面并单击“队列”选项卡,您将在队列列表下看到方块
让我们编写一个应用程序来计算数字的平方
const rabbit = require('amqplib');
const QUEUE_NAME = 'square';
const EXCHANGE_TYPE = 'direct';
const EXCHANGE_NAME = 'main';
const KEY = 'myKey';
const number = '5'
connection = rabbit.connect('amqp://localhost');
connection.then(async (conn)=>{
const channel = await conn.createChannel();
await channel.assertExchange(EXCHANGE_NAME, EXCHANGE_TYPE);
await channel.assertQueue(QUEUE_NAME);
channel.bindQueue(QUEUE_NAME, EXCHANGE_NAME, KEY);
channel.sendToQueue(QUEUE_NAME, Buffer.from(number))
})
我们可以使用 sendToQueue 方法向 Square 队列发送消息。sendToQueue 方法接受两个参数:队列名称和要发送的内容。唯一需要注意的是,内容必须为缓冲区格式。
现在,运行上述代码后,您可以从管理界面导航到您的队列,您将看到方形队列中有一条消息
让我们消化这条信息并找到正方形
const rabbit = require('amqplib');
const QUEUE_NAME = 'square';
connection = rabbit.connect('amqp://localhost');
connection.then(async (conn)=>{
const channel = await conn.createChannel();
channel.consume(QUEUE_NAME, (m)=>{
const number = parseInt(m.content.toString())
const square = number * number
console.log(square)
channel.ack(m)
})
})
它与我们的发布者在很多方面非常相似。我们需要打开一个连接并创建一个通道,然后使用通道对象暴露的消费方法。消费方法接受两个参数:队列名称和回调函数。每当有消息发布到 Square 队列时,都会调用这个回调函数。回调函数接受一个参数,即消息对象。我们可以在 content 属性下找到对应的数字。如果您还记得的话,发布消息时,我们必须将数字转换为缓冲区数据,因此当我们消费内容时,必须将其转换回数字。首先,我们需要将数字从缓冲区转换为字符串,然后再从字符串转换为数字。获得数字后,我们可以找到 Square 并将其控制台打印到屏幕上。最后,我们可以使用 ack 方法确认该消息,并告诉 RabbitMQ 从队列中移除该消息,并发送下一条消息(如果有)。因此,首先运行发布者,然后运行消费者,以查看效果。您也可以打开管理控制台并查看队列上的活动。
接下来让我们运行两个消费者并稍微修改一下我们的发布者
const rabbit = require('amqplib');
const QUEUE_NAME = 'square';
const EXCHANGE_TYPE = 'direct';
const EXCHANGE_NAME = 'main';
const KEY = 'myKey';
const numbers = ['1', '2', '3', '4', '5']
connection = rabbit.connect('amqp://localhost');
connection.then(async (conn)=>{
const channel = await conn.createChannel();
await channel.assertExchange(EXCHANGE_NAME, EXCHANGE_TYPE);
await channel.assertQueue(QUEUE_NAME);
channel.bindQueue(QUEUE_NAME, EXCHANGE_NAME, KEY);
numbers.forEach((number)=>{
channel.sendToQueue(QUEUE_NAME, Buffer.from(number))
})
})
现在我们有一个数字数组,而不是一个数字,我们使用 forEach 方法遍历该数组,并将数字逐个发送到队列。现在运行你的消费者,你会看到消费者自动消费了所有消息,并显示数字的平方。
结论
RabbitMQ 是一款流行的消息代理,它运行在 AMPQ 协议之上。AMPQ 协议由 4 个组件组成:1-发布者 (Publisher)、2-交换器 (Exchange)、3-队列 (Queue)、4-消费者 (Consumer)。
为了与 rabbitmq 通信,我们需要打开一个连接并在连接内部创建一个通道。然后我们可以将消息发布到队列或从一个或多个队列中使用消息。
鏂囩珷鏉ユ簮锛�https://dev.to/pharzad/introduction-to-rabbitmq-for-node-js-developers-1clm