具有指标和跟踪功能的 Go、Kafka、gRPC 和 MongoDB 微服务👋

2025-06-08

具有指标和跟踪功能的 Go、Kafka、gRPC 和 MongoDB 微服务👋

本文尝试使用以下方式实现清洁架构微服务:🚀
Kafka作为消息代理
gRPC Go 实现 gRPC
MongoDB作为数据库
Jaeger开源,端到端分布式跟踪
Prometheus监控和警报
Grafana用于使用 Prometheus 的所有内容组成可观察性仪表板

您可以在GitHub 存储库 中找到源代码,

我想重点介绍Kafka,因为这是我第一次尝试它,所以它是边做边学👨‍💻

当然,任何微服务都需要监控和跟踪,所以它包含在内⚡️

Kafka 是一个高吞吐量的分布式消息系统。
在任何时候,只有一个 Broker 可以成为给定分区的 Leader,并且只有该 Leader 可以接收该分区的服务数据,
其他 Broker 负责同步数据。

制作人

生产者将数据写入主题,并自动知道要写入哪个代理和分区。

如果代理发生故障,生产者将自动恢复。

生产者可以选择接收数据写入的确认:

  • acks = 0,生产者不会确认,这意味着可能丢失数据。
  • acks = 1。生产者将等待领导者确认,在某些情况下也可能出现有限的数据丢失。
  • acks = all。leader + replicas 确认,需要从 broker 端配置键 min.insync.replicas 中指定数量的 broker 获得确认,如果当前同步的 broker 数量少于此值,produce 将会失败。您可以将 min.insync.replicas 指定为最低 1(相当于 acks==1),也可以指定为最高副本数,或者介于两者之间,这样您就可以在可用性和一致性之间进行精细的权衡。这里有一篇好文章

生产者可以选择随消息发送密钥。

  • 如果没有密钥数据则以循环方式发送。
  • 如果发送了一个密钥,那么该密钥的所有消息将始终发送到同一个分区。

消费者

消费者从主题读取数据,并知道从哪个代理读取。
每个分区内的数据按顺序读取。

消费者群体

消费者以消费者组的形式读取数据。组内的每个消费者都从专属分区读取数据。
因此,如果消费者数量超过分区数量,某些消费者可能会处于非活跃状态。

消费者抵消

Kafka 存储消费者组读取数据的偏移量。
当组中的某个消费者处理完数据后,它应该提交偏移量。
如果消费者宕机,它能够从中断处继续读取数据。
消费者可以选择何时提交偏移量。

共有 3 个发货箱

最多一次

  • 收到消息后立即提交偏移量
  • 如果某个过程失败,该消息将会丢失并且不会被再次读取。

至少一次

  • 消息处理后,偏移量将被提交
  • 如果某个过程失败,则会再次读取该消息。
  • 这可能会导致消息的重复处理,因此确保您的处理是幂等的并且不会影响您的系统非常重要。

恰好一次

  • 可以使用 kafka streams api 实现 kakfa 到 kafka 的通信。
  • 对于外部系统的使用,我们需要使用幂等消费者。

压缩
还有一个非常重要的功能是压缩。
压缩在生产者级别启用,不需要在 Broker 或消费者级别进行任何配置更改。
默认情况下,压缩为无。对于入门级用户,snappylz4是不错的选择。

对于本地开发:



make local // runs docker-compose.local.yml
make crate_topics // create kafka topics
make mongo // load js init script to mongo docker container
make make_cert // generate local SLL certificates
make swagger // generate swagger documentation


Enter fullscreen mode Exit fullscreen mode

UI 界面将在以下端口上可用:

Jaeger 用户界面:http://localhost:16686

Prometheus 用户界面:http://localhost:9090

Grafana 用户界面:http://localhost:3000

Kafka 用户界面:http://localhost:9000

Swagger UI 默认运行于:https://localhost:5007/swagger/index.html




在 Grafana 中,您需要选择 prometheus 作为指标源,然后创建仪表板。

带有云 UI 的良好 kafka docker 设置是融合的,但它具有巨大的图像大小,并且会将全球一半的互联网下载到您的本地电脑上。🤖 因此,在这里我使用kafdrop作为 UI 客户端

Docker-compose.local.yml



version: "3.8"

