Docker 上的微服务和 RabbitMQ

2025-06-07

Docker 上的微服务和 RabbitMQ

基于微服务的架构涉及将您的整体应用程序分解为多个、完全独立可部署和可扩展的服务。除了这个基本定义之外,微服务的构成可能有些主观,尽管NetflixUber等巨头采用了几种久经考验的做法,应该始终考虑这些做法。我将讨论其中的一些。最终,我们希望将我们的应用程序划分为更小的应用程序,每个应用程序都是一个独立的系统,只处理整个应用程序的一个方面,并且做得很好。这种分解是一个非常重要的一步,可以在子域的基础上完成,必须正确识别子域。较小的应用程序更加模块化和易于管理,边界定义明确,可以使用不同的语言/框架编写,可以隔离故障,这样整个应用程序就不会崩溃(没有 SPOF)。以电影售票为例:


来源:https ://codeburst.io/build-a-nodejs-cinema-api-gateway-and-deploying-it-to-docker-part-4-703c2b0dd269

让我们来分解一下这个鸟瞰图:

i) 用户应用程序可以是移动客户端、SPA 等,也可以是任何使用我们后端服务的客户端。

ii)要求客户端分别与我们的各项服务通信被认为是一种不好的做法,原因我已在此处解释。API 网关的作用就是接收客户端请求、调用我们的服务、返回响应。因此,客户端只需与一台服务器通信,给人一种整体式架构的感觉。不同类型的客户端(移动应用、平板电脑、浏览器等)可以使用多个网关。它们可以而且应该负责有限的功能,例如合并/加入来自服务的响应、身份验证、ACL。在需要动态扩展和移动的大型应用程序中,网关还需要访问服务注册表,该注册表保存着微服务实例、数据库等的位置。

iii)每个服务都有自己的存储。这是关键点,可确保松散耦合。某些查询需要连接多个服务拥有的数据。为了避免这种严重的性能损失,数据可能会被复制和分片。微服务不仅容忍这一原则,而且鼓励这种原则。

iv)对我们的 API 网关发出的 REST 调用会被传递给服务,这些服务又会与其他服务通信,并将结果返回给网关,网关可能会对其进行编译并将其响应给客户端。像这样在客户端向应用发出请求时,服务之间的通信不应该发生。否则,为了新引入的模块化,我们将因为又一次 HTTP 往返而牺牲性能。

理想情况下,单个请求应该只调用一个服务来获取响应。这意味着服务之间的同步请求应该最小化,但这并非总是可行;必要时,通常会使用gRPCThrift甚至简单的 HTTP(如我们的示例中所示)等机制。您可能已经猜到了,这意味着数据必须在我们的服务之间复制。比如,GET /catalog/<<cityId>>端点还应该返回该城市各家电影院当时的首映影片。按照我们的新策略,首映影片也必须存储在影院目录服务的数据库中。因此,要点是iii)

服务间异步通信

假设电影服务上的某些 CRUD 操作导致首映影片的上映时间发生变化。为了保持数据同步,必须触发该更新事件并将其应用于电影目录服务。不妨将我们的微服务想象成一个状态机集群,其中状态的更新可能需要在整个集群中传递以实现最终一致性。当然,我们不应该期望最终用户需要等待更长时间才能完成请求,从而牺牲他们的时间来获得模块化优势。因此,所有这些通信都必须是非阻塞的。而这正是RabbitMQ 的用武之地。

RabbitMQ 是一个非常强大的消息代理,它实现了AMQP 消息协议。其原理如下:首先,您需要在系统上安装一个 RabbitMQ 服务器实例(代理)。然后,发布者/生产者程序连接到该服务器并发送消息。RabbitMQ 会将该消息放入队列,并将其转发给正在 RabbitMQ 服务器上监听的一个或多个订阅者/消费者程序。

在进入正题之前,我想明确指出,微服务架构复杂得多。由于分布式系统的复杂性,我们不会讨论容错等关键主题,也不会讨论API 网关的全部作用服务发现、像Sagas这样的数据一致性模式、使用断路器防止服务故障级联、健康检查以及像CQRS这样的架构模式。更不用说如何判断微服务架构是否适合你了

RabbitMQ 的工作原理

更具体地说,消息会被发布到RabbitMQ 代理内部的交换机。然后,交换机会根据开发者定义的某些规则(称为绑定)将消息的副本分发到各个队列。消息传输的这一部分称为路由。这种间接路由正是实现非阻塞消息传输的关键所在。监听这些队列并收到消息的消费者最终会收到它。是不是很简单?

