Go EventSourcing and CQRS microservice using EventStoreDB 👋⚡️💫

2025-06-04

使用 EventStoreDB 的 Go EventSourcing 和 CQRS 微服务👋⚡️💫

在本文中,让我们尝试使用以下工具创建更接近现实世界的事件源 CQRS 微服务:🚀👨‍💻🙌
EventStoreDB为事件源构建的数据库
gRPC gRPC 的Go 实现
MongoDB基于 Web 和 API 的 SMTP 测试
Elasticsearch Go 的 Elasticsearch 客户端。Jaeger
开源端到端分布式跟踪
Prometheus监控和警报
Grafana用于使用 Prometheus
swag Swagger for Go
Echo Web 框架
Kibana构建可观察性仪表板Kibana 是一个用户界面,可让您可视化您的 Elasticsearch

源代码可以在GitHub 仓库中找到。

该项目的核心思想是使用 Go、EventStoreDB、gRPC、MongoDB 和 ElasticSearch 实现事件溯源和 CQRS。
本文没有详细讨论事件溯源和 CQRS 模式,因为这会使文章篇幅过长。最佳阅读地点是 microservices.io ( https://microservices.io/patterns/data/cqrs.html ),发现这篇文章也很不错,强烈推荐Alexey Zimarev的 《Hands-on Domain-Driven Design with .NET Core》一书和他的博客

在这个项目中,我们使用官方go 客户端与EventStoreDB协同工作,对于 [projections(https://zimarev.com/blog/event-sourcing/projections/)使用MongoDBElasticsearch进行搜索, 并通过 gRPC 和 REST 进行通信。 这里没有实现任何有趣的业务逻辑,也没有涵盖测试,因为没有足够的时间, 事件列表非常简单:创建新订单、更新购物车、支付、提交、取消、更改送货地址、完成订单,当然在现实世界中最好使用更具体、更有意义的事件,但这里的目标是展示这个想法以及它是如何工作的。 事件源可以通过不同的方式实现,这里使用 EventStoreDB,但我们可以使用 PostgreSQL 和 Kafka 来实现。 在尝试了这两种方法之后,发现 EventStoreDB 是一个更好的解决方案,因为所有必需的功能都是开箱即用的,它经过了优化,并且确实有非常优秀的工程师在开发它。




对于本地开发:

make local or docker_dev // for run docker compose
make run_es // run es microservice
Enter fullscreen mode Exit fullscreen mode

或者

make dev // run all in docker compose with hot reload
Enter fullscreen mode Exit fullscreen mode

所有 UI 界面将在以下端口可用:

EventStoreDB 用户界面:http://localhost:2113

事件存储数据库

Jaeger 用户界面:http://localhost:16686

耶格尔

Prometheus 用户界面:http://localhost:9090

普罗米修斯

Grafana 用户界面:http://localhost:3005

格拉法纳

Swagger 用户界面:http://localhost:5007/swagger/index.html

昂首阔步

Kibana 用户界面:http://localhost:5601/app/home#/

基巴纳

该项目的 Docker 撰写文件,
根据您的环境,使用 eventstoredb 的图像:eventstore/eventstore:21.6.0-buster-slim,
以及 M1 的图像:ghcr.io/eventstore/eventstore@sha256:ab30bf2a2629e2c8710f8f7fdcb74df5466c6b3b2200c8e9ad8a797ed138012a

version: "3.8"

services:
  eventstore.db:
    image: eventstore/eventstore:21.6.0-buster-slim
    environment:
      - EVENTSTORE_CLUSTER_SIZE=1
      - EVENTSTORE_RUN_PROJECTIONS=All
      - EVENTSTORE_START_STANDARD_PROJECTIONS=true
      - EVENTSTORE_EXT_TCP_PORT=1113
      - EVENTSTORE_HTTP_PORT=2113
      - EVENTSTORE_INSECURE=true
      - EVENTSTORE_ENABLE_EXTERNAL_TCP=true
      - EVENTSTORE_ENABLE_ATOM_PUB_OVER_HTTP=true
    ports:
      - "1113:1113"
      - "2113:2113"
    volumes:
      - type: volume
        source: eventstore-volume-data
        target: /var/lib/eventstore
      - type: volume
        source: eventstore-volume-logs
        target: /var/log/eventstore
    networks: [ "microservices" ]

  prometheus:
    image: prom/prometheus:latest
    container_name: prometheus
    ports:
      - "9090:9090"
    command:
      - --config.file=/etc/prometheus/prometheus.yml
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro
    networks: [ "microservices" ]

  node_exporter:
    container_name: node_exporter_container
    restart: always
    image: prom/node-exporter
    ports:
      - '9101:9100'
    networks: [ "microservices" ]

  grafana:
    container_name: grafana_container
    restart: always
    image: grafana/grafana
    ports:
      - '3005:3000'
    networks: [ "microservices" ]

  mongodb:
    image: mongo:latest
    restart: always
    environment:
      MONGO_INITDB_ROOT_USERNAME: admin
      MONGO_INITDB_ROOT_PASSWORD: admin
      MONGODB_DATABASE: products
    ports:
      - "27017:27017"
    volumes:
      - mongodb_data_container:/data/db
    networks: [ "microservices" ]

  jaeger:
    container_name: jaeger_container
    restart: always
    image: jaegertracing/all-in-one:1.21
    environment:
      - COLLECTOR_ZIPKIN_HTTP_PORT=9411
    ports:
      - "5775:5775/udp"
      - "6831:6831/udp"
      - "6832:6832/udp"
      - "5778:5778"
      - "16686:16686"
      - "14268:14268"
      - "14250:14250"
      - "9411:9411"
    networks: [ "microservices" ]

  node01:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.11.1
    container_name: node01
    restart: always
    environment:
      - node.name=node01
      - cluster.name=es-cluster-7
      - discovery.type=single-node
      - "ES_JAVA_OPTS=-Xms128m -Xmx128m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - es-data01:/usr/share/elasticsearch/data
    ports:
      - "9200:9200"
    networks: [ "microservices" ]

  kibana:
    image: docker.elastic.co/kibana/kibana:7.11.1
    restart: always
    environment:
      ELASTICSEARCH_HOSTS: http://node01:9200
    ports:
      - "5601:5601"
    depends_on:
      - node01
    networks: [ "microservices" ]

volumes:
  mongodb_data_container:
  eventstore-volume-data:
  eventstore-volume-logs:
  es-data01:
    driver: local

networks:
  microservices:
    name: microservices
Enter fullscreen mode Exit fullscreen mode

首先,我们查看 EventStoreDB,然后添加一些使用它的代码,我们可以查看流和事件
EventStoreDB ui

AggregateRoot 可以通过多种方式实现,就本项目以及 Go 语言而言,主要的聚合根方法包括:加载事件和应用更改。
当我们从数据库中获取聚合时,我们不是将其状态作为表或文档中的一条记录读取,而是读取之前保存的所有事件,并为每个事件调用When方法。
完成所有这些步骤后,我们将恢复给定聚合的所有历史记录。通过这样做,我们将聚合恢复到其最新状态。

type AggregateBase struct {
    ID                string
    Version           int64
    AppliedEvents     []Event
    UncommittedEvents []Event
    Type              AggregateType
    when              when
}

func (a *AggregateBase) Apply(event Event) error {
    if event.GetAggregateID() != a.GetID() {
        return ErrInvalidAggregateID
    }

    event.SetAggregateType(a.GetType())

    if err := a.when(event); err != nil {
        return err
    }

    a.Version++
    event.SetVersion(a.GetVersion())
    a.UncommittedEvents = append(a.UncommittedEvents, event)
    return nil
}

// RaiseEvent push event to aggregate applied events using When method, used for load directly from eventstore
func (a *AggregateBase) RaiseEvent(event Event) error {
    if event.GetAggregateID() != a.GetID() {
        return ErrInvalidAggregateID
    }
    if a.GetVersion() >= event.GetVersion() {
        return ErrInvalidEventVersion
    }

    event.SetAggregateType(a.GetType())

    if err := a.when(event); err != nil {
        return err
    }

    a.Version = event.GetVersion()
    return nil
}
Enter fullscreen mode Exit fullscreen mode

Event结构体是esdb.RecordedEventesdb.EventData下的抽象以便于操作。 事件代表领域中发生的事实。它们是事实的来源;您的当前状态源于这些事件。 它们是不可变的,代表业务事实。 这意味着我们永远不会更改或删除数据库中的任何内容,我们只会添加新事件。



// EventType is the type of any event, used as its unique identifier.
type EventType string

// Event is an internal representation of an event, returned when the Aggregate
// uses NewEvent to create a new event. The events loaded from the db is
// represented by each DBs internal event type, implementing Event.
type Event struct {
    EventID       string
    EventType     string
    Data          []byte
    Timestamp     time.Time
    AggregateType AggregateType
    AggregateID   string
    Version       int64
    Metadata      []byte
}
Enter fullscreen mode Exit fullscreen mode

在这个例子中,我们不使用快照,因此AggregateStore接口:

// AggregateStore is responsible for loading and saving aggregates.
type AggregateStore interface {
    Load(ctx context.Context, aggregate Aggregate) error
    Save(ctx context.Context, aggregate Aggregate) error
    Exists(ctx context.Context, streamID string) error
}
Enter fullscreen mode Exit fullscreen mode

AggregateStore 的实现包括 Load、Save 和 Exists 方法。Load
和 Save 方法接受聚合数据,然后使用 EventStoreDB 客户端加载或应用事件。Load方法
找出聚合数据的流名称,从聚合数据流中读取所有事件, 循环遍历所有事件,并为每个事件调用 RaiseEvent 处理程序。Save 方法通过保存更改历史记录来持久化聚合数据,处理并发性。当您从 EventStoreDB 检索流时,您会记录当前的版本号,然后在将其保存回来时,您可以确定是否有其他人在此期间修改了该记录。


func (a *aggregateStore) Load(ctx context.Context, aggregate es.Aggregate) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "aggregateStore.Load")
    defer span.Finish()
    span.LogFields(log.String("AggregateID", aggregate.GetID()))

    stream, err := a.db.ReadStream(ctx, aggregate.GetID(), esdb.ReadStreamOptions{}, count)
    if err != nil {
        tracing.TraceErr(span, err)
        return errors.Wrap(err, "db.ReadStream")
    }
    defer stream.Close()

    for {
        event, err := stream.Recv()
        if errors.Is(err, esdb.ErrStreamNotFound) {
            tracing.TraceErr(span, err)
            return errors.Wrap(err, "stream.Recv")
        }
        if errors.Is(err, io.EOF) {
            break
        }
        if err != nil {
            tracing.TraceErr(span, err)
            return errors.Wrap(err, "stream.Recv")
        }

        esEvent := es.NewEventFromRecorded(event.Event)
        if err := aggregate.RaiseEvent(esEvent); err != nil {
            tracing.TraceErr(span, err)
            return errors.Wrap(err, "RaiseEvent")
        }
        a.log.Debugf("(Load) esEvent: {%s}", esEvent.String())
    }

    a.log.Debugf("(Load) aggregate: {%s}", aggregate.String())
    return nil
}

func (a *aggregateStore) Save(ctx context.Context, aggregate es.Aggregate) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "aggregateStore.Save")
    defer span.Finish()
    span.LogFields(log.String("aggregate", aggregate.String()))

    if len(aggregate.GetUncommittedEvents()) == 0 {
        a.log.Debugf("(Save) [no uncommittedEvents] len: {%d}", len(aggregate.GetUncommittedEvents()))
        return nil
    }

    eventsData := make([]esdb.EventData, 0, len(aggregate.GetUncommittedEvents()))
    for _, event := range aggregate.GetUncommittedEvents() {
        eventsData = append(eventsData, event.ToEventData())
    }

    var expectedRevision esdb.ExpectedRevision
    if len(aggregate.GetAppliedEvents()) == 0 {
        expectedRevision = esdb.NoStream{}
        a.log.Debugf("(Save) expectedRevision: {%T}", expectedRevision)

        appendStream, err := a.db.AppendToStream(
            ctx,
            aggregate.GetID(),
            esdb.AppendToStreamOptions{ExpectedRevision: expectedRevision},
            eventsData...,
        )
        if err != nil {
            tracing.TraceErr(span, err)
            return errors.Wrap(err, "db.AppendToStream")
        }

        a.log.Debugf("(Save) stream: {%+v}", appendStream)
        return nil
    }

    readOps := esdb.ReadStreamOptions{Direction: esdb.Backwards, From: esdb.End{}}
    stream, err := a.db.ReadStream(context.Background(), aggregate.GetID(), readOps, 1)
    if err != nil {
        tracing.TraceErr(span, err)
        return errors.Wrap(err, "db.ReadStream")
    }
    defer stream.Close()

    lastEvent, err := stream.Recv()
    if err != nil {
        tracing.TraceErr(span, err)
        return errors.Wrap(err, "stream.Recv")
    }

    expectedRevision = esdb.Revision(lastEvent.OriginalEvent().EventNumber)
    a.log.Debugf("(Save) expectedRevision: {%T}", expectedRevision)

    appendStream, err := a.db.AppendToStream(
        ctx,
        aggregate.GetID(),
        esdb.AppendToStreamOptions{ExpectedRevision: expectedRevision},
        eventsData...,
    )
    if err != nil {
        tracing.TraceErr(span, err)
        return errors.Wrap(err, "db.AppendToStream")
    }

    a.log.Debugf("(Save) stream: {%+v}", appendStream)
    aggregate.ClearUncommittedEvents()
    return nil
}
Enter fullscreen mode Exit fullscreen mode