services:
  zookeeper:
    container_name: zookeeper
    restart: always
    image: zookeeper:3.4.9
    hostname: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1
      ZOO_PORT: 2181
      ZOO_SERVERS: server.1=zookeeper:2888:3888
    volumes:
      - ./data/zookeeper/data:/data
      - ./data/zookeeper/datalog:/datalog
    networks:
      - products_network


  kafka1:
    container_name: kafka1
    image: confluentinc/cp-kafka:5.3.0
    restart: always
    hostname: kafka1
    ports:
      - "9091:9091"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19091,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9091
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./data/kafka1/data:/var/lib/kafka/data
    depends_on:
      - zookeeper
    networks:
      - products_network

  kafka2:
    container_name: kafka2
    restart: always
    image: confluentinc/cp-kafka:5.3.0
    hostname: kafka2
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_BROKER_ID: 2
    volumes:
      - ./data/kafka2/data:/var/lib/kafka/data
    depends_on:
      - zookeeper
    networks:
      - products_network

  kafka3:
    container_name: kafka3
    image: confluentinc/cp-kafka:5.3.0
    restart: always
    hostname: kafka3
    ports:
      - "9093:9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:19093,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 3
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./data/kafka3/data:/var/lib/kafka/data
    depends_on:
      - zookeeper
    networks:
      - products_network

  kafdrop:
    container_name: kafdrop
    image: obsidiandynamics/kafdrop
    restart: "no"
    ports:
      - "9000:9000"
    environment:
      KAFKA_BROKERCONNECT: "kafka1:19091"
    depends_on:
      - kafka1
      - kafka2
      - kafka3
    networks:
      - products_network

  redis:
    image: redis:6-alpine
    container_name: user_redis
    ports:
      - "6379:6379"
    restart: always
    networks:
      - products_network

  prometheus:
    container_name: prometheus_container
    restart: always
    image: prom/prometheus
    volumes:
      - ./monitoring/prometheus-local.yml:/etc/prometheus/prometheus.yml:Z
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
      - '--storage.tsdb.retention=20d'
      - '--web.console.libraries=/usr/share/prometheus/console_libraries'
      - '--web.console.templates=/usr/share/prometheus/consoles'
    ports:
      - '9090:9090'
    networks:
      - products_network


  node_exporter:
    container_name: node_exporter_container
    restart: always
    image: prom/node-exporter
    ports:
      - '9101:9100'
    networks:
      - products_network

  grafana:
    container_name: grafana_container
    restart: always
    image: grafana/grafana
    ports:
      - '3000:3000'
    networks:
      - products_network

  jaeger:
    container_name: jaeger_container
    restart: always
    image: jaegertracing/all-in-one:1.21
    environment:
      - COLLECTOR_ZIPKIN_HTTP_PORT=9411
    ports:
      - 5775:5775/udp
      - 6831:6831/udp
      - 6832:6832/udp
      - 5778:5778
      - 16686:16686
      - 14268:14268
      - 14250:14250
      - 9411:9411
    networks:
      - products_network

  mongodb:
    image: mongo:latest
    restart: always
    environment:
      MONGO_INITDB_ROOT_USERNAME: admin
      MONGO_INITDB_ROOT_PASSWORD: admin
      MONGODB_DATABASE: products
    ports:
      - 27017:27017
    volumes:
      - mongodb_data_container:/data/db

volumes:
  mongodb_data_container:

networks:
  products_network:
    driver: bridge


Enter fullscreen mode Exit fullscreen mode

生产环境中的 Go 客户端通常使用segmentsiosarama
两者都很好用,具体选择哪个取决于你。在本项目中,我使用了 segmentsio。
由于时间有限,我没有实现任何重要的业务逻辑,也没有进行测试。
我们的微服务可以通过 kafka、gRPC 和 REST 进行通信。

在 Makefile 中,你可以找到所有有用的命令。
例如,在 docker 中创建 kafka 主题:



docker exec -it kafka1 kafka-topics --zookeeper zookeeper:2181 --create --topic create-product --partitions 3 --replication-factor 2


Enter fullscreen mode Exit fullscreen mode

对于 MongoDB,我们可以加载 javascript 文件,这个文件创建集合和索引:



mongo admin -u admin -p admin < init.js


Enter fullscreen mode Exit fullscreen mode

Segmentio 库 API 提供了 reader 和 writer。
首先创建 reader:



func (pcg *ProductsConsumerGroup) getNewKafkaReader(kafkaURL []string, topic, groupID string) *kafka.Reader {
    return kafka.NewReader(kafka.ReaderConfig{
        Brokers:                kafkaURL,
        GroupID:                groupID,
        Topic:                  topic,
        MinBytes:               minBytes,
        MaxBytes:               maxBytes,
        QueueCapacity:          queueCapacity,
        HeartbeatInterval:      heartbeatInterval,
        CommitInterval:         commitInterval,
        PartitionWatchInterval: partitionWatchInterval,
        Logger:                 kafka.LoggerFunc(pcg.log.Debugf),
        ErrorLogger:            kafka.LoggerFunc(pcg.log.Errorf),
        MaxAttempts:            maxAttempts,
        Dialer: &kafka.Dialer{
            Timeout: dialTimeout,
        },
    })
}


Enter fullscreen mode Exit fullscreen mode

和作家:



func (pcg *ProductsConsumerGroup) getNewKafkaWriter(topic string) *kafka.Writer {
    w := &kafka.Writer{
        Addr:         kafka.TCP(pcg.Brokers...),
        Topic:        topic,
        Balancer:     &kafka.LeastBytes{},
        RequiredAcks: writerRequiredAcks,
        MaxAttempts:  writerMaxAttempts,
        Logger:       kafka.LoggerFunc(pcg.log.Debugf),
        ErrorLogger:  kafka.LoggerFunc(pcg.log.Errorf),
        Compression:  compress.Snappy,
        ReadTimeout:  writerReadTimeout,
        WriteTimeout: writerWriteTimeout,
    }
    return w
}


Enter fullscreen mode Exit fullscreen mode

然后使用Worker Pools创建消费者



func (pcg *ProductsConsumerGroup) consumeCreateProduct(
    ctx context.Context,
    cancel context.CancelFunc,
    groupID string,
    topic string,
    workersNum int,
) {
    r := pcg.getNewKafkaReader(pcg.Brokers, topic, groupID)
    defer cancel()
    defer func() {
        if err := r.Close(); err != nil {
            pcg.log.Errorf("r.Close", err)
            cancel()
        }
    }()

    w := pcg.getNewKafkaWriter(deadLetterQueueTopic)
    defer func() {
        if err := w.Close(); err != nil {
            pcg.log.Errorf("w.Close", err)
            cancel()
        }
    }()

    pcg.log.Infof("Starting consumer group: %v", r.Config().GroupID)

    wg := &sync.WaitGroup{}
    for i := 0; i <= workersNum; i++ {
        wg.Add(1)
        go pcg.createProductWorker(ctx, cancel, r, w, wg, i)
    }
    wg.Wait()
}


Enter fullscreen mode Exit fullscreen mode

工作人员验证消息主体,然后调用 usecase,如果返回错误,则尝试重试,retry-go是一个不错的重试库。
如果再次失败,则将错误消息发布到非常简单的死信队列(Dead Letter Queue),正如我之前所说,这里没有实现任何有趣的业务逻辑,所以在实际生产中,我们必须以更好的方式处理错误情况。
消息处理成功后,再提交。



func (pcg *ProductsConsumerGroup) createProductWorker(
    ctx context.Context,
    cancel context.CancelFunc,
    r *kafka.Reader,
    w *kafka.Writer,
    wg *sync.WaitGroup,
    workerID int,
) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "ProductsConsumerGroup.createProductWorker")
    defer span.Finish()

    span.LogFields(log.String("ConsumerGroup", r.Config().GroupID))

    defer wg.Done()
    defer cancel()

    for {
        m, err := r.FetchMessage(ctx)
        if err != nil {
            pcg.log.Errorf("FetchMessage", err)
            return
        }

        pcg.log.Infof(
            "WORKER: %v, message at topic/partition/offset %v/%v/%v: %s = %s\n",
            workerID,
            m.Topic,
            m.Partition,
            m.Offset,
            string(m.Key),
            string(m.Value),
        )
        incomingMessages.Inc()

        var prod models.Product
        if err := json.Unmarshal(m.Value, &prod); err != nil {
            errorMessages.Inc()
            pcg.log.Errorf("json.Unmarshal", err)
            continue
        }

        if err := pcg.validate.StructCtx(ctx, prod); err != nil {
            errorMessages.Inc()
            pcg.log.Errorf("validate.StructCtx", err)
            continue
        }

        if err := retry.Do(func() error {
            created, err := pcg.productsUC.Create(ctx, &prod)
            if err != nil {
                return err
            }
            pcg.log.Infof("created product: %v", created)
            return nil
        },
            retry.Attempts(retryAttempts),
            retry.Delay(retryDelay),
            retry.Context(ctx),
        ); err != nil {
            errorMessages.Inc()

            if err := pcg.publishErrorMessage(ctx, w, m, err); err != nil {
                pcg.log.Errorf("publishErrorMessage", err)
                continue
            }
            pcg.log.Errorf("productsUC.Create.publishErrorMessage", err)
            continue
        }

        if err := r.CommitMessages(ctx, m); err != nil {
            errorMessages.Inc()
            pcg.log.Errorf("CommitMessages", err)
            continue
        }

        successMessages.Inc()
    }
}


