Go、Kafka 和 gRPC 清洁架构 CQRS 微服务与 Jaeger 跟踪👋🧑💻
在本文中,让我们尝试使用以下方法创建更接近现实世界的具有跟踪和监控功能的 CQRS 微服务:🚀
Kafka作为消息代理
gRPC gRPC 的 Go 实现
PostgreSQL作为数据库
Jaeger开源、端到端分布式跟踪
Prometheus监控和警报
Grafana用于使用 Prometheus
MongoDB Web 和基于 API 的 SMTP 测试
Redis用于 Golang
swag 的类型安全 Redis 客户端Swagger 用于 Go
Echo Web 框架
源代码可以在GitHub 仓库
中找到 。本文主要思想是使用 Go、Kafka 和 gRPC 实现 CQRS。
我不会尝试写什么是 CQRS 模式,因为这会让文章篇幅过长,最好的阅读地点是microservices.io。我发现这篇文章非常有趣,并以Three Dots Labs
的 CQRS 项目和博客作为起点。
在这个例子中,我们有三个服务:API 网关、读写服务,它们通过 Kafka 和 gRPC 进行通信。
写入数据库使用 Postgres,读取数据库使用 MongoDB,缓存使用 Redis。
与任何实际项目一样,我们当然需要指标和跟踪,这里我们使用 Prometheus 和 Grafana 来获取指标,并使用 Jaeger 来跟踪。
由于时间有限,这个例子中没有实现任何有趣的业务逻辑,也没有涉及测试。
UI 界面将在以下端口上可用:
Jaeger 用户界面:http://localhost:16686
Prometheus 用户界面:http://localhost:9090
Grafana 用户界面:http://localhost:3000
Swagger 用户界面:http://localhost:5001/swagger/index.html
对于本地开发:
make local or docker_dev // for run docker compose files
make migrate_up // run sql migrations
make mongo // run mongodb sripts
make swagger // generate swagger documentation
要在 docker 中运行所有内容,您可以运行make docker_dev,它具有热重加载功能。
该项目的 Docker Compose 文件:
version: "3.8"
services:
prometheus:
image: prom/prometheus:latest
container_name: prometheus
ports:
- "9090:9090"
command:
- --config.file=/etc/prometheus/prometheus.yml
volumes:
- ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro
networks: [ "microservices" ]
node_exporter:
container_name: node_exporter_container
restart: always
image: prom/node-exporter
ports:
- '9101:9100'
networks: [ "microservices" ]
grafana:
container_name: grafana_container
restart: always
image: grafana/grafana
ports:
- '3005:3000'
networks: [ "microservices" ]
microservices_postgesql:
image: postgres:13-alpine
container_name: microservices_postgesql
expose:
- "5432"
ports:
- "5432:5432"
restart: always
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
- POSTGRES_DB=products
- POSTGRES_HOST=5432
command: -p 5432
volumes:
- ./microservices_pgdata:/var/lib/postgresql/data
networks: [ "microservices" ]
redis:
image: redis:6-alpine
restart: always
container_name: microservices_redis
ports:
- "6379:6379"
networks: [ "microservices" ]
zoo1:
image: zookeeper:3.4.9
restart: always
hostname: zoo1
ports:
- "2181:2181"
environment:
ZOO_MY_ID: 1
ZOO_PORT: 2181
ZOO_SERVERS: server.1=zoo1:2888:3888
volumes:
- ./zk-single-kafka-single/zoo1/data:/data
- ./zk-single-kafka-single/zoo1/datalog:/datalog
networks: [ "microservices" ]
kafka1:
image: confluentinc/cp-kafka:5.5.1
restart: always
hostname: kafka1
ports:
- "9092:9092"
- "9999:9999"
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
KAFKA_BROKER_ID: 1
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_JMX_PORT: 9999
KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
volumes:
- ./zk-single-kafka-single/kafka1/data:/var/lib/kafka/data
depends_on:
- zoo1
networks: [ "microservices" ]
mongodb:
image: mongo:latest
restart: always
environment:
MONGO_INITDB_ROOT_USERNAME: admin
MONGO_INITDB_ROOT_PASSWORD: admin
MONGODB_DATABASE: products
ports:
- "27017:27017"
volumes:
- mongodb_data_container:/data/db
networks: [ "microservices" ]
jaeger:
container_name: jaeger_container
restart: always
image: jaegertracing/all-in-one:1.21
environment:
- COLLECTOR_ZIPKIN_HTTP_PORT=9411
ports:
- "5775:5775/udp"
- "6831:6831/udp"
- "6832:6832/udp"
- "5778:5778"
- "16686:16686"
- "14268:14268"
- "14250:14250"
- "9411:9411"
networks: [ "microservices" ]
volumes:
mongodb_data_container:
networks:
microservices:
name: microservices
API 网关服务的理念是接受 http 请求,命令处理程序将事件发布到 Kafka,并通过 gRPC 查询处理程序从读取器服务检索数据。
在本项目中使用了Echo,但发现 Gin 和 Chi 也非常适合生产环境。
让我们看一下代码:创建产品 http 处理程序,接受并验证请求体,然后生成 UUID 并调用创建产品命令。
关于将 ID 传递给命令或服务方法(作为单独的参数或在请求体中传递),或在命令中生成 ID 并返回,以及
命令是否返回值等等,存在许多争论,这些争论毫无意义。选择哪种方式取决于你和你的团队,最好把时间花在更重要的业务任务上 :)
// CreateProduct
// @Tags Products
// @Summary Create product
// @Description Create new product item
// @Accept json
// @Produce json
// @Success 201 {object} dto.CreateProductResponseDto
// @Router /products [post]
func (h *productsHandlers) CreateProduct() echo.HandlerFunc {
return func(c echo.Context) error {
h.metrics.CreateProductHttpRequests.Inc()
ctx, span := tracing.StartHttpServerTracerSpan(c, "productsHandlers.CreateProduct")
defer span.Finish()
createDto := &dto.CreateProductDto{}
if err := c.Bind(createDto); err != nil {
h.log.WarnMsg("Bind", err)
h.traceErr(span, err)
return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
}
createDto.ProductID = uuid.NewV4()
if err := h.v.StructCtx(ctx, createDto); err != nil {
h.log.WarnMsg("validate", err)
h.traceErr(span, err)
return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
}
if err := h.ps.Commands.CreateProduct.Handle(ctx, commands.NewCreateProductCommand(createDto)); err != nil {
h.log.WarnMsg("CreateProduct", err)
h.metrics.ErrorHttpRequests.Inc()
return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
}
h.metrics.SuccessHttpRequests.Inc()
return c.JSON(http.StatusCreated, dto.CreateProductResponseDto{ProductID: createDto.ProductID})
}
}
创建产品命令处理程序很简单,只需整理命令数据并发布到 Kafka 即可。Kafka
接受 []byte 作为值,这里使用了 proto。为了在整个 Kafka 中传递跟踪信息,我们必须使用标头,
在跟踪实用程序中可以找到相关的帮助程序。Go
有一些不错的 Kafka 库,我喜欢segmentsio_kafka-go。
func (c *createProductHandler) Handle(ctx context.Context, command *CreateProductCommand) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "createProductHandler.Handle")
defer span.Finish()
createDto := &kafkaMessages.ProductCreate{
ProductID: command.CreateDto.ProductID.String(),
Name: command.CreateDto.Name,
Description: command.CreateDto.Description,
Price: command.CreateDto.Price,
}
dtoBytes, err := proto.Marshal(createDto)
if err != nil {
return err
}
return c.kafkaProducer.PublishMessage(ctx, kafka.Message{
Topic: c.cfg.KafkaTopics.ProductCreate.TopicName,
Value: dtoBytes,
Time: time.Now().UTC(),
Headers: tracing.GetKafkaTracingHeadersFromSpanCtx(span.Context()),
})
}
对于使用 Kafka 来说,最好有 UI 客户端进行调试,个人喜欢使用Conductor
写入服务使用 Kafka 主题,处理写入 Postgres 的消息,并将成功处理的消息发布到 Kafka。
在我看来,在 Go 中使用 Postgres 的最佳选择是pgx,但如果你需要查询构建器,Squirrel
是一个非常好的库 。我个人不喜欢 ORM,但通常情况下,正如我所见,团队经常使用Gorm,这取决于你。ProcessMessages
方法监听 Kafka 主题并根据主题调用特定方法:
func (s *productMessageProcessor) ProcessMessages(ctx context.Context, r *kafka.Reader, wg *sync.WaitGroup, workerID int) {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
default:
}
m, err := r.FetchMessage(ctx)
if err != nil {
s.log.Warnf("workerID: %v, err: %v", workerID, err)
continue
}
s.logProcessMessage(m, workerID)
switch m.Topic {
case s.cfg.KafkaTopics.ProductCreate.TopicName:
s.processCreateProduct(ctx, r, m)
case s.cfg.KafkaTopics.ProductUpdate.TopicName:
s.processUpdateProduct(ctx, r, m)
case s.cfg.KafkaTopics.ProductDelete.TopicName:
s.processDeleteProduct(ctx, r, m)
}
}
}
Kafka 消息处理方法反序列化和验证消息体,将其传递给命令并提交,
在这里我们必须选择处理错误的方式,但这取决于业务逻辑,例如
我们可以使用死信队列模式。
func (s *productMessageProcessor) processCreateProduct(ctx context.Context, r *kafka.Reader, m kafka.Message) {
s.metrics.CreateProductKafkaMessages.Inc()
ctx, span := tracing.StartKafkaConsumerTracerSpan(ctx, m.Headers, "productMessageProcessor.processCreateProduct")
defer span.Finish()
var msg kafkaMessages.ProductCreate
if err := proto.Unmarshal(m.Value, &msg); err != nil {
s.log.WarnMsg("proto.Unmarshal", err)
s.commitErrMessage(ctx, r, m)
return
}
proUUID, err := uuid.FromString(msg.GetProductID())
if err != nil {
s.log.WarnMsg("proto.Unmarshal", err)
s.commitErrMessage(ctx, r, m)
return
}
command := commands.NewCreateProductCommand(proUUID, msg.GetName(), msg.GetDescription(), msg.GetPrice())
if err := s.v.StructCtx(ctx, command); err != nil {
s.log.WarnMsg("validate", err)
s.commitErrMessage(ctx, r, m)
return
}
if err := retry.Do(func() error {
return s.ps.Commands.CreateProduct.Handle(ctx, command)
}, append(retryOptions, retry.Context(ctx))...); err != nil {
s.log.WarnMsg("CreateProduct.Handle", err)
s.metrics.ErrorKafkaMessages.Inc()
return
}
s.commitMessage(ctx, r, m)
}
Writer 的服务创建产品命令将数据保存到 postgres,并将产品保存事件发布到 kafka:
func (c *createProductHandler) Handle(ctx context.Context, command *CreateProductCommand) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "createProductHandler.Handle")
defer span.Finish()
productDto := &models.Product{ProductID: command.ProductID, Name: command.Name, Description: command.Description, Price: command.Price}
product, err := c.pgRepo.CreateProduct(ctx, productDto)
if err != nil {
return err
}
msg := &kafkaMessages.ProductCreated{Product: mappers.ProductToGrpcMessage(product)}
msgBytes, err := proto.Marshal(msg)
if err != nil {
return err
}
message := kafka.Message{
Topic: c.cfg.KafkaTopics.ProductCreated.TopicName,
Value: msgBytes,
Time: time.Now().UTC(),
Headers: tracing.GetKafkaTracingHeadersFromSpanCtx(span.Context()),
}
return c.kafkaProducer.PublishMessage(ctx, message)
}
Postgres 存储库使用 pgx,代码很简单:
func (p *productRepository) CreateProduct(ctx context.Context, product *models.Product) (*models.Product, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "productRepository.CreateProduct")
defer span.Finish()
var created models.Product
if err := p.db.QueryRow(ctx, createProductQuery, &product.ProductID, &product.Name, &product.Description, &product.Price).Scan(
&created.ProductID,
&created.Name,
&created.Description,
&created.Price,
&created.CreatedAt,
&created.UpdatedAt,
); err != nil {
return nil, errors.Wrap(err, "db.QueryRow")
}
return &created, nil
}
读者服务消费 Kafka 消息,保存到 MongoDB 并通过 Redis 缓存,然后投射数据以供 gRPC 调用检索。
此处的 Kafka 监听器处理程序看起来相同,产品创建命令将数据保存到 MongoDB 并缓存在 Redis 中:
func (c *createProductHandler) Handle(ctx context.Context, command *CreateProductCommand) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "createProductHandler.Handle")
defer span.Finish()
product := &models.Product{
ProductID: command.ProductID,
Name: command.Name,
Description: command.Description,
Price: command.Price,
CreatedAt: command.CreatedAt,
UpdatedAt: command.UpdatedAt,
}
created, err := c.mongoRepo.CreateProduct(ctx, product)
if err != nil {
return err
}
c.redisRepo.PutProduct(ctx, created.ProductID, created)
return nil
}
MongoDB 存储库保存方法:
func (p *mongoRepository) CreateProduct(ctx context.Context, product *models.Product) (*models.Product, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "mongoRepository.CreateProduct")
defer span.Finish()
collection := p.db.Database(p.cfg.Mongo.Db).Collection(p.cfg.MongoCollections.Products)
_, err := collection.InsertOne(ctx, product, &options.InsertOneOptions{})
if err != nil {
p.traceErr(span, err)
return nil, errors.Wrap(err, "InsertOne")
}
return product, nil
}
而redis的缓存方式是:
func (r *redisRepository) PutProduct(ctx context.Context, key string, product *models.Product) {
span, ctx := opentracing.StartSpanFromContext(ctx, "redisRepository.PutProduct")
defer span.Finish()
productBytes, err := json.Marshal(product)
if err != nil {
r.log.WarnMsg("json.Marshal", err)
return
}
if err := r.redisClient.HSetNX(ctx, r.getRedisProductPrefixKey(), key, productBytes).Err(); err != nil {
r.log.WarnMsg("redisClient.HSetNX", err)
return
}
r.log.Debugf("HSetNX prefix: %s, key: %s", r.getRedisProductPrefixKey(), key)
}
然后,api网关可以使用gRPC向读取器服务请求数据。
读取器gRPC服务方法:
func (s *grpcService) GetProductById(ctx context.Context, req *readerService.GetProductByIdReq) (*readerService.GetProductByIdRes, error) {
s.metrics.GetProductByIdGrpcRequests.Inc()
ctx, span := tracing.StartGrpcServerTracerSpan(ctx, "grpcService.GetProductById")
defer span.Finish()
productUUID, err := uuid.FromString(req.GetProductID())
if err != nil {
s.log.WarnMsg("uuid.FromString", err)
return nil, s.errResponse(codes.InvalidArgument, err)
}
query := queries.NewGetProductByIdQuery(productUUID)
if err := s.v.StructCtx(ctx, query); err != nil {
s.log.WarnMsg("validate", err)
return nil, s.errResponse(codes.InvalidArgument, err)
}
product, err := s.ps.Queries.GetProductById.Handle(ctx, query)
if err != nil {
s.log.WarnMsg("GetProductById.Handle", err)
return nil, s.errResponse(codes.Internal, err)
}
s.metrics.SuccessGrpcRequests.Inc()
return &readerService.GetProductByIdRes{Product: models.ProductToGrpcMessage(product)}, nil
}
通过id获取方法追踪:
Api 网关通过 id http 处理程序方法获取产品:
// GetProductByID
// @Tags Products
// @Summary Get product
// @Description Get product by id
// @Accept json
// @Produce json
// @Param id path string true "Product ID"
// @Success 200 {object} dto.ProductResponse
// @Router /products/{id} [get]
func (h *productsHandlers) GetProductByID() echo.HandlerFunc {
return func(c echo.Context) error {
h.metrics.GetProductByIdHttpRequests.Inc()
ctx, span := tracing.StartHttpServerTracerSpan(c, "productsHandlers.GetProductByID")
defer span.Finish()
productUUID, err := uuid.FromString(c.Param(constants.ID))
if err != nil {
h.log.WarnMsg("uuid.FromString", err)
h.traceErr(span, err)
return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
}
query := queries.NewGetProductByIdQuery(productUUID)
response, err := h.ps.Queries.GetProductById.Handle(ctx, query)
if err != nil {
h.log.WarnMsg("GetProductById", err)
h.metrics.ErrorHttpRequests.Inc()
return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
}
h.metrics.SuccessHttpRequests.Inc()
return c.JSON(http.StatusOK, response)
}
}
和查询处理程序代码:
func (q *getProductByIdHandler) Handle(ctx context.Context, query *GetProductByIdQuery) (*dto.ProductResponse, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "getProductByIdHandler.Handle")
defer span.Finish()
ctx = tracing.InjectTextMapCarrierToGrpcMetaData(ctx, span.Context())
res, err := q.rsClient.GetProductById(ctx, &readerService.GetProductByIdReq{ProductID: query.ProductID.String()})
if err != nil {
return nil, err
}
return dto.ProductResponseFromGrpc(res.GetProduct()), nil
}
搜索商品方式:
func (p *mongoRepository) Search(ctx context.Context, search string, pagination *utils.Pagination) (*models.ProductsList, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "mongoRepository.Search")
defer span.Finish()
collection := p.db.Database(p.cfg.Mongo.Db).Collection(p.cfg.MongoCollections.Products)
filter := bson.D{
{Key: "$or", Value: bson.A{
bson.D{{Key: "name", Value: primitive.Regex{Pattern: search, Options: "gi"}}},
bson.D{{Key: "description", Value: primitive.Regex{Pattern: search, Options: "gi"}}},
}},
}
count, err := collection.CountDocuments(ctx, filter)
if err != nil {
p.traceErr(span, err)
return nil, errors.Wrap(err, "CountDocuments")
}
if count == 0 {
return &models.ProductsList{Products: make([]*models.Product, 0)}, nil
}
limit := int64(pagination.GetLimit())
skip := int64(pagination.GetOffset())
cursor, err := collection.Find(ctx, filter, &options.FindOptions{
Limit: &limit,
Skip: &skip,
})
if err != nil {
p.traceErr(span, err)
return nil, errors.Wrap(err, "Find")
}
defer cursor.Close(ctx)
products := make([]*models.Product, 0, pagination.GetSize())
for cursor.Next(ctx) {
var prod models.Product
if err := cursor.Decode(&prod); err != nil {
p.traceErr(span, err)
return nil, errors.Wrap(err, "Find")
}
products = append(products, &prod)
}
if err := cursor.Err(); err != nil {
span.SetTag("error", true)
span.LogKV("error_code", err.Error())
return nil, errors.Wrap(err, "cursor.Err")
}
return models.NewProductListWithPagination(products, count, pagination), nil
}
更多详细信息和源代码可在此处找到。当然,
在实际应用中,我们还需要实现更多必要的功能,
例如断路器、重试、速率限制器等。具体实现方式取决于项目,
例如,您可以使用 Kubernetes 和 Istio 来实现其中一些功能。
希望本文对您有所帮助,我很乐意收到任何反馈或问题,欢迎随时通过电子邮件或任何 Messenger 联系我 :)