下一步让我们创建一个订单聚合,我们必须添加 es.AggregateBase 结构并实现 When 接口
::

type OrderAggregate struct {
    *es.AggregateBase
    Order *models.Order
}

func (a *OrderAggregate) When(evt es.Event) error {

    switch evt.GetEventType() {

    case v1.OrderCreated:
        return a.onOrderCreated(evt)
    case v1.OrderPaid:
        return a.onOrderPaid(evt)
    case v1.OrderSubmitted:
        return a.onOrderSubmitted(evt)
    case v1.OrderCompleted:
        return a.onOrderCompleted(evt)
    case v1.OrderCanceled:
        return a.onOrderCanceled(evt)
    case v1.ShoppingCartUpdated:
        return a.onShoppingCartUpdated(evt)
    case v1.DeliveryAddressChanged:
        return a.onChangeDeliveryAddress(evt)
    default:
        return es.ErrInvalidEventType
    }
}


func (a *OrderAggregate) onOrderCreated(evt es.Event) error {
    var eventData v1.OrderCreatedEvent
    if err := evt.GetJsonData(&eventData); err != nil {
        return errors.Wrap(err, "GetJsonData")
    }

    a.Order.AccountEmail = eventData.AccountEmail
    a.Order.ShopItems = eventData.ShopItems
    a.Order.Created = true
    a.Order.TotalPrice = GetShopItemsTotalPrice(eventData.ShopItems)
    a.Order.DeliveryAddress = eventData.DeliveryAddress
    return nil
}