Enter fullscreen mode Exit fullscreen mode

在存储库层使用mongo-go-driver与数据库交互



// Create Create new product
func (p *productMongoRepo) Create(ctx context.Context, product *models.Product) (*models.Product, error) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "productMongoRepo.Create")
    defer span.Finish()

    collection := p.mongoDB.Database(productsDB).Collection(productsCollection)

    product.CreatedAt = time.Now().UTC()
    product.UpdatedAt = time.Now().UTC()

    result, err := collection.InsertOne(ctx, product, &options.InsertOneOptions{})
    if err != nil {
        return nil, errors.Wrap(err, "InsertOne")
    }

    objectID, ok := result.InsertedID.(primitive.ObjectID)
    if !ok {
        return nil, errors.Wrap(productErrors.ErrObjectIDTypeConversion, "result.InsertedID")
    }

    product.ProductID = objectID

    return product, nil
}


Enter fullscreen mode Exit fullscreen mode

这是 gRPC 服务实现 os create handler,您可以在 github 存储库中找到完整代码:



// Create create new product
func (p *productService) Create(ctx context.Context, req *productsService.CreateReq) (*productsService.CreateRes, error) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "productService.Create")
    defer span.Finish()
    createMessages.Inc()

    catID, err := primitive.ObjectIDFromHex(req.GetCategoryID())
    if err != nil {
        errorMessages.Inc()
        p.log.Errorf("primitive.ObjectIDFromHex: %v", err)
        return nil, grpcErrors.ErrorResponse(err, err.Error())
    }

    prod := &models.Product{
        CategoryID:  catID,
        Name:        req.GetName(),
        Description: req.GetDescription(),
        Price:       req.GetPrice(),
        ImageURL:    &req.ImageURL,
        Photos:      req.GetPhotos(),
        Quantity:    req.GetQuantity(),
        Rating:      int(req.GetRating()),
    }

    created, err := p.productUC.Create(ctx, prod)
    if err != nil {
        errorMessages.Inc()
        p.log.Errorf("productUC.Create: %v", err)
        return nil, grpcErrors.ErrorResponse(err, err.Error())
    }

    successMessages.Inc()
    return &productsService.CreateRes{Product: created.ToProto()}, nil
}


Enter fullscreen mode Exit fullscreen mode

以及使用echo的 REST API 处理程序



// CreateProduct Create product
// @Tags Products
// @Summary Create new product
// @Description Create new single product
// @Accept json
// @Produce json
// @Success 201 {object} models.Product
// @Router /products [post]
func (p *productHandlers) CreateProduct() echo.HandlerFunc {
    return func(c echo.Context) error {
        span, ctx := opentracing.StartSpanFromContext(c.Request().Context(), "productHandlers.Create")
        defer span.Finish()
        createRequests.Inc()

        var prod models.Product
        if err := c.Bind(&prod); err != nil {
            p.log.Errorf("c.Bind: %v", err)
            return httpErrors.ErrorCtxResponse(c, err)
        }

        if err := p.validate.StructCtx(ctx, &prod); err != nil {
            p.log.Errorf("validate.StructCtx: %v", err)
            return httpErrors.ErrorCtxResponse(c, err)
        }

        if err := p.productUC.PublishCreate(ctx, &prod); err != nil {
            p.log.Errorf("productUC.PublishCreate: %v", err)
            return httpErrors.ErrorCtxResponse(c, err)
        }

        successRequests.Inc()
        return c.NoContent(http.StatusCreated)
    }
}


Enter fullscreen mode Exit fullscreen mode

在我们应用程序处理的顶层,记录 Prometheus 的错误和进程指标。您可以在这里
找到包含源代码和所有使用工具列表的存储库👨‍💻 :) 希望本文对您有所帮助,我很乐意收到任何反馈或问题 :)

鏂囩珷鏉ユ簮锛�https://dev.to/aleksk1ng/go-kafka-grpc-and-mongodb-microservice-with-metrics-and-tracing-448d
PREV
如何在 React 中构建像素艺术绘图应用程序
NEXT
学习编程,无需浪费时间和金钱