不完全是。有四种不同类型的交换机,每种交换机以及绑定都定义了一种路由算法。“路由算法”本质上是指消息如何在队列之间分配。在这里详细介绍每种类型可能有点过头,所以我只扩展一下我们将要使用的:主题交换机

要让交换机将消息推送到队列,该队列必须绑定到交换机。我们可以显式创建多个具有唯一名称的交换机。但是,部署 RabbitMQ 时,它会默认带有一个无名交换机。我们创建的每个队列都会自动绑定到这个交换机。为了便于描述,我将手动创建一个命名交换机,然后将一个队列绑定到它。此绑定由绑定键定义。绑定键的具体工作方式取决于交换机的类型。以下是它在主题交换机中的工作方式:

  • 使用字符串模式(绑定键)将队列绑定交换机
  • 发布的消息与路由密钥一起发送到交换机
  • 交换机根据之前定义绑定键模式检查哪些队列与路由键匹配

*可以替代一个单词。#可以替代零个或多个单词。

来源:https://www.rabbitmq.com/tutorials/tutorial-five-python.html

任何带有路由键的消息"quick.orange.rabbit"都会被投递到两个队列。但是,带有 的消息"lazy.brown.fox"只会到达Q2。路由键不匹配任何模式的消息将会丢失。

从某种角度来看,我们先来看一下另外两种交换类型:

  • 扇出型交换器 (Fanout Exchange):发送到此类交换器的消息将被发送到与其绑定的所有队列。如果提供了路由键 (Routing Key),则路由键将被完全忽略。例如,这可用于在分布式系统中广播全局配置更新。
  • 直接交换(最简单):将消息发送到绑定键与给定路由键完全相等的队列。如果有多个消费者正在监听该队列,则消息将在它们之间进行负载均衡,因此,它通常用于以循环方式在多个工作器之间分配任务。

我的示例非常简单:一个带有单个 POST 端点的 Python Flask应用,当它被调用时,会尝试更新用户信息,并向 RabbitMQ 代理发送一条消息(当然是非阻塞的)并返回 201 错误。一个单独的 Go 服务将监听来自代理的消息,因此有机会相应地更新其数据。这三个服务将托管在不同的容器中。

使用 Docker Compose 设置我们的容器化微服务和代理

配置一堆容器以及所有与之相关的内容可能会很麻烦,所以我总是依赖Docker Compose

以下是完整代码。我们声明了三个服务,分别用于三个容器。我们需要两个来将代码放入容器中:

# docker-compose.yml

version: "3.2"
services:
    rabbitmq-server:
        build: ./rabbitmq-server

    python-service:
        build: ./python-service
        # 'rabbitmq-server' will be available as a network reference inside this service 
        # and this service will start only after the RabbitMQ service does.
        depends_on:
            - rabbitmq-server
        # Keep it running.  
        tty: true
        # Map port 3000 on the host machine to port 3000 of the container.
        # This will be used to receive HTTP requests made to the service.
        ports:
            - "3000:3000"
        volumes:
            - './python-service:/python-service'

    go-service:
        build: ./go-service
        depends_on:
            - rabbitmq-server
        tty: true
        volumes:
            - './go-service:/go-service'

# Host volumes used to store code.
volumes:
    python-service:
    go-service:
Enter fullscreen mode Exit fullscreen mode

Dockerfiles 基本上是来自Docker Hub 的标准文件,我在其中添加了:

  • /go-serviceGo 服务容器中的工作目录。
  • /python-servicePython 服务容器中的工作目录。
  • Go 的 RabbitMQ 客户端库amqp
  • Python 的 RabbitMQ 客户端Pika & Flask

我们的 Flask 应用只有一个端点,用于接收 auser_id和 a full_name,用于更新用户的个人资料。之后,指示此更新的消息将发送到 RabbitMQ 代理。

# main.py

from flask import Flask
from flask import request
from flask import jsonify
from services.user_event_handler import emit_user_profile_update

app = Flask(__name__)

@app.route('/users/<int:user_id>', methods=['POST'])
def update(user_id):
    new_name = request.form['full_name']

    # Update the user in the datastore using a local transaction...

    emit_user_profile_update(user_id, {'full_name': new_name})

    return jsonify({'full_name': new_name}), 201