func (a *OrderAggregate) onOrderPaid(evt es.Event) error {
    var payment models.Payment
    if err := evt.GetJsonData(&payment); err != nil {
        return errors.Wrap(err, "GetJsonData")
    }

    a.Order.Paid = true
    a.Order.Payment = payment
    return nil
}

func (a *OrderAggregate) onOrderSubmitted(evt es.Event) error {
    a.Order.Submitted = true
    return nil
}

func (a *OrderAggregate) onOrderCompleted(evt es.Event) error {
    var eventData v1.OrderCompletedEvent
    if err := evt.GetJsonData(&eventData); err != nil {
        return errors.Wrap(err, "GetJsonData")
    }

    a.Order.Completed = true
    a.Order.DeliveredTime = eventData.DeliveryTimestamp
    a.Order.Canceled = false
    return nil
}

func (a *OrderAggregate) onOrderCanceled(evt es.Event) error {
    var eventData v1.OrderCanceledEvent
    if err := evt.GetJsonData(&eventData); err != nil {
        return errors.Wrap(err, "GetJsonData")
    }

    a.Order.Canceled = true
    a.Order.Completed = false
    a.Order.CancelReason = eventData.CancelReason
    return nil
}

func (a *OrderAggregate) onShoppingCartUpdated(evt es.Event) error {
    var eventData v1.ShoppingCartUpdatedEvent
    if err := evt.GetJsonData(&eventData); err != nil {
        return errors.Wrap(err, "GetJsonData")
    }

    a.Order.ShopItems = eventData.ShopItems
    a.Order.TotalPrice = GetShopItemsTotalPrice(eventData.ShopItems)
    return nil
}

