Go、Kafka 和 gRPC 清洁架构 CQRS 微服务与 Jaeger 跟踪👋🧑‍💻

2025-06-04

Go、Kafka 和 gRPC 清洁架构 CQRS 微服务与 Jaeger 跟踪👋🧑‍💻

在本文中,让我们尝试使用以下方法创建更接近现实世界的具有跟踪和监控功能的 CQRS 微服务:🚀
Kafka作为消息代理
gRPC gRPC 的 Go 实现
PostgreSQL作为数据库
Jaeger开源、端到端分布式跟踪
Prometheus监控和警报
Grafana用于使用 Prometheus
MongoDB Web 和基于 API 的 SMTP 测试
Redis用于 Golang
swag 的类型安全 Redis 客户端Swagger 用于 Go
Echo Web 框架

源代码可以在GitHub 仓库

中找到 。本文主要思想是使用 Go、Kafka 和 gRPC 实现 CQRS。
我不会尝试写什么是 CQRS 模式,因为这会让文章篇幅过长,最好的阅读地点是microservices.io。我发现这篇文章非常有趣,并以Three Dots Labs
的 CQRS 项目和博客作为起点

在这个例子中,我们有三个服务:API 网关、读写服务,它们通过 Kafka 和 gRPC 进行通信。
写入数据库使用 Postgres,读取数据库使用 MongoDB,缓存使用 Redis。
与任何实际项目一样,我们当然需要指标和跟踪,这里我们使用 Prometheus 和 Grafana 来获取指标,并使用 Jaeger 来跟踪。
由于时间有限,这个例子中没有实现任何有趣的业务逻辑,也没有涉及测试。

UI 界面将在以下端口上可用:

Jaeger 用户界面:http://localhost:16686

Prometheus 用户界面:http://localhost:9090

Grafana 用户界面:http://localhost:3000

Swagger 用户界面:http://localhost:5001/swagger/index.html

Jaeger 追踪用户界面

Swagger用户界面

Prometheus 指标用户界面

Grafana指标用户界面

对于本地开发:



make local or docker_dev // for run docker compose files
make migrate_up // run sql migrations
make mongo // run mongodb sripts
make swagger // generate swagger documentation


Enter fullscreen mode Exit fullscreen mode

要在 docker 中运行所有内容,您可以运行make docker_dev,它具有热重加载功能。

该项目的 Docker Compose 文件:



version: "3.8"

services:
  prometheus:
    image: prom/prometheus:latest
    container_name: prometheus
    ports:
      - "9090:9090"
    command:
      - --config.file=/etc/prometheus/prometheus.yml
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro
    networks: [ "microservices" ]

  node_exporter:
    container_name: node_exporter_container
    restart: always
    image: prom/node-exporter
    ports:
      - '9101:9100'
    networks: [ "microservices" ]

  grafana:
    container_name: grafana_container
    restart: always
    image: grafana/grafana
    ports:
      - '3005:3000'
    networks: [ "microservices" ]

  microservices_postgesql:
    image: postgres:13-alpine
    container_name: microservices_postgesql
    expose:
      - "5432"
    ports:
      - "5432:5432"
    restart: always
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=products
      - POSTGRES_HOST=5432
    command: -p 5432
    volumes:
      - ./microservices_pgdata:/var/lib/postgresql/data
    networks: [ "microservices" ]

  redis:
    image: redis:6-alpine
    restart: always
    container_name: microservices_redis
    ports:
      - "6379:6379"
    networks: [ "microservices" ]

  zoo1:
    image: zookeeper:3.4.9
    restart: always
    hostname: zoo1
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1
      ZOO_PORT: 2181
      ZOO_SERVERS: server.1=zoo1:2888:3888
    volumes:
      - ./zk-single-kafka-single/zoo1/data:/data
      - ./zk-single-kafka-single/zoo1/datalog:/datalog
    networks: [ "microservices" ]

  kafka1:
    image: confluentinc/cp-kafka:5.5.1
    restart: always
    hostname: kafka1
    ports:
      - "9092:9092"
      - "9999:9999"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_JMX_PORT: 9999
      KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
    volumes:
      - ./zk-single-kafka-single/kafka1/data:/var/lib/kafka/data
    depends_on:
      - zoo1
    networks: [ "microservices" ]

  mongodb:
    image: mongo:latest
    restart: always
    environment:
      MONGO_INITDB_ROOT_USERNAME: admin
      MONGO_INITDB_ROOT_PASSWORD: admin
      MONGODB_DATABASE: products
    ports:
      - "27017:27017"
    volumes:
      - mongodb_data_container:/data/db
    networks: [ "microservices" ]

  jaeger:
    container_name: jaeger_container
    restart: always
    image: jaegertracing/all-in-one:1.21
    environment:
      - COLLECTOR_ZIPKIN_HTTP_PORT=9411
    ports:
      - "5775:5775/udp"
      - "6831:6831/udp"
      - "6832:6832/udp"
      - "5778:5778"
      - "16686:16686"
      - "14268:14268"
      - "14250:14250"
      - "9411:9411"
    networks: [ "microservices" ]

