Go EventSourcing and CQRS microservice using EventStoreDB 👋⚡️💫
In this article, let's try to build an Event Sourcing CQRS microservice that is a bit closer to the real world, using the following tools: 🚀👨💻🙌
EventStoreDB - the database built for Event Sourcing
gRPC - Go implementation of gRPC
MongoDB - MongoDB driver for Go
Elasticsearch - Elasticsearch client for Go
Jaeger - open source, end-to-end distributed tracing
Prometheus - monitoring and alerting
Grafana - for composing observability dashboards with everything from Prometheus
swag - Swagger for Go
Echo - web framework
Kibana - a user interface that lets you visualize your Elasticsearch data
The source code can be found in the GitHub repository.
The main idea of this project is to implement Event Sourcing and CQRS using Go, EventStoreDB, gRPC, MongoDB and Elasticsearch.
This article doesn't go into the details of the Event Sourcing and CQRS patterns, since that would make it far too long. The best place to read about them is microservices.io (https://microservices.io/patterns/data/cqrs.html); this article is also worth reading, and I highly recommend Alexey Zimarev's book "Hands-on Domain-Driven Design with .NET Core" and his blog.
In this project we use the official Go client to work with EventStoreDB, MongoDB and Elasticsearch for projections (https://zimarev.com/blog/event-sourcing/projections/) and search, and communication over gRPC and REST. No interesting business logic is implemented here and tests are not covered, since there was not enough time. The list of events is deliberately simple: create a new order, update the shopping cart, pay, submit, cancel, change the delivery address, and complete the order. In the real world it is of course better to use more specific, meaningful events, but the goal here is to show the idea and how it works. Event Sourcing can be implemented in different ways; EventStoreDB is used here, but it could also be implemented with PostgreSQL and Kafka, for example. After trying both approaches, I found EventStoreDB to be the better solution, because all the required features are available out of the box, it is optimized for this use case, and it has really good engineers working on it.
For local development:
make local or docker_dev // run docker compose
make run_es // run the es microservice
or
make dev // run everything in docker compose with hot reload
All UIs will be available at the following addresses:
EventStoreDB UI: http://localhost:2113
Jaeger UI: http://localhost:16686
Prometheus UI: http://localhost:9090
Grafana UI: http://localhost:3005
Swagger UI: http://localhost:5007/swagger/index.html
Kibana UI: http://localhost:5601/app/home#/
The docker-compose file for this project uses the eventstore/eventstore:21.6.0-buster-slim image for EventStoreDB or, depending on your environment (for Apple M1), the image ghcr.io/eventstore/eventstore@sha256:ab30bf2a2629e2c8710f8f7fdcb74df5466c6b3b2200c8e9ad8a797ed138012a:
version: "3.8"
services:
eventstore.db:
image: eventstore/eventstore:21.6.0-buster-slim
environment:
- EVENTSTORE_CLUSTER_SIZE=1
- EVENTSTORE_RUN_PROJECTIONS=All
- EVENTSTORE_START_STANDARD_PROJECTIONS=true
- EVENTSTORE_EXT_TCP_PORT=1113
- EVENTSTORE_HTTP_PORT=2113
- EVENTSTORE_INSECURE=true
- EVENTSTORE_ENABLE_EXTERNAL_TCP=true
- EVENTSTORE_ENABLE_ATOM_PUB_OVER_HTTP=true
ports:
- "1113:1113"
- "2113:2113"
volumes:
- type: volume
source: eventstore-volume-data
target: /var/lib/eventstore
- type: volume
source: eventstore-volume-logs
target: /var/log/eventstore
networks: [ "microservices" ]
prometheus:
image: prom/prometheus:latest
container_name: prometheus
ports:
- "9090:9090"
command:
- --config.file=/etc/prometheus/prometheus.yml
volumes:
- ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro
networks: [ "microservices" ]
node_exporter:
container_name: node_exporter_container
restart: always
image: prom/node-exporter
ports:
- '9101:9100'
networks: [ "microservices" ]
grafana:
container_name: grafana_container
restart: always
image: grafana/grafana
ports:
- '3005:3000'
networks: [ "microservices" ]
mongodb:
image: mongo:latest
restart: always
environment:
MONGO_INITDB_ROOT_USERNAME: admin
MONGO_INITDB_ROOT_PASSWORD: admin
MONGODB_DATABASE: products
ports:
- "27017:27017"
volumes:
- mongodb_data_container:/data/db
networks: [ "microservices" ]
jaeger:
container_name: jaeger_container
restart: always
image: jaegertracing/all-in-one:1.21
environment:
- COLLECTOR_ZIPKIN_HTTP_PORT=9411
ports:
- "5775:5775/udp"
- "6831:6831/udp"
- "6832:6832/udp"
- "5778:5778"
- "16686:16686"
- "14268:14268"
- "14250:14250"
- "9411:9411"
networks: [ "microservices" ]
node01:
image: docker.elastic.co/elasticsearch/elasticsearch:7.11.1
container_name: node01
restart: always
environment:
- node.name=node01
- cluster.name=es-cluster-7
- discovery.type=single-node
- "ES_JAVA_OPTS=-Xms128m -Xmx128m"
ulimits:
memlock:
soft: -1
hard: -1
volumes:
- es-data01:/usr/share/elasticsearch/data
ports:
- "9200:9200"
networks: [ "microservices" ]
kibana:
image: docker.elastic.co/kibana/kibana:7.11.1
restart: always
environment:
ELASTICSEARCH_HOSTS: http://node01:9200
ports:
- "5601:5601"
depends_on:
- node01
networks: [ "microservices" ]
volumes:
mongodb_data_container:
eventstore-volume-data:
eventstore-volume-logs:
es-data01:
driver: local
networks:
microservices:
name: microservices
First, let's look at EventStoreDB and add some code that works with it; we can then browse the streams and events in the EventStoreDB UI.
An AggregateRoot can be implemented in many ways; for this project in Go, the main aggregate root methods are loading events and applying changes.
When we fetch an aggregate from the database, instead of reading its state as a single record in a table or document, we read all of the previously saved events and call the When method for each of them.
After going through all of these steps we have replayed the full history of the given aggregate, which restores it to its latest state.
type AggregateBase struct {
ID string
Version int64
AppliedEvents []Event
UncommittedEvents []Event
Type AggregateType
when when
}
func (a *AggregateBase) Apply(event Event) error {
if event.GetAggregateID() != a.GetID() {
return ErrInvalidAggregateID
}
event.SetAggregateType(a.GetType())
if err := a.when(event); err != nil {
return err
}
a.Version++
event.SetVersion(a.GetVersion())
a.UncommittedEvents = append(a.UncommittedEvents, event)
return nil
}
// RaiseEvent applies an event to the aggregate's state using the When method; used when loading the aggregate directly from the event store
func (a *AggregateBase) RaiseEvent(event Event) error {
if event.GetAggregateID() != a.GetID() {
return ErrInvalidAggregateID
}
if a.GetVersion() >= event.GetVersion() {
return ErrInvalidEventVersion
}
event.SetAggregateType(a.GetType())
if err := a.when(event); err != nil {
return err
}
a.Version = event.GetVersion()
return nil
}
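The when field and the Aggregate abstraction used by the aggregate store below are not shown in the article; a minimal sketch of how they might look, with names and method sets inferred from how they are used here:

// when is the callback a concrete aggregate registers so that AggregateBase can
// route every loaded or applied event back to the aggregate's own When method.
type when func(event Event) error

// Aggregate is the abstraction the AggregateStore works with; the method set is
// an assumption based on how Load and Save use it below.
type Aggregate interface {
    GetID() string
    GetType() AggregateType
    GetVersion() int64
    GetAppliedEvents() []Event
    GetUncommittedEvents() []Event
    ClearUncommittedEvents()
    RaiseEvent(event Event) error
    String() string
}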
The Event struct is an abstraction over esdb.RecordedEvent and esdb.EventData that makes them easier to work with. Events represent facts that happened in the domain. They are the source of truth; your current state is derived from these events. They are immutable and represent business facts. This means we never change or remove anything in the database, we only append new events.
// EventType is the type of any event, used as its unique identifier.
type EventType string
// Event is an internal representation of an event, returned when the Aggregate
// uses NewEvent to create a new event. The events loaded from the db are
// represented by each DB's internal event type, implementing Event.
type Event struct {
EventID string
EventType string
Data []byte
Timestamp time.Time
AggregateType AggregateType
AggregateID string
Version int64
Metadata []byte
}
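The handlers below call helpers such as GetJsonData on this struct. They are not shown in the article, but they are most likely thin JSON wrappers along these lines (a sketch, assuming encoding/json is used for payloads):

// GetJsonData unmarshals the event payload into the given value.
func (e *Event) GetJsonData(data interface{}) error {
    return json.Unmarshal(e.Data, data)
}

// SetJsonData marshals the given value and stores it as the event payload.
func (e *Event) SetJsonData(data interface{}) error {
    dataBytes, err := json.Marshal(data)
    if err != nil {
        return err
    }
    e.Data = dataBytes
    return nil
}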
In this example we don't use snapshots, so the AggregateStore interface is:
// AggregateStore is responsible for loading and saving aggregates.
type AggregateStore interface {
Load(ctx context.Context, aggregate Aggregate) error
Save(ctx context.Context, aggregate Aggregate) error
Exists(ctx context.Context, streamID string) error
}
The AggregateStore implementation has Load, Save and Exists methods. Load and Save accept an aggregate and then load or apply events using the EventStoreDB client. The Load method resolves the stream name for the aggregate, reads all the events from the aggregate's stream, loops over them and calls the RaiseEvent handler for each one. The Save method persists the aggregate by saving its history of changes, and it handles concurrency: when you retrieve a stream from EventStoreDB you note the current version number, and when you save it back you can check whether somebody else has modified the record in the meantime.
func (a *aggregateStore) Load(ctx context.Context, aggregate es.Aggregate) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "aggregateStore.Load")
defer span.Finish()
span.LogFields(log.String("AggregateID", aggregate.GetID()))
stream, err := a.db.ReadStream(ctx, aggregate.GetID(), esdb.ReadStreamOptions{}, count)
if err != nil {
tracing.TraceErr(span, err)
return errors.Wrap(err, "db.ReadStream")
}
defer stream.Close()
for {
event, err := stream.Recv()
if errors.Is(err, esdb.ErrStreamNotFound) {
tracing.TraceErr(span, err)
return errors.Wrap(err, "stream.Recv")
}
if errors.Is(err, io.EOF) {
break
}
if err != nil {
tracing.TraceErr(span, err)
return errors.Wrap(err, "stream.Recv")
}
esEvent := es.NewEventFromRecorded(event.Event)
if err := aggregate.RaiseEvent(esEvent); err != nil {
tracing.TraceErr(span, err)
return errors.Wrap(err, "RaiseEvent")
}
a.log.Debugf("(Load) esEvent: {%s}", esEvent.String())
}
a.log.Debugf("(Load) aggregate: {%s}", aggregate.String())
return nil
}
func (a *aggregateStore) Save(ctx context.Context, aggregate es.Aggregate) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "aggregateStore.Save")
defer span.Finish()
span.LogFields(log.String("aggregate", aggregate.String()))
if len(aggregate.GetUncommittedEvents()) == 0 {
a.log.Debugf("(Save) [no uncommittedEvents] len: {%d}", len(aggregate.GetUncommittedEvents()))
return nil
}
eventsData := make([]esdb.EventData, 0, len(aggregate.GetUncommittedEvents()))
for _, event := range aggregate.GetUncommittedEvents() {
eventsData = append(eventsData, event.ToEventData())
}
var expectedRevision esdb.ExpectedRevision
if len(aggregate.GetAppliedEvents()) == 0 {
expectedRevision = esdb.NoStream{}
a.log.Debugf("(Save) expectedRevision: {%T}", expectedRevision)
appendStream, err := a.db.AppendToStream(
ctx,
aggregate.GetID(),
esdb.AppendToStreamOptions{ExpectedRevision: expectedRevision},
eventsData...,
)
if err != nil {
tracing.TraceErr(span, err)
return errors.Wrap(err, "db.AppendToStream")
}
a.log.Debugf("(Save) stream: {%+v}", appendStream)
return nil
}
readOps := esdb.ReadStreamOptions{Direction: esdb.Backwards, From: esdb.End{}}
stream, err := a.db.ReadStream(context.Background(), aggregate.GetID(), readOps, 1)
if err != nil {
tracing.TraceErr(span, err)
return errors.Wrap(err, "db.ReadStream")
}
defer stream.Close()
lastEvent, err := stream.Recv()
if err != nil {
tracing.TraceErr(span, err)
return errors.Wrap(err, "stream.Recv")
}
expectedRevision = esdb.Revision(lastEvent.OriginalEvent().EventNumber)
a.log.Debugf("(Save) expectedRevision: {%T}", expectedRevision)
appendStream, err := a.db.AppendToStream(
ctx,
aggregate.GetID(),
esdb.AppendToStreamOptions{ExpectedRevision: expectedRevision},
eventsData...,
)
if err != nil {
tracing.TraceErr(span, err)
return errors.Wrap(err, "db.AppendToStream")
}
a.log.Debugf("(Save) stream: {%+v}", appendStream)
aggregate.ClearUncommittedEvents()
return nil
}
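The Exists method of the AggregateStore is used by the command handler later but is not shown in the article. A sketch that stays consistent with the Load code above, reading a single event and treating esdb.ErrStreamNotFound as "stream does not exist":

// Exists checks whether the stream for the given aggregate has any events.
func (a *aggregateStore) Exists(ctx context.Context, streamID string) error {
    readOps := esdb.ReadStreamOptions{Direction: esdb.Backwards, From: esdb.End{}}
    stream, err := a.db.ReadStream(ctx, streamID, readOps, 1)
    if err != nil {
        return errors.Wrap(err, "db.ReadStream")
    }
    defer stream.Close()
    if _, err := stream.Recv(); err != nil {
        if errors.Is(err, esdb.ErrStreamNotFound) {
            return esdb.ErrStreamNotFound
        }
        return errors.Wrap(err, "stream.Recv")
    }
    return nil
}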
Next, let's create the order aggregate. We have to embed the es.AggregateBase struct and implement the When method:
type OrderAggregate struct {
*es.AggregateBase
Order *models.Order
}
func (a *OrderAggregate) When(evt es.Event) error {
switch evt.GetEventType() {
case v1.OrderCreated:
return a.onOrderCreated(evt)
case v1.OrderPaid:
return a.onOrderPaid(evt)
case v1.OrderSubmitted:
return a.onOrderSubmitted(evt)
case v1.OrderCompleted:
return a.onOrderCompleted(evt)
case v1.OrderCanceled:
return a.onOrderCanceled(evt)
case v1.ShoppingCartUpdated:
return a.onShoppingCartUpdated(evt)
case v1.DeliveryAddressChanged:
return a.onChangeDeliveryAddress(evt)
default:
return es.ErrInvalidEventType
}
}
func (a *OrderAggregate) onOrderCreated(evt es.Event) error {
var eventData v1.OrderCreatedEvent
if err := evt.GetJsonData(&eventData); err != nil {
return errors.Wrap(err, "GetJsonData")
}
a.Order.AccountEmail = eventData.AccountEmail
a.Order.ShopItems = eventData.ShopItems
a.Order.Created = true
a.Order.TotalPrice = GetShopItemsTotalPrice(eventData.ShopItems)
a.Order.DeliveryAddress = eventData.DeliveryAddress
return nil
}
func (a *OrderAggregate) onOrderPaid(evt es.Event) error {
var payment models.Payment
if err := evt.GetJsonData(&payment); err != nil {
return errors.Wrap(err, "GetJsonData")
}
a.Order.Paid = true
a.Order.Payment = payment
return nil
}
func (a *OrderAggregate) onOrderSubmitted(evt es.Event) error {
a.Order.Submitted = true
return nil
}
func (a *OrderAggregate) onOrderCompleted(evt es.Event) error {
var eventData v1.OrderCompletedEvent
if err := evt.GetJsonData(&eventData); err != nil {
return errors.Wrap(err, "GetJsonData")
}
a.Order.Completed = true
a.Order.DeliveredTime = eventData.DeliveryTimestamp
a.Order.Canceled = false
return nil
}
func (a *OrderAggregate) onOrderCanceled(evt es.Event) error {
var eventData v1.OrderCanceledEvent
if err := evt.GetJsonData(&eventData); err != nil {
return errors.Wrap(err, "GetJsonData")
}
a.Order.Canceled = true
a.Order.Completed = false
a.Order.CancelReason = eventData.CancelReason
return nil
}
func (a *OrderAggregate) onShoppingCartUpdated(evt es.Event) error {
var eventData v1.ShoppingCartUpdatedEvent
if err := evt.GetJsonData(&eventData); err != nil {
return errors.Wrap(err, "GetJsonData")
}
a.Order.ShopItems = eventData.ShopItems
a.Order.TotalPrice = GetShopItemsTotalPrice(eventData.ShopItems)
return nil
}
func (a *OrderAggregate) onChangeDeliveryAddress(evt es.Event) error {
var eventData v1.OrderDeliveryAddressChangedEvent
if err := evt.GetJsonData(&eventData); err != nil {
return errors.Wrap(err, "GetJsonData")
}
a.Order.DeliveryAddress = eventData.DeliveryAddress
return nil
}
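The GetShopItemsTotalPrice helper used by these handlers is not shown; it is most likely a simple sum over the cart (a sketch, with assumed Quantity and Price field names on models.ShopItem):

// GetShopItemsTotalPrice sums up quantity * price for every item in the cart.
func GetShopItemsTotalPrice(shopItems []*models.ShopItem) float64 {
    var totalPrice float64
    for _, item := range shopItems {
        totalPrice += float64(item.Quantity) * item.Price
    }
    return totalPrice
}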
For example, let's look at the create order case: the CreateOrder method handles the command, validates the order state, serializes the data and creates an OrderCreatedEvent:
func (a *OrderAggregate) CreateOrder(ctx context.Context, shopItems []*models.ShopItem, accountEmail, deliveryAddress string) error {
span, _ := opentracing.StartSpanFromContext(ctx, "OrderAggregate.CreateOrder")
defer span.Finish()
span.LogFields(log.String("AggregateID", a.GetID()))
if a.Order.Created {
return ErrAlreadyCreated
}
if shopItems == nil {
return ErrOrderShopItemsIsRequired
}
if deliveryAddress == "" {
return ErrInvalidDeliveryAddress
}
event, err := eventsV1.NewOrderCreatedEvent(a, shopItems, accountEmail, deliveryAddress)
if err != nil {
tracing.TraceErr(span, err)
return errors.Wrap(err, "NewOrderCreatedEvent")
}
if err := event.SetMetadata(tracing.ExtractTextMapCarrier(span.Context())); err != nil {
tracing.TraceErr(span, err)
return errors.Wrap(err, "SetMetadata")
}
return a.Apply(event)
}
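The eventsV1.NewOrderCreatedEvent factory is not shown in the article. Roughly, it serializes the event payload and wraps it into an es.Event for this aggregate; the sketch below assumes an es.NewBaseEvent constructor and a simple OrderCreatedEvent payload struct, so treat the exact names as placeholders:

// NewOrderCreatedEvent builds the OrderCreated event for the given aggregate.
func NewOrderCreatedEvent(aggregate es.Aggregate, shopItems []*models.ShopItem, accountEmail, deliveryAddress string) (es.Event, error) {
    eventData := OrderCreatedEvent{
        ShopItems:       shopItems,
        AccountEmail:    accountEmail,
        DeliveryAddress: deliveryAddress,
    }
    event := es.NewBaseEvent(aggregate, OrderCreated) // assumed constructor that fills AggregateID, EventType, Timestamp
    if err := event.SetJsonData(&eventData); err != nil {
        return es.Event{}, err
    }
    return event, nil
}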
Then the aggregate handles this event with the onOrderCreated method, which simply applies the changes to the state:
func (a *OrderAggregate) onOrderCreated(evt es.Event) error {
var eventData v1.OrderCreatedEvent
if err := evt.GetJsonData(&eventData); err != nil {
return errors.Wrap(err, "GetJsonData")
}
a.Order.AccountEmail = eventData.AccountEmail
a.Order.ShopItems = eventData.ShopItems
a.Order.Created = true
a.Order.TotalPrice = GetShopItemsTotalPrice(eventData.ShopItems)
a.Order.DeliveryAddress = eventData.DeliveryAddress
return nil
}
Our microservice accepts HTTP and gRPC requests. For Swagger we use swag. Let's look at the create order HTTP handler code:
// CreateOrder
// @Tags Orders
// @Summary Create order
// @Description Create new order
// @Param order body dto.CreateOrderReqDto true "create order"
// @Accept json
// @Produce json
// @Success 201 {string} id ""
// @Router /orders [post]
func (h *orderHandlers) CreateOrder() echo.HandlerFunc {
return func(c echo.Context) error {
ctx, span := tracing.StartHttpServerTracerSpan(c, "orderHandlers.CreateOrder")
defer span.Finish()
h.metrics.CreateOrderHttpRequests.Inc()
var reqDto dto.CreateOrderReqDto
if err := c.Bind(&reqDto); err != nil {
h.log.Errorf("(Bind) err: {%v}", err)
tracing.TraceErr(span, err)
return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
}
if err := h.v.StructCtx(ctx, reqDto); err != nil {
h.log.Errorf("(validate) err: {%v}", err)
tracing.TraceErr(span, err)
return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
}
id := uuid.NewV4().String()
command := v1.NewCreateOrderCommand(id, reqDto.ShopItems, reqDto.AccountEmail, reqDto.DeliveryAddress)
err := h.os.Commands.CreateOrder.Handle(ctx, command)
if err != nil {
h.log.Errorf("(CreateOrder.Handle) id: {%s}, err: {%v}", id, err)
tracing.TraceErr(span, err)
return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
}
h.log.Infof("(order created) id: {%s}", id)
return c.JSON(http.StatusCreated, id)
}
}
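The dto.CreateOrderReqDto bound and validated above is not shown in the article. To illustrate the tag-based validation mentioned in the next paragraph, it probably looks something like this (field names and tags are assumptions):

// CreateOrderReqDto is the JSON body for POST /orders.
type CreateOrderReqDto struct {
    ShopItems       []*models.ShopItem `json:"shopItems" validate:"required"`
    AccountEmail    string             `json:"accountEmail" validate:"required,email"`
    DeliveryAddress string             `json:"deliveryAddress" validate:"required"`
}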
The gRPC CreateOrder handler does the same thing as the HTTP handler: it validates the request and calls the command.
For validation we use validator, since it implements struct and field value validation based on tags.
func (s *orderGrpcService) CreateOrder(ctx context.Context, req *orderService.CreateOrderReq) (*orderService.CreateOrderRes, error) {
ctx, span := tracing.StartGrpcServerTracerSpan(ctx, "orderGrpcService.CreateOrder")
defer span.Finish()
span.LogFields(log.String("req", req.String()))
s.metrics.CreateOrderGrpcRequests.Inc()
aggregateID := uuid.NewV4().String()
command := v1.NewCreateOrderCommand(aggregateID, models.ShopItemsFromProto(req.GetShopItems()), req.GetAccountEmail(), req.GetDeliveryAddress())
if err := s.v.StructCtx(ctx, command); err != nil {
s.log.Errorf("(validate) aggregateID: {%s}, err: {%v}", aggregateID, err)
tracing.TraceErr(span, err)
return nil, s.errResponse(err)
}
if err := s.os.Commands.CreateOrder.Handle(ctx, command); err != nil {
s.log.Errorf("(CreateOrder.Handle) orderID: {%s}, err: {%v}", aggregateID, err)
return nil, s.errResponse(err)
}
s.log.Infof("(created order): orderID: {%s}", aggregateID)
return &orderService.CreateOrderRes{AggregateID: aggregateID}, nil
}
Both the HTTP and gRPC handlers do the same thing: they validate the incoming request and call the command service with a CreateOrder command. The command service loads the OrderAggregate, calls its command handler method and saves it to the event store. The main reason CQRS has gained popularity is the ability to handle reads and writes separately, because the optimization techniques for these very different operations differ significantly.
func (c *createOrderHandler) Handle(ctx context.Context, command *CreateOrderCommand) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "createOrderHandler.Handle")
defer span.Finish()
span.LogFields(log.String("AggregateID", command.GetAggregateID()))
order := aggregate.NewOrderAggregateWithID(command.AggregateID)
err := c.es.Exists(ctx, order.GetID())
if err != nil && !errors.Is(err, esdb.ErrStreamNotFound) {
return err
}
if err := order.CreateOrder(ctx, command.ShopItems, command.AccountEmail, command.DeliveryAddress); err != nil {
return err
}
span.LogFields(log.String("order", order.String()))
return c.es.Save(ctx, order)
}
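The CreateOrderCommand itself is a plain data carrier constructed by both handlers above; a sketch of what it might look like, with field names and validate tags assumed from how it is used:

// CreateOrderCommand carries the data needed to create an order aggregate.
type CreateOrderCommand struct {
    AggregateID     string             `json:"aggregateID" validate:"required"`
    ShopItems       []*models.ShopItem `json:"shopItems" validate:"required"`
    AccountEmail    string             `json:"accountEmail" validate:"required,email"`
    DeliveryAddress string             `json:"deliveryAddress" validate:"required"`
}

func (c *CreateOrderCommand) GetAggregateID() string { return c.AggregateID }

func NewCreateOrderCommand(aggregateID string, shopItems []*models.ShopItem, accountEmail, deliveryAddress string) *CreateOrderCommand {
    return &CreateOrderCommand{
        AggregateID:     aggregateID,
        ShopItems:       shopItems,
        AccountEmail:    accountEmail,
        DeliveryAddress: deliveryAddress,
    }
}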
The process of building state from events is called a projection. EventStoreDB has subscriptions, so for our projections we can subscribe to the events of the order streams. When we execute a command, the aggregate generates a new event that represents a state transition of the aggregate. These events are committed to the store, which appends them to the end of the aggregate's stream. The subscriptions receive these events and update their read models. The MongoDB projection uses a persistent subscription to consume the event stream and, just like the aggregate, handles each event with a When method that applies changes depending on the event type:
func (o *mongoProjection) ProcessEvents(ctx context.Context, stream *esdb.PersistentSubscription, workerID int) error {
for {
event := stream.Recv()
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if event.SubscriptionDropped != nil {
o.log.Errorf("(SubscriptionDropped) err: {%v}", event.SubscriptionDropped.Error)
return errors.Wrap(event.SubscriptionDropped.Error, "Subscription Dropped")
}
if event.EventAppeared != nil {
o.log.ProjectionEvent(constants.MongoProjection, o.cfg.Subscriptions.MongoProjectionGroupName, event.EventAppeared, workerID)
err := o.When(ctx, es.NewEventFromRecorded(event.EventAppeared.Event))
if err != nil {
o.log.Errorf("(mongoProjection.when) err: {%v}", err)
if err := stream.Nack(err.Error(), esdb.Nack_Retry, event.EventAppeared); err != nil {
o.log.Errorf("(stream.Nack) err: {%v}", err)
return errors.Wrap(err, "stream.Nack")
}
continue // the event was nacked for retry, so don't ack it below
}
err = stream.Ack(event.EventAppeared)
if err != nil {
o.log.Errorf("(stream.Ack) err: {%v}", err)
return errors.Wrap(err, "stream.Ack")
}
o.log.Infof("(ACK) event commit: {%v}", *event.EventAppeared.Commit)
}
}
}
func (o *mongoProjection) When(ctx context.Context, evt es.Event) error {
ctx, span := tracing.StartProjectionTracerSpan(ctx, "mongoProjection.When", evt)
defer span.Finish()
span.LogFields(log.String("AggregateID", evt.GetAggregateID()), log.String("EventType", evt.GetEventType()))
switch evt.GetEventType() {
case v1.OrderCreated:
return o.onOrderCreate(ctx, evt)
case v1.OrderPaid:
return o.onOrderPaid(ctx, evt)
case v1.OrderSubmitted:
return o.onSubmit(ctx, evt)
case v1.ShoppingCartUpdated:
return o.onShoppingCartUpdate(ctx, evt)
case v1.OrderCanceled:
return o.onCancel(ctx, evt)
case v1.OrderCompleted:
return o.onCompleted(ctx, evt)
case v1.DeliveryAddressUpdated:
return o.onDeliveryAddressUpdated(ctx, evt)
default:
o.log.Warnf("(mongoProjection) [When unknown EventType] eventType: {%s}", evt.EventType)
return es.ErrInvalidEventType
}
}
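ProcessEvents above is meant to be run by a pool of workers, each consuming the persistent subscription. How the subscription itself is obtained depends on the esdb client version, so it is hidden behind an assumed connectPersistentSubscription helper here; the worker fan-out might look roughly like this (a sketch using golang.org/x/sync/errgroup):

// Subscribe starts poolSize workers, each consuming the persistent subscription.
func (o *mongoProjection) Subscribe(ctx context.Context, poolSize int) error {
    g, ctx := errgroup.WithContext(ctx)
    for i := 0; i < poolSize; i++ {
        workerID := i
        g.Go(func() error {
            // connectPersistentSubscription is an assumed helper wrapping the esdb
            // client call that joins the projection's subscription group.
            stream, err := o.connectPersistentSubscription(ctx)
            if err != nil {
                return err
            }
            return o.ProcessEvents(ctx, stream, workerID)
        })
    }
    return g.Wait()
}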
The onOrderCreate method deserializes the event data and handles the event by calling the MongoDB repository's Insert method:
func (o *mongoProjection) onOrderCreate(ctx context.Context, evt es.Event) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "mongoProjection.onOrderCreate")
defer span.Finish()
span.LogFields(log.String("AggregateID", evt.GetAggregateID()))
var eventData v1.OrderCreatedEvent
if err := evt.GetJsonData(&eventData); err != nil {
tracing.TraceErr(span, err)
return errors.Wrap(err, "evt.GetJsonData")
}
span.LogFields(log.String("AccountEmail", eventData.AccountEmail))
op := &models.OrderProjection{
OrderID: aggregate.GetOrderAggregateID(evt.AggregateID),
ShopItems: eventData.ShopItems,
Created: true,
AccountEmail: eventData.AccountEmail,
TotalPrice: aggregate.GetShopItemsTotalPrice(eventData.ShopItems),
DeliveryAddress: eventData.DeliveryAddress,
}
_, err := o.mongoRepo.Insert(ctx, op)
if err != nil {
return err
}
return nil
}
The OrderMongoRepository Insert method is simple:
func (m *mongoRepository) Insert(ctx context.Context, order *models.OrderProjection) (string, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "mongoRepository.Insert")
defer span.Finish()
span.LogFields(log.String("OrderID", order.OrderID))
_, err := m.getOrdersCollection().InsertOne(ctx, order, &options.InsertOneOptions{})
if err != nil {
tracing.TraceErr(span, err)
return "", err
}
return order.OrderID, nil
}
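The getOrdersCollection helper used above is not shown; with the official mongo-driver it would simply resolve the configured database and collection (a sketch, assuming m.db is a *mongo.Client and that the config field names are placeholders):

// getOrdersCollection returns the MongoDB collection the projection writes to.
func (m *mongoRepository) getOrdersCollection() *mongo.Collection {
    return m.db.Database(m.cfg.Mongo.Db).Collection(m.cfg.MongoCollections.Orders)
}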
The Elasticsearch projection is implemented with the same idea in mind. We can use it to search orders, so let's look at the repository's Search method.
In this project we use elastic as the Elasticsearch client.
func (e *elasticRepository) Search(ctx context.Context, text string, pq *utils.Pagination) (*dto.OrderSearchResponseDto, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "elasticRepository.Search")
defer span.Finish()
span.LogFields(log.String("Search", text))
shouldMatch := v7.NewBoolQuery().
Should(v7.NewMatchPhrasePrefixQuery(shopItemTitle, text), v7.NewMatchPhrasePrefixQuery(shopItemDescription, text)).
MinimumNumberShouldMatch(minimumNumberShouldMatch)
searchResult, err := e.elasticClient.Search(e.cfg.ElasticIndexes.Orders).
Query(shouldMatch).
From(pq.GetOffset()).
Explain(e.cfg.Elastic.Explain).
FetchSource(e.cfg.Elastic.FetchSource).
Version(e.cfg.Elastic.Version).
Size(pq.GetSize()).
Pretty(e.cfg.Elastic.Pretty).
Do(ctx)
if err != nil {
tracing.TraceErr(span, err)
return nil, errors.Wrap(err, "elasticClient.Search")
}
orders := make([]*models.OrderProjection, 0, len(searchResult.Hits.Hits))
for _, hit := range searchResult.Hits.Hits {
jsonBytes, err := hit.Source.MarshalJSON()
if err != nil {
tracing.TraceErr(span, err)
return nil, errors.Wrap(err, "Source.MarshalJSON")
}
var order models.OrderProjection
if err := json.Unmarshal(jsonBytes, &order); err != nil {
tracing.TraceErr(span, err)
return nil, errors.Wrap(err, "json.Unmarshal")
}
orders = append(orders, &order)
}
return &dto.OrderSearchResponseDto{
Pagination: dto.Pagination{
TotalCount: searchResult.TotalHits(),
TotalPages: int64(pq.GetTotalPages(int(searchResult.TotalHits()))),
Page: int64(pq.GetPage()),
Size: int64(pq.GetSize()),
HasMore: pq.GetHasMore(int(searchResult.TotalHits())),
},
Orders: mappers.OrdersFromProjections(orders),
}, nil
}
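The utils.Pagination helper supplies the From/Size values and the response metadata used above; a minimal sketch of the methods it needs, assuming 1-based page numbers:

// Pagination holds the requested page and page size.
type Pagination struct {
    Size int
    Page int
}

func (p *Pagination) GetPage() int   { return p.Page }
func (p *Pagination) GetSize() int   { return p.Size }
func (p *Pagination) GetOffset() int { return (p.Page - 1) * p.Size }

// GetTotalPages returns how many pages the given total occupies.
func (p *Pagination) GetTotalPages(totalCount int) int {
    if p.Size == 0 {
        return 0
    }
    return (totalCount + p.Size - 1) / p.Size
}

// GetHasMore reports whether there are results beyond the current page.
func (p *Pagination) GetHasMore(totalCount int) bool {
    return p.Page*p.Size < totalCount
}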
You can find more details and the full source code of the project here. In a real-world application we would, of course, have to implement many more essential features, such as circuit breakers, retries, rate limiters and so on. Depending on the project, they can be implemented in different ways; for example, you could use Kubernetes and Istio for some of them. I hope this article was helpful. I would be glad to receive any feedback or questions, so feel free to contact me by email or any messenger :)