func (a *OrderAggregate) onChangeDeliveryAddress(evt es.Event) error {
    var eventData v1.OrderDeliveryAddressChangedEvent
    if err := evt.GetJsonData(&eventData); err != nil {
        return errors.Wrap(err, "GetJsonData")
    }

    a.Order.DeliveryAddress = eventData.DeliveryAddress
    return nil
}
Enter fullscreen mode Exit fullscreen mode

例如,让我们看一下创建订单案例、onCreateOrderCommand处理命令、验证订单状态、序列化数据和创建CreateOrderEvent

func (a *OrderAggregate) CreateOrder(ctx context.Context, shopItems []*models.ShopItem, accountEmail, deliveryAddress string) error {
    span, _ := opentracing.StartSpanFromContext(ctx, "OrderAggregate.CreateOrder")
    defer span.Finish()
    span.LogFields(log.String("AggregateID", a.GetID()))

    if a.Order.Created {
        return ErrAlreadyCreated
    }
    if shopItems == nil {
        return ErrOrderShopItemsIsRequired
    }
    if deliveryAddress == "" {
        return ErrInvalidDeliveryAddress
    }

    event, err := eventsV1.NewOrderCreatedEvent(a, shopItems, accountEmail, deliveryAddress)
    if err != nil {
        tracing.TraceErr(span, err)
        return errors.Wrap(err, "NewOrderCreatedEvent")
    }

    if err := event.SetMetadata(tracing.ExtractTextMapCarrier(span.Context())); err != nil {
        tracing.TraceErr(span, err)
        return errors.Wrap(err, "SetMetadata")
    }

    return a.Apply(event)
}
Enter fullscreen mode Exit fullscreen mode