volumes:
  mongodb_data_container:

networks:
  microservices:
    name: microservices


Enter fullscreen mode Exit fullscreen mode

API 网关服务的理念是接受 http 请求,命令处理程序将事件发布到 Kafka,并通过 gRPC 查询处理程序从读取器服务检索数据。
在本项目中使用了Echo,但发现 Gin 和 Chi 也非常适合生产环境。

让我们看一下代码:创建产品 http 处理程序,接受并验证请求体,然后生成 UUID 并调用创建产品命令。
关于将 ID 传递给命令或服务方法(作为单独的参数或在请求体中传递),或在命令中生成 ID 并返回,以及
命令是否返回值等等,存在许多争论,这些争论毫无意义。选择哪种方式取决于你和你的团队,最好把时间花在更重要的业务任务上 :)



// CreateProduct
// @Tags Products
// @Summary Create product
// @Description Create new product item
// @Accept json
// @Produce json
// @Success 201 {object} dto.CreateProductResponseDto
// @Router /products [post]
func (h *productsHandlers) CreateProduct() echo.HandlerFunc {
    return func(c echo.Context) error {
        h.metrics.CreateProductHttpRequests.Inc()

        ctx, span := tracing.StartHttpServerTracerSpan(c, "productsHandlers.CreateProduct")
        defer span.Finish()

        createDto := &dto.CreateProductDto{}
        if err := c.Bind(createDto); err != nil {
            h.log.WarnMsg("Bind", err)
            h.traceErr(span, err)
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        createDto.ProductID = uuid.NewV4()
        if err := h.v.StructCtx(ctx, createDto); err != nil {
            h.log.WarnMsg("validate", err)
            h.traceErr(span, err)
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        if err := h.ps.Commands.CreateProduct.Handle(ctx, commands.NewCreateProductCommand(createDto)); err != nil {
            h.log.WarnMsg("CreateProduct", err)
            h.metrics.ErrorHttpRequests.Inc()
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        h.metrics.SuccessHttpRequests.Inc()
        return c.JSON(http.StatusCreated, dto.CreateProductResponseDto{ProductID: createDto.ProductID})
    }
}


Enter fullscreen mode Exit fullscreen mode

创建产品命令处理程序很简单,只需整理命令数据并发布到 Kafka 即可。Kafka
接受 []byte 作为值,这里使用了 proto。为了在整个 Kafka 中传递跟踪信息,我们必须使用标头,
在跟踪实用程序中可以找到相关的帮助程序。Go
有一些不错的 Kafka 库,我喜欢segmentsio_kafka-go



func (c *createProductHandler) Handle(ctx context.Context, command *CreateProductCommand) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "createProductHandler.Handle")
    defer span.Finish()

    createDto := &kafkaMessages.ProductCreate{
        ProductID:   command.CreateDto.ProductID.String(),
        Name:        command.CreateDto.Name,
        Description: command.CreateDto.Description,
        Price:       command.CreateDto.Price,
    }

    dtoBytes, err := proto.Marshal(createDto)
    if err != nil {
        return err
    }

    return c.kafkaProducer.PublishMessage(ctx, kafka.Message{
        Topic:   c.cfg.KafkaTopics.ProductCreate.TopicName,
        Value:   dtoBytes,
        Time:    time.Now().UTC(),
        Headers: tracing.GetKafkaTracingHeadersFromSpanCtx(span.Context()),
    })
}


Enter fullscreen mode Exit fullscreen mode

对于使用 Kafka 来说,最好有 UI 客户端进行调试,个人喜欢使用Conductor

写入服务使用 Kafka 主题,处理写入 Postgres 的消息,并将成功处理的消息发布到 Kafka。
在我看来,在 Go 中使用 Postgres 的最佳选择是pgx,但如果你需要查询构建器Squirrel
是一个非常好的库 。我个人不喜欢 ORM,但通常情况下,正如我所见,团队经常使用Gorm,这取决于你。ProcessMessages
方法监听 Kafka 主题并根据主题调用特定方法:



func (s *productMessageProcessor) ProcessMessages(ctx context.Context, r *kafka.Reader, wg *sync.WaitGroup, workerID int) {
    defer wg.Done()

    for {
        select {
        case <-ctx.Done():
            return
        default:
        }

        m, err := r.FetchMessage(ctx)
        if err != nil {
            s.log.Warnf("workerID: %v, err: %v", workerID, err)
            continue
        }

        s.logProcessMessage(m, workerID)

        switch m.Topic {
        case s.cfg.KafkaTopics.ProductCreate.TopicName:
            s.processCreateProduct(ctx, r, m)
        case s.cfg.KafkaTopics.ProductUpdate.TopicName:
            s.processUpdateProduct(ctx, r, m)
        case s.cfg.KafkaTopics.ProductDelete.TopicName:
            s.processDeleteProduct(ctx, r, m)
        }
    }
}


Enter fullscreen mode Exit fullscreen mode

Kafka 消息处理方法反序列化和验证消息体,将其传递给命令并提交,
在这里我们必须选择处理错误的方式,但这取决于业务逻辑,例如
我们可以使用死信队列模式



func (s *productMessageProcessor) processCreateProduct(ctx context.Context, r *kafka.Reader, m kafka.Message) {
    s.metrics.CreateProductKafkaMessages.Inc()

    ctx, span := tracing.StartKafkaConsumerTracerSpan(ctx, m.Headers, "productMessageProcessor.processCreateProduct")
    defer span.Finish()

    var msg kafkaMessages.ProductCreate
    if err := proto.Unmarshal(m.Value, &msg); err != nil {
        s.log.WarnMsg("proto.Unmarshal", err)
        s.commitErrMessage(ctx, r, m)
        return
    }

    proUUID, err := uuid.FromString(msg.GetProductID())
    if err != nil {
        s.log.WarnMsg("proto.Unmarshal", err)
        s.commitErrMessage(ctx, r, m)
        return
    }

    command := commands.NewCreateProductCommand(proUUID, msg.GetName(), msg.GetDescription(), msg.GetPrice())
    if err := s.v.StructCtx(ctx, command); err != nil {
        s.log.WarnMsg("validate", err)
        s.commitErrMessage(ctx, r, m)
        return
    }

    if err := retry.Do(func() error {
        return s.ps.Commands.CreateProduct.Handle(ctx, command)
    }, append(retryOptions, retry.Context(ctx))...); err != nil {
        s.log.WarnMsg("CreateProduct.Handle", err)
        s.metrics.ErrorKafkaMessages.Inc()
        return
    }

    s.commitMessage(ctx, r, m)
}


Enter fullscreen mode Exit fullscreen mode

Writer 的服务创建产品命令将数据保存到 postgres,并将产品保存事件发布到 kafka:



func (c *createProductHandler) Handle(ctx context.Context, command *CreateProductCommand) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "createProductHandler.Handle")
    defer span.Finish()

    productDto := &models.Product{ProductID: command.ProductID, Name: command.Name, Description: command.Description, Price: command.Price}

    product, err := c.pgRepo.CreateProduct(ctx, productDto)
    if err != nil {
        return err
    }

    msg := &kafkaMessages.ProductCreated{Product: mappers.ProductToGrpcMessage(product)}
    msgBytes, err := proto.Marshal(msg)
    if err != nil {
        return err
    }

    message := kafka.Message{
        Topic:   c.cfg.KafkaTopics.ProductCreated.TopicName,
        Value:   msgBytes,
        Time:    time.Now().UTC(),
        Headers: tracing.GetKafkaTracingHeadersFromSpanCtx(span.Context()),
    }

    return c.kafkaProducer.PublishMessage(ctx, message)
}