Enter fullscreen mode Exit fullscreen mode

向其他服务发送事件的逻辑应该始终与应用程序的其余部分分开,因此我将其提取到一个模块中。由于我们无法知道(也不应该依赖)哪个服务先启动,因此应该由发布者和消费者明确检查并创建交换。这是一个很好的做法,大多数 RabbitMQ 客户端库都无缝地实现了这一点:

# services/user_event_handler.py

import pika
import json

def emit_user_profile_update(user_id, new_data):
    # 'rabbitmq-server' is the network reference we have to the broker, 
    # thanks to Docker Compose.
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='rabbitmq-server'))
    channel    = connection.channel()

    exchange_name = 'user_updates'
    routing_key   = 'user.profile.update'

    # This will create the exchange if it doesn't already exist.
    channel.exchange_declare(exchange=exchange_name, exchange_type='topic', durable=True)

    new_data['id'] = user_id

    channel.basic_publish(exchange=exchange_name,
                          routing_key=routing_key,
                          body=json.dumps(new_data),
                          # Delivery mode 2 makes the broker save the message to disk.
                          # This will ensure that the message be restored on reboot even  
                          # if RabbitMQ crashes before having forwarded the message.
                          properties=pika.BasicProperties(
                            delivery_mode = 2,
                        ))

    print("%r sent to exchange %r with data: %r" % (routing_key, exchange_name, new_data))
    connection.close()
Enter fullscreen mode Exit fullscreen mode

不要被混淆channel通道只是TCP 连接中的一个虚拟的轻量级连接 ,旨在避免打开多个开销巨大的 TCP 连接。尤其是在多线程环境中。

durable参数确保交换器持久化到磁盘,并在代理因任何原因崩溃或离线时恢复。发布者(Python 服务)创建一个名为 的交换器,并以路由键为参数,user_updates将用户的更新数据发送给它。这将与一个绑定键匹配,该绑定键由我们的 Go 服务定义:user.profile.updateuser.profile.*

// main.go

package main

import (
    "fmt"
    "log"
    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        // Exit the program.
        panic(fmt.Sprintf("%s: %s", msg, err))
    }
}

func main() {
    // 'rabbitmq-server' is the network reference we have to the broker, 
    // thanks to Docker Compose.
    conn, err := amqp.Dial("amqp://guest:guest@rabbitmq-server:5672/")
    failOnError(err, "Error connecting to the broker")
    // Make sure we close the connection whenever the program is about to exit.
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    // Make sure we close the channel whenever the program is about to exit.
    defer ch.Close()

    exchangeName := "user_updates"
    bindingKey   := "user.profile.*"

    // Create the exchange if it doesn't already exist.
    err = ch.ExchangeDeclare(
            exchangeName,   // name
            "topic",        // type
            true,           // durable
            false,
            false,
            false,
            nil,
    )
    failOnError(err, "Error creating the exchange")

    // Create the queue if it doesn't already exist.
    // This does not need to be done in the publisher because the
    // queue is only relevant to the consumer, which subscribes to it.
    // Like the exchange, let's make it durable (saved to disk) too.
    q, err := ch.QueueDeclare(
            "",    // name - empty means a random, unique name will be assigned
            true,  // durable
            false, // delete when the last consumer unsubscribes
            false, 
            false, 
            nil,   
    )
    failOnError(err, "Error creating the queue")

    // Bind the queue to the exchange based on a string pattern (binding key).
    err = ch.QueueBind(
            q.Name,       // queue name
            bindingKey,   // binding key
            exchangeName, // exchange
            false,
            nil,
    )
    failOnError(err, "Error binding the queue")

    // Subscribe to the queue.
    msgs, err := ch.Consume(
            q.Name, // queue
            "",     // consumer id - empty means a random, unique id will be assigned
            false,  // auto acknowledgement of message delivery
            false,  
            false,  
            false,  
            nil,
    )
    failOnError(err, "Failed to register as a consumer")


    forever := make(chan bool)

    go func() {
        for d := range msgs {
            log.Printf("Received message: %s", d.Body)

            // Update the user's data on the service's 
            // associated datastore using a local transaction...

            // The 'false' indicates the success of a single delivery, 'true' would
            // mean that this delivery and all prior unacknowledged deliveries on this
            // channel will be acknowledged, which I find no reason for in this example.
            d.Ack(false)
        }
    }()

    fmt.Println("Service listening for events...")

    // Block until 'forever' receives a value, which will never happen.
    <-forever
}
Enter fullscreen mode Exit fullscreen mode