然后使用onOrderCreated方法聚合处理事件,该方法仅将更改应用于状态:

func (a *OrderAggregate) onOrderCreated(evt es.Event) error {
    var eventData v1.OrderCreatedEvent
    if err := evt.GetJsonData(&eventData); err != nil {
        return errors.Wrap(err, "GetJsonData")
    }

    a.Order.AccountEmail = eventData.AccountEmail
    a.Order.ShopItems = eventData.ShopItems
    a.Order.Created = true
    a.Order.TotalPrice = GetShopItemsTotalPrice(eventData.ShopItems)
    a.Order.DeliveryAddress = eventData.DeliveryAddress
    return nil
}
Enter fullscreen mode Exit fullscreen mode

我们的微服务接受 http 或 gRPC 请求: 对于 swagger 使用swag让我们看看创建订单处理程序代码:
昂首阔步

// CreateOrder
// @Tags Orders
// @Summary Create order
// @Description Create new order
// @Param order body dto.CreateOrderReqDto true "create order"
// @Accept json
// @Produce json
// @Success 201 {string} id ""
// @Router /orders [post]
func (h *orderHandlers) CreateOrder() echo.HandlerFunc {
    return func(c echo.Context) error {
        ctx, span := tracing.StartHttpServerTracerSpan(c, "orderHandlers.CreateOrder")
        defer span.Finish()
        h.metrics.CreateOrderHttpRequests.Inc()

        var reqDto dto.CreateOrderReqDto
        if err := c.Bind(&reqDto); err != nil {
            h.log.Errorf("(Bind) err: {%v}", err)
            tracing.TraceErr(span, err)
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        if err := h.v.StructCtx(ctx, reqDto); err != nil {
            h.log.Errorf("(validate) err: {%v}", err)
            tracing.TraceErr(span, err)
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        id := uuid.NewV4().String()
        command := v1.NewCreateOrderCommand(id, reqDto.ShopItems, reqDto.AccountEmail, reqDto.DeliveryAddress)
        err := h.os.Commands.CreateOrder.Handle(ctx, command)
        if err != nil {
            h.log.Errorf("(CreateOrder.Handle) id: {%s}, err: {%v}", id, err)
            tracing.TraceErr(span, err)
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        h.log.Infof("(order created) id: {%s}", id)
        return c.JSON(http.StatusCreated, id)
    }
}
Enter fullscreen mode Exit fullscreen mode

gRPC CreateOrder处理程序与 http 处理程序功能相同,验证请求并调用命令。
为了进行验证,我们使用了验证器,因为它基于标签实现了结构体和各个字段的值验证。
bloomrpc

func (s *orderGrpcService) CreateOrder(ctx context.Context, req *orderService.CreateOrderReq) (*orderService.CreateOrderRes, error) {
    ctx, span := tracing.StartGrpcServerTracerSpan(ctx, "orderGrpcService.CreateOrder")
    defer span.Finish()
    span.LogFields(log.String("req", req.String()))
    s.metrics.CreateOrderGrpcRequests.Inc()

    aggregateID := uuid.NewV4().String()
    command := v1.NewCreateOrderCommand(aggregateID, models.ShopItemsFromProto(req.GetShopItems()), req.GetAccountEmail(), req.GetDeliveryAddress())
    if err := s.v.StructCtx(ctx, command); err != nil {
        s.log.Errorf("(validate) aggregateID: {%s}, err: {%v}", aggregateID, err)
        tracing.TraceErr(span, err)
        return nil, s.errResponse(err)
    }

    if err := s.os.Commands.CreateOrder.Handle(ctx, command); err != nil {
        s.log.Errorf("(CreateOrder.Handle) orderID: {%s}, err: {%v}", aggregateID, err)
        return nil, s.errResponse(err)
    }

    s.log.Infof("(created order): orderID: {%s}", aggregateID)
    return &orderService.CreateOrderRes{AggregateID: aggregateID}, nil
}
Enter fullscreen mode Exit fullscreen mode

http 和 gRPC 处理程序执行相同的操作,验证传入的请求并使用CreateOrder 命令调用命令服务,
该命令服务加载 OrderAggregate,调用HandleCommand方法并将其保存到事件存储。CQRS
之所以流行,主要原因是它能够分别处理读取和写入操作,因为针对这些截然不同的操作,优化技术存在很大差异。

func (c *createOrderHandler) Handle(ctx context.Context, command *CreateOrderCommand) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "createOrderHandler.Handle")
    defer span.Finish()
    span.LogFields(log.String("AggregateID", command.GetAggregateID()))

    order := aggregate.NewOrderAggregateWithID(command.AggregateID)
    err := c.es.Exists(ctx, order.GetID())
    if err != nil && !errors.Is(err, esdb.ErrStreamNotFound) {
        return err
    }

    if err := order.CreateOrder(ctx, command.ShopItems, command.AccountEmail, command.DeliveryAddress); err != nil {
        return err
    }

    span.LogFields(log.String("order", order.String()))
    return c.es.Save(ctx, order)
}
Enter fullscreen mode Exit fullscreen mode

从事件构建状态的过程称为投影。EventStoreDB
具有订阅功能,因此我们可以订阅订单类型流事件的投影。
当我们执行命令时,聚合会生成一个代表聚合状态转换的新事件。这些事件已提交到存储,因此存储将它们附加到聚合流的末尾。订阅接收这些事件并更新其读取模型。MongoDB
的投影使用持久订阅来订阅事件流。
并使用 When 方法处理事件,就像聚合一样,它会根据事件类型应用更改:

func (o *mongoProjection) ProcessEvents(ctx context.Context, stream *esdb.PersistentSubscription, workerID int) error {

    for {
        event := stream.Recv()
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
        }

        if event.SubscriptionDropped != nil {
            o.log.Errorf("(SubscriptionDropped) err: {%v}", event.SubscriptionDropped.Error)
            return errors.Wrap(event.SubscriptionDropped.Error, "Subscription Dropped")
        }

        if event.EventAppeared != nil {
            o.log.ProjectionEvent(constants.MongoProjection, o.cfg.Subscriptions.MongoProjectionGroupName, event.EventAppeared, workerID)

            err := o.When(ctx, es.NewEventFromRecorded(event.EventAppeared.Event))
            if err != nil {
                o.log.Errorf("(mongoProjection.when) err: {%v}", err)
                if err := stream.Nack(err.Error(), esdb.Nack_Retry, event.EventAppeared); err != nil {
                    o.log.Errorf("(stream.Nack) err: {%v}", err)
                    return errors.Wrap(err, "stream.Nack")
                }
            }
            err = stream.Ack(event.EventAppeared)
            if err != nil {
                o.log.Errorf("(stream.Ack) err: {%v}", err)
                return errors.Wrap(err, "stream.Ack")
            }
            o.log.Infof("(ACK) event commit: {%v}", *event.EventAppeared.Commit)
        }
    }
}