Enter fullscreen mode Exit fullscreen mode

Postgres 存储库使用 pgx,代码很简单:



func (p *productRepository) CreateProduct(ctx context.Context, product *models.Product) (*models.Product, error) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "productRepository.CreateProduct")
    defer span.Finish()

    var created models.Product
    if err := p.db.QueryRow(ctx, createProductQuery, &product.ProductID, &product.Name, &product.Description, &product.Price).Scan(
        &created.ProductID,
        &created.Name,
        &created.Description,
        &created.Price,
        &created.CreatedAt,
        &created.UpdatedAt,
    ); err != nil {
        return nil, errors.Wrap(err, "db.QueryRow")
    }

    return &created, nil
}


Enter fullscreen mode Exit fullscreen mode

读者服务消费 Kafka 消息,保存到 MongoDB 并通过 Redis 缓存,然后投射数据以供 gRPC 调用检索。
此处的 Kafka 监听器处理程序看起来相同,产品创建命令将数据保存到 MongoDB 并缓存在 Redis 中:



func (c *createProductHandler) Handle(ctx context.Context, command *CreateProductCommand) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "createProductHandler.Handle")
    defer span.Finish()

    product := &models.Product{
        ProductID:   command.ProductID,
        Name:        command.Name,
        Description: command.Description,
        Price:       command.Price,
        CreatedAt:   command.CreatedAt,
        UpdatedAt:   command.UpdatedAt,
    }

    created, err := c.mongoRepo.CreateProduct(ctx, product)
    if err != nil {
        return err
    }

    c.redisRepo.PutProduct(ctx, created.ProductID, created)
    return nil
}