RabbitMQ 默认使用 5672 端口进行非 TLS 连接,并使用“guest”作为用户名和密码。您可以了解丰富的配置选项,以及如何在PikaGo amqp中使用它们。

您可能想知道这一行是做什么用的:d.Ack(false)

这告诉代理该消息已送达、已成功处理并且可以删除。默认情况下,这些确认会自动发生。但我们在订阅队列时另有说明ch.Consume

现在,如果 Go 服务崩溃(由于任何不可预见的原因),则不会发送确认,这将导致代理重新排队该消息,以便可以给予它另一个处理的机会。

启动微服务

好吧,让我们开始吧:

跑步docker-compose up

当这三个服务构建完成后(第一次至少需要几分钟),使用以下命令检查它们的名称docker ps

打开两个新终端,使用相应的容器名称通过 SSH 进入 Python 和 Go 容器并启动服务器:

docker exec -it microservicesusingrabbitmq_python-service_1 bash
FLASK_APP=main.py python -m flask run --port 3000 --host 0.0.0.0

docker exec -it microservicesusingrabbitmq_go-service_1 bash
go run main.go

打开第三个终端发送 POST 请求。我将使用 Curl:

curl -d "full_name=usama" -X POST http://localhost:3000/users/1

你会看到传输:

在任何时候,您也可以通过 SSH 进入 RabbitMQ 容器并查看:

  • rabbitmqctl list_exchanges(列出此代理节点上的所有交易所)
  • rabbitmqctl list_queues(列出此代理节点上的所有队列)
  • rabbitmqctl list_bindings(列出此代理节点上的所有绑定)
  • rabbitmqctl list_queues name messages_ready messages_unacknowledged(列出所有队列以及每个队列中已准备好发送给客户端但尚未发送的消息数以及已发送但尚未确认的消息数)

正如我在开头提到的,这绝不是对微服务的深入探讨。有很多问题需要提出,我将尝试回答一个重要的问题:如何使这种通信具有事务性?那么,如果我们的 Go 服务(消费者)在更新其自身状态时抛出异常,而我们需要确保更新事件在所有受其影响的服务中回滚,会发生什么情况?想象一下,当我们拥有多个微服务和数千个这样的“更新事件”时,这会变得多么复杂。本质上,我们需要合并执行回滚的单独事件

在我们的例子中,如果 Go 服务在更新数据时抛出异常,它必须向 Python 服务发送一条消息,告知其回滚更新。同样需要注意的是,在发生此类错误时,必须确认消息已送达(即使处理未成功),以免消息被代理重新加入队列。在编写消费者时,我们必须决定哪些错误意味着消息应该重新加入队列(重试),哪些错误意味着消息不应该重新加入队列而应该回滚。

但是,我们如何指定要回滚的更新事件,以及回滚究竟是如何发生的呢?Saga 模式以及事件溯源被广泛用于确保这种数据一致性。

关于设计经纪商的几点说明

考虑两件事:要使用的交换类型以及如何对交换进行分组

如果您需要向系统中的所有服务广播某些类型的消息,请查看扇出交换器类型。然后,一种分组交换器的方法可能基于事件,例如三个名为user.profile.updateduser.profile.deleted、的扇出交换器。这可能并非您始终想要的,因为您可能会有太多交换器,并且如果不创建新的交换器队列绑定user.profile.added,您将无法为特定消费者过滤消息

另一种方法是根据系统中的实体创建主题交换器。因此,在我们的第一个示例中, 、user可以是实体,并且绑定到 的队列可以使用绑定键,例如(创建用户时获取消息)、(用户刚登录时获取消息)、(获取用户已获得授权角色的消息)、(向用户发送通知)等。moviecinemauseruser.createduser.loginuser.roles.grantuser.notify

始终使用路由来过滤消息并将其传递给特定的消费者。编写代码来丢弃某些传入消息而接受其他消息是一种反模式。消费者应该只接收他们需要的消息。

最后,如果您的需求很复杂,并且您需要根据多个属性将消息过滤给某些消费者,请使用标头交换

享受!

文章来源:https://dev.to/usamaashraf/microservices--rabbitmq-on-docker-e2f
PREV
编写一个交互式(并且令人满意的)游标:7 个简单步骤 + 2kb 代码
NEXT
停止重复工作并开始编写您的快速 API 🚀