func (o *mongoProjection) When(ctx context.Context, evt es.Event) error {
    ctx, span := tracing.StartProjectionTracerSpan(ctx, "mongoProjection.When", evt)
    defer span.Finish()
    span.LogFields(log.String("AggregateID", evt.GetAggregateID()), log.String("EventType", evt.GetEventType()))

    switch evt.GetEventType() {

    case v1.OrderCreated:
        return o.onOrderCreate(ctx, evt)
    case v1.OrderPaid:
        return o.onOrderPaid(ctx, evt)
    case v1.OrderSubmitted:
        return o.onSubmit(ctx, evt)
    case v1.ShoppingCartUpdated:
        return o.onShoppingCartUpdate(ctx, evt)
    case v1.OrderCanceled:
        return o.onCancel(ctx, evt)
    case v1.OrderCompleted:
        return o.onCompleted(ctx, evt)
    case v1.DeliveryAddressUpdated:
        return o.onDeliveryAddressUpdated(ctx, evt)

    default:
        o.log.Warnf("(mongoProjection) [When unknown EventType] eventType: {%s}", evt.EventType)
        return es.ErrInvalidEventType
    }
}
Enter fullscreen mode Exit fullscreen mode

onOrderCreate方法反序列化数据并处理调用 MongoDB 存储库插入方法的事件:

func (o *mongoProjection) onOrderCreate(ctx context.Context, evt es.Event) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "mongoProjection.onOrderCreate")
    defer span.Finish()
    span.LogFields(log.String("AggregateID", evt.GetAggregateID()))

    var eventData v1.OrderCreatedEvent
    if err := evt.GetJsonData(&eventData); err != nil {
        tracing.TraceErr(span, err)
        return errors.Wrap(err, "evt.GetJsonData")
    }
    span.LogFields(log.String("AccountEmail", eventData.AccountEmail))

    op := &models.OrderProjection{
        OrderID:         aggregate.GetOrderAggregateID(evt.AggregateID),
        ShopItems:       eventData.ShopItems,
        Created:         true,
        AccountEmail:    eventData.AccountEmail,
        TotalPrice:      aggregate.GetShopItemsTotalPrice(eventData.ShopItems),
        DeliveryAddress: eventData.DeliveryAddress,
    }

    _, err := o.mongoRepo.Insert(ctx, op)
    if err != nil {
        return err
    }

    return nil
}
Enter fullscreen mode Exit fullscreen mode

OrderMongoRepository 插入方法很简单:

func (m *mongoRepository) Insert(ctx context.Context, order *models.OrderProjection) (string, error) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "mongoRepository.Insert")
    defer span.Finish()
    span.LogFields(log.String("OrderID", order.OrderID))

    _, err := m.getOrdersCollection().InsertOne(ctx, order, &options.InsertOneOptions{})
    if err != nil {
        tracing.TraceErr(span, err)
        return "", err
    }

    return order.OrderID, nil
}
Enter fullscreen mode Exit fullscreen mode