Enter fullscreen mode Exit fullscreen mode

MongoDB 存储库保存方法:



func (p *mongoRepository) CreateProduct(ctx context.Context, product *models.Product) (*models.Product, error) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "mongoRepository.CreateProduct")
    defer span.Finish()

    collection := p.db.Database(p.cfg.Mongo.Db).Collection(p.cfg.MongoCollections.Products)

    _, err := collection.InsertOne(ctx, product, &options.InsertOneOptions{})
    if err != nil {
        p.traceErr(span, err)
        return nil, errors.Wrap(err, "InsertOne")
    }

    return product, nil
}


Enter fullscreen mode Exit fullscreen mode

而redis的缓存方式是:



func (r *redisRepository) PutProduct(ctx context.Context, key string, product *models.Product) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "redisRepository.PutProduct")
    defer span.Finish()

    productBytes, err := json.Marshal(product)
    if err != nil {
        r.log.WarnMsg("json.Marshal", err)
        return
    }

    if err := r.redisClient.HSetNX(ctx, r.getRedisProductPrefixKey(), key, productBytes).Err(); err != nil {
        r.log.WarnMsg("redisClient.HSetNX", err)
        return
    }
    r.log.Debugf("HSetNX prefix: %s, key: %s", r.getRedisProductPrefixKey(), key)
}


Enter fullscreen mode Exit fullscreen mode

然后,api网关可以使用gRPC向读取器服务请求数据。
读取器gRPC服务方法:



func (s *grpcService) GetProductById(ctx context.Context, req *readerService.GetProductByIdReq) (*readerService.GetProductByIdRes, error) {
    s.metrics.GetProductByIdGrpcRequests.Inc()

    ctx, span := tracing.StartGrpcServerTracerSpan(ctx, "grpcService.GetProductById")
    defer span.Finish()

    productUUID, err := uuid.FromString(req.GetProductID())
    if err != nil {
        s.log.WarnMsg("uuid.FromString", err)
        return nil, s.errResponse(codes.InvalidArgument, err)
    }

    query := queries.NewGetProductByIdQuery(productUUID)
    if err := s.v.StructCtx(ctx, query); err != nil {
        s.log.WarnMsg("validate", err)
        return nil, s.errResponse(codes.InvalidArgument, err)
    }

    product, err := s.ps.Queries.GetProductById.Handle(ctx, query)
    if err != nil {
        s.log.WarnMsg("GetProductById.Handle", err)
        return nil, s.errResponse(codes.Internal, err)
    }

    s.metrics.SuccessGrpcRequests.Inc()
    return &readerService.GetProductByIdRes{Product: models.ProductToGrpcMessage(product)}, nil
}


Enter fullscreen mode Exit fullscreen mode

通过id获取方法追踪:

Api 网关通过 id http 处理程序方法获取产品:



// GetProductByID
// @Tags Products
// @Summary Get product
// @Description Get product by id
// @Accept json
// @Produce json
// @Param id path string true "Product ID"
// @Success 200 {object} dto.ProductResponse
// @Router /products/{id} [get]
func (h *productsHandlers) GetProductByID() echo.HandlerFunc {
    return func(c echo.Context) error {
        h.metrics.GetProductByIdHttpRequests.Inc()

        ctx, span := tracing.StartHttpServerTracerSpan(c, "productsHandlers.GetProductByID")
        defer span.Finish()

        productUUID, err := uuid.FromString(c.Param(constants.ID))
        if err != nil {
            h.log.WarnMsg("uuid.FromString", err)
            h.traceErr(span, err)
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        query := queries.NewGetProductByIdQuery(productUUID)
        response, err := h.ps.Queries.GetProductById.Handle(ctx, query)
        if err != nil {
            h.log.WarnMsg("GetProductById", err)
            h.metrics.ErrorHttpRequests.Inc()
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        h.metrics.SuccessHttpRequests.Inc()
        return c.JSON(http.StatusOK, response)
    }
}


Enter fullscreen mode Exit fullscreen mode

和查询处理程序代码:



func (q *getProductByIdHandler) Handle(ctx context.Context, query *GetProductByIdQuery) (*dto.ProductResponse, error) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "getProductByIdHandler.Handle")
    defer span.Finish()

    ctx = tracing.InjectTextMapCarrierToGrpcMetaData(ctx, span.Context())
    res, err := q.rsClient.GetProductById(ctx, &readerService.GetProductByIdReq{ProductID: query.ProductID.String()})
    if err != nil {
        return nil, err
    }

    return dto.ProductResponseFromGrpc(res.GetProduct()), nil
}


Enter fullscreen mode Exit fullscreen mode

搜索商品方式:



func (p *mongoRepository) Search(ctx context.Context, search string, pagination *utils.Pagination) (*models.ProductsList, error) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "mongoRepository.Search")
    defer span.Finish()

    collection := p.db.Database(p.cfg.Mongo.Db).Collection(p.cfg.MongoCollections.Products)

    filter := bson.D{
        {Key: "$or", Value: bson.A{
            bson.D{{Key: "name", Value: primitive.Regex{Pattern: search, Options: "gi"}}},
            bson.D{{Key: "description", Value: primitive.Regex{Pattern: search, Options: "gi"}}},
        }},
    }

    count, err := collection.CountDocuments(ctx, filter)
    if err != nil {
        p.traceErr(span, err)
        return nil, errors.Wrap(err, "CountDocuments")
    }
    if count == 0 {
        return &models.ProductsList{Products: make([]*models.Product, 0)}, nil
    }

    limit := int64(pagination.GetLimit())
    skip := int64(pagination.GetOffset())
    cursor, err := collection.Find(ctx, filter, &options.FindOptions{
        Limit: &limit,
        Skip:  &skip,
    })
    if err != nil {
        p.traceErr(span, err)
        return nil, errors.Wrap(err, "Find")
    }
    defer cursor.Close(ctx)

    products := make([]*models.Product, 0, pagination.GetSize())

    for cursor.Next(ctx) {
        var prod models.Product
        if err := cursor.Decode(&prod); err != nil {
            p.traceErr(span, err)
            return nil, errors.Wrap(err, "Find")
        }
        products = append(products, &prod)
    }

    if err := cursor.Err(); err != nil {
        span.SetTag("error", true)
        span.LogKV("error_code", err.Error())
        return nil, errors.Wrap(err, "cursor.Err")
    }

    return models.NewProductListWithPagination(products, count, pagination), nil
}


Enter fullscreen mode Exit fullscreen mode

更多详细信息和源代码可在此处找到。当然,
在实际应用中,我们还需要实现更多必要的功能,
例如断路器、重试、速率限制器等。具体实现方式取决于项目,
例如,您可以使用 Kubernetes 和 Istio 来实现其中一些功能。
希望本文对您有所帮助,我很乐意收到任何反馈或问题,欢迎随时通过电子邮件或任何 Messenger 联系我 :)

文章来源:https://dev.to/aleksk1ng/go-kafka-and-grpc-clean-architecture-cqrs-microservices-with-jaeger-tracing-45bj
PREV
如何使用 Mapbox 和 React 创建 COVID-19 地图
NEXT
Go EventSourcing and CQRS microservice using EventStoreDB 👋⚡️💫