ElasticSearch 投影实现的思路相同,我们可以用它来搜索订单,所以让我们看一下存储库的 Search 方法。
在本项目中,我们使用elastic作为 Elastic Search 客户端。

func (e *elasticRepository) Search(ctx context.Context, text string, pq *utils.Pagination) (*dto.OrderSearchResponseDto, error) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "elasticRepository.Search")
    defer span.Finish()
    span.LogFields(log.String("Search", text))

    shouldMatch := v7.NewBoolQuery().
        Should(v7.NewMatchPhrasePrefixQuery(shopItemTitle, text), v7.NewMatchPhrasePrefixQuery(shopItemDescription, text)).
        MinimumNumberShouldMatch(minimumNumberShouldMatch)

    searchResult, err := e.elasticClient.Search(e.cfg.ElasticIndexes.Orders).
        Query(shouldMatch).
        From(pq.GetOffset()).
        Explain(e.cfg.Elastic.Explain).
        FetchSource(e.cfg.Elastic.FetchSource).
        Version(e.cfg.Elastic.Version).
        Size(pq.GetSize()).
        Pretty(e.cfg.Elastic.Pretty).
        Do(ctx)
    if err != nil {
        tracing.TraceErr(span, err)
        return nil, errors.Wrap(err, "elasticClient.Search")
    }

    orders := make([]*models.OrderProjection, 0, len(searchResult.Hits.Hits))
    for _, hit := range searchResult.Hits.Hits {
        jsonBytes, err := hit.Source.MarshalJSON()
        if err != nil {
            tracing.TraceErr(span, err)
            return nil, errors.Wrap(err, "Source.MarshalJSON")
        }
        var order models.OrderProjection
        if err := json.Unmarshal(jsonBytes, &order); err != nil {
            tracing.TraceErr(span, err)
            return nil, errors.Wrap(err, "json.Unmarshal")
        }
        orders = append(orders, &order)
    }

    return &dto.OrderSearchResponseDto{
        Pagination: dto.Pagination{
            TotalCount: searchResult.TotalHits(),
            TotalPages: int64(pq.GetTotalPages(int(searchResult.TotalHits()))),
            Page:       int64(pq.GetPage()),
            Size:       int64(pq.GetSize()),
            HasMore:    pq.GetHasMore(int(searchResult.TotalHits())),
        },
        Orders: mappers.OrdersFromProjections(orders),
    }, nil
}
Enter fullscreen mode Exit fullscreen mode

您可以在此处找到更多详细信息和完整项目的源代码。当然
在实际应用中,我们还需要实现更多必要的功能,例如断路器、重试、速率限制器等。
根据项目的不同,这些功能可以通过不同的方式实现,例如,您可以使用 Kubernetes 和 Istio 来实现其中一些功能。
希望本文对您有所帮助,并乐意收到任何反馈或问题,欢迎随时通过电子邮件或任何 Messenger 联系我 :)

文章来源:https://dev.to/aleksk1ng/go-eventsource-and-cqrs-microservice-using-eventstoredb-5djo
PREV
Go、Kafka 和 gRPC 清洁架构 CQRS 微服务与 Jaeger 跟踪👋🧑‍💻
NEXT
如何检测 React 中已加载的图像