具有指标和跟踪功能的 Go、Kafka、gRPC 和 MongoDB 微服务👋
本文尝试使用以下方式实现清洁架构微服务:🚀
Kafka作为消息代理
gRPC Go 实现 gRPC
MongoDB作为数据库
Jaeger开源,端到端分布式跟踪
Prometheus监控和警报
Grafana用于使用 Prometheus 的所有内容组成可观察性仪表板
您可以在GitHub 存储库 中找到源代码,
我想重点介绍Kafka,因为这是我第一次尝试它,所以它是边做边学👨💻
当然,任何微服务都需要监控和跟踪,所以它包含在内⚡️
Kafka 是一个高吞吐量的分布式消息系统。
在任何时候,只有一个 Broker 可以成为给定分区的 Leader,并且只有该 Leader 可以接收该分区的服务数据,
其他 Broker 负责同步数据。
制作人:
生产者将数据写入主题,并自动知道要写入哪个代理和分区。
如果代理发生故障,生产者将自动恢复。
生产者可以选择接收数据写入的确认:
- acks = 0,生产者不会确认,这意味着可能丢失数据。
- acks = 1。生产者将等待领导者确认,在某些情况下也可能出现有限的数据丢失。
- acks = all。leader + replicas 确认,需要从 broker 端配置键 min.insync.replicas 中指定数量的 broker 获得确认,如果当前同步的 broker 数量少于此值,produce 将会失败。您可以将 min.insync.replicas 指定为最低 1(相当于 acks==1),也可以指定为最高副本数,或者介于两者之间,这样您就可以在可用性和一致性之间进行精细的权衡。这里有一篇好文章
生产者可以选择随消息发送密钥。
- 如果没有密钥数据则以循环方式发送。
- 如果发送了一个密钥,那么该密钥的所有消息将始终发送到同一个分区。
消费者:
消费者从主题读取数据,并知道从哪个代理读取。
每个分区内的数据按顺序读取。
消费者群体:
消费者以消费者组的形式读取数据。组内的每个消费者都从专属分区读取数据。
因此,如果消费者数量超过分区数量,某些消费者可能会处于非活跃状态。
消费者抵消:
Kafka 存储消费者组读取数据的偏移量。
当组中的某个消费者处理完数据后,它应该提交偏移量。
如果消费者宕机,它能够从中断处继续读取数据。
消费者可以选择何时提交偏移量。
共有 3 个发货箱:
最多一次:
- 收到消息后立即提交偏移量
- 如果某个过程失败,该消息将会丢失并且不会被再次读取。
至少一次:
- 消息处理后,偏移量将被提交
- 如果某个过程失败,则会再次读取该消息。
- 这可能会导致消息的重复处理,因此确保您的处理是幂等的并且不会影响您的系统非常重要。
恰好一次:
- 可以使用 kafka streams api 实现 kakfa 到 kafka 的通信。
- 对于外部系统的使用,我们需要使用幂等消费者。
压缩
还有一个非常重要的功能是压缩。
压缩在生产者级别启用,不需要在 Broker 或消费者级别进行任何配置更改。
默认情况下,压缩为无。对于入门级用户,snappy或lz4是不错的选择。
对于本地开发:
make local // runs docker-compose.local.yml
make crate_topics // create kafka topics
make mongo // load js init script to mongo docker container
make make_cert // generate local SLL certificates
make swagger // generate swagger documentation
UI 界面将在以下端口上可用:
Jaeger 用户界面:http://localhost:16686
Prometheus 用户界面:http://localhost:9090
Grafana 用户界面:http://localhost:3000
Kafka 用户界面:http://localhost:9000
Swagger UI 默认运行于:https://localhost:5007/swagger/index.html
在 Grafana 中,您需要选择 prometheus 作为指标源,然后创建仪表板。
带有云 UI 的良好 kafka docker 设置是融合的,但它具有巨大的图像大小,并且会将全球一半的互联网下载到您的本地电脑上。🤖 因此,在这里我使用kafdrop作为 UI 客户端
Docker-compose.local.yml:
version: "3.8"
services:
zookeeper:
container_name: zookeeper
restart: always
image: zookeeper:3.4.9
hostname: zookeeper
ports:
- "2181:2181"
environment:
ZOO_MY_ID: 1
ZOO_PORT: 2181
ZOO_SERVERS: server.1=zookeeper:2888:3888
volumes:
- ./data/zookeeper/data:/data
- ./data/zookeeper/datalog:/datalog
networks:
- products_network
kafka1:
container_name: kafka1
image: confluentinc/cp-kafka:5.3.0
restart: always
hostname: kafka1
ports:
- "9091:9091"
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19091,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9091
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_BROKER_ID: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
volumes:
- ./data/kafka1/data:/var/lib/kafka/data
depends_on:
- zookeeper
networks:
- products_network
kafka2:
container_name: kafka2
restart: always
image: confluentinc/cp-kafka:5.3.0
hostname: kafka2
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_BROKER_ID: 2
volumes:
- ./data/kafka2/data:/var/lib/kafka/data
depends_on:
- zookeeper
networks:
- products_network
kafka3:
container_name: kafka3
image: confluentinc/cp-kafka:5.3.0
restart: always
hostname: kafka3
ports:
- "9093:9093"
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:19093,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_BROKER_ID: 3
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
volumes:
- ./data/kafka3/data:/var/lib/kafka/data
depends_on:
- zookeeper
networks:
- products_network
kafdrop:
container_name: kafdrop
image: obsidiandynamics/kafdrop
restart: "no"
ports:
- "9000:9000"
environment:
KAFKA_BROKERCONNECT: "kafka1:19091"
depends_on:
- kafka1
- kafka2
- kafka3
networks:
- products_network
redis:
image: redis:6-alpine
container_name: user_redis
ports:
- "6379:6379"
restart: always
networks:
- products_network
prometheus:
container_name: prometheus_container
restart: always
image: prom/prometheus
volumes:
- ./monitoring/prometheus-local.yml:/etc/prometheus/prometheus.yml:Z
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--storage.tsdb.path=/prometheus'
- '--storage.tsdb.retention=20d'
- '--web.console.libraries=/usr/share/prometheus/console_libraries'
- '--web.console.templates=/usr/share/prometheus/consoles'
ports:
- '9090:9090'
networks:
- products_network
node_exporter:
container_name: node_exporter_container
restart: always
image: prom/node-exporter
ports:
- '9101:9100'
networks:
- products_network
grafana:
container_name: grafana_container
restart: always
image: grafana/grafana
ports:
- '3000:3000'
networks:
- products_network
jaeger:
container_name: jaeger_container
restart: always
image: jaegertracing/all-in-one:1.21
environment:
- COLLECTOR_ZIPKIN_HTTP_PORT=9411
ports:
- 5775:5775/udp
- 6831:6831/udp
- 6832:6832/udp
- 5778:5778
- 16686:16686
- 14268:14268
- 14250:14250
- 9411:9411
networks:
- products_network
mongodb:
image: mongo:latest
restart: always
environment:
MONGO_INITDB_ROOT_USERNAME: admin
MONGO_INITDB_ROOT_PASSWORD: admin
MONGODB_DATABASE: products
ports:
- 27017:27017
volumes:
- mongodb_data_container:/data/db
volumes:
mongodb_data_container:
networks:
products_network:
driver: bridge
生产环境中的 Go 客户端通常使用segmentsio和sarama,
两者都很好用,具体选择哪个取决于你。在本项目中,我使用了 segmentsio。
由于时间有限,我没有实现任何重要的业务逻辑,也没有进行测试。
我们的微服务可以通过 kafka、gRPC 和 REST 进行通信。
在 Makefile 中,你可以找到所有有用的命令。
例如,在 docker 中创建 kafka 主题:
docker exec -it kafka1 kafka-topics --zookeeper zookeeper:2181 --create --topic create-product --partitions 3 --replication-factor 2
对于 MongoDB,我们可以加载 javascript 文件,这个文件创建集合和索引:
mongo admin -u admin -p admin < init.js
Segmentio 库 API 提供了 reader 和 writer。
首先创建 reader:
func (pcg *ProductsConsumerGroup) getNewKafkaReader(kafkaURL []string, topic, groupID string) *kafka.Reader {
return kafka.NewReader(kafka.ReaderConfig{
Brokers: kafkaURL,
GroupID: groupID,
Topic: topic,
MinBytes: minBytes,
MaxBytes: maxBytes,
QueueCapacity: queueCapacity,
HeartbeatInterval: heartbeatInterval,
CommitInterval: commitInterval,
PartitionWatchInterval: partitionWatchInterval,
Logger: kafka.LoggerFunc(pcg.log.Debugf),
ErrorLogger: kafka.LoggerFunc(pcg.log.Errorf),
MaxAttempts: maxAttempts,
Dialer: &kafka.Dialer{
Timeout: dialTimeout,
},
})
}
和作家:
func (pcg *ProductsConsumerGroup) getNewKafkaWriter(topic string) *kafka.Writer {
w := &kafka.Writer{
Addr: kafka.TCP(pcg.Brokers...),
Topic: topic,
Balancer: &kafka.LeastBytes{},
RequiredAcks: writerRequiredAcks,
MaxAttempts: writerMaxAttempts,
Logger: kafka.LoggerFunc(pcg.log.Debugf),
ErrorLogger: kafka.LoggerFunc(pcg.log.Errorf),
Compression: compress.Snappy,
ReadTimeout: writerReadTimeout,
WriteTimeout: writerWriteTimeout,
}
return w
}
func (pcg *ProductsConsumerGroup) consumeCreateProduct(
ctx context.Context,
cancel context.CancelFunc,
groupID string,
topic string,
workersNum int,
) {
r := pcg.getNewKafkaReader(pcg.Brokers, topic, groupID)
defer cancel()
defer func() {
if err := r.Close(); err != nil {
pcg.log.Errorf("r.Close", err)
cancel()
}
}()
w := pcg.getNewKafkaWriter(deadLetterQueueTopic)
defer func() {
if err := w.Close(); err != nil {
pcg.log.Errorf("w.Close", err)
cancel()
}
}()
pcg.log.Infof("Starting consumer group: %v", r.Config().GroupID)
wg := &sync.WaitGroup{}
for i := 0; i <= workersNum; i++ {
wg.Add(1)
go pcg.createProductWorker(ctx, cancel, r, w, wg, i)
}
wg.Wait()
}
工作人员验证消息主体,然后调用 usecase,如果返回错误,则尝试重试,retry-go是一个不错的重试库。
如果再次失败,则将错误消息发布到非常简单的死信队列(Dead Letter Queue),正如我之前所说,这里没有实现任何有趣的业务逻辑,所以在实际生产中,我们必须以更好的方式处理错误情况。
消息处理成功后,再提交。
func (pcg *ProductsConsumerGroup) createProductWorker(
ctx context.Context,
cancel context.CancelFunc,
r *kafka.Reader,
w *kafka.Writer,
wg *sync.WaitGroup,
workerID int,
) {
span, ctx := opentracing.StartSpanFromContext(ctx, "ProductsConsumerGroup.createProductWorker")
defer span.Finish()
span.LogFields(log.String("ConsumerGroup", r.Config().GroupID))
defer wg.Done()
defer cancel()
for {
m, err := r.FetchMessage(ctx)
if err != nil {
pcg.log.Errorf("FetchMessage", err)
return
}
pcg.log.Infof(
"WORKER: %v, message at topic/partition/offset %v/%v/%v: %s = %s\n",
workerID,
m.Topic,
m.Partition,
m.Offset,
string(m.Key),
string(m.Value),
)
incomingMessages.Inc()
var prod models.Product
if err := json.Unmarshal(m.Value, &prod); err != nil {
errorMessages.Inc()
pcg.log.Errorf("json.Unmarshal", err)
continue
}
if err := pcg.validate.StructCtx(ctx, prod); err != nil {
errorMessages.Inc()
pcg.log.Errorf("validate.StructCtx", err)
continue
}
if err := retry.Do(func() error {
created, err := pcg.productsUC.Create(ctx, &prod)
if err != nil {
return err
}
pcg.log.Infof("created product: %v", created)
return nil
},
retry.Attempts(retryAttempts),
retry.Delay(retryDelay),
retry.Context(ctx),
); err != nil {
errorMessages.Inc()
if err := pcg.publishErrorMessage(ctx, w, m, err); err != nil {
pcg.log.Errorf("publishErrorMessage", err)
continue
}
pcg.log.Errorf("productsUC.Create.publishErrorMessage", err)
continue
}
if err := r.CommitMessages(ctx, m); err != nil {
errorMessages.Inc()
pcg.log.Errorf("CommitMessages", err)
continue
}
successMessages.Inc()
}
}
在存储库层使用mongo-go-driver与数据库交互
// Create Create new product
func (p *productMongoRepo) Create(ctx context.Context, product *models.Product) (*models.Product, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "productMongoRepo.Create")
defer span.Finish()
collection := p.mongoDB.Database(productsDB).Collection(productsCollection)
product.CreatedAt = time.Now().UTC()
product.UpdatedAt = time.Now().UTC()
result, err := collection.InsertOne(ctx, product, &options.InsertOneOptions{})
if err != nil {
return nil, errors.Wrap(err, "InsertOne")
}
objectID, ok := result.InsertedID.(primitive.ObjectID)
if !ok {
return nil, errors.Wrap(productErrors.ErrObjectIDTypeConversion, "result.InsertedID")
}
product.ProductID = objectID
return product, nil
}
这是 gRPC 服务实现 os create handler,您可以在 github 存储库中找到完整代码:
// Create create new product
func (p *productService) Create(ctx context.Context, req *productsService.CreateReq) (*productsService.CreateRes, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "productService.Create")
defer span.Finish()
createMessages.Inc()
catID, err := primitive.ObjectIDFromHex(req.GetCategoryID())
if err != nil {
errorMessages.Inc()
p.log.Errorf("primitive.ObjectIDFromHex: %v", err)
return nil, grpcErrors.ErrorResponse(err, err.Error())
}
prod := &models.Product{
CategoryID: catID,
Name: req.GetName(),
Description: req.GetDescription(),
Price: req.GetPrice(),
ImageURL: &req.ImageURL,
Photos: req.GetPhotos(),
Quantity: req.GetQuantity(),
Rating: int(req.GetRating()),
}
created, err := p.productUC.Create(ctx, prod)
if err != nil {
errorMessages.Inc()
p.log.Errorf("productUC.Create: %v", err)
return nil, grpcErrors.ErrorResponse(err, err.Error())
}
successMessages.Inc()
return &productsService.CreateRes{Product: created.ToProto()}, nil
}
以及使用echo的 REST API 处理程序:
// CreateProduct Create product
// @Tags Products
// @Summary Create new product
// @Description Create new single product
// @Accept json
// @Produce json
// @Success 201 {object} models.Product
// @Router /products [post]
func (p *productHandlers) CreateProduct() echo.HandlerFunc {
return func(c echo.Context) error {
span, ctx := opentracing.StartSpanFromContext(c.Request().Context(), "productHandlers.Create")
defer span.Finish()
createRequests.Inc()
var prod models.Product
if err := c.Bind(&prod); err != nil {
p.log.Errorf("c.Bind: %v", err)
return httpErrors.ErrorCtxResponse(c, err)
}
if err := p.validate.StructCtx(ctx, &prod); err != nil {
p.log.Errorf("validate.StructCtx: %v", err)
return httpErrors.ErrorCtxResponse(c, err)
}
if err := p.productUC.PublishCreate(ctx, &prod); err != nil {
p.log.Errorf("productUC.PublishCreate: %v", err)
return httpErrors.ErrorCtxResponse(c, err)
}
successRequests.Inc()
return c.NoContent(http.StatusCreated)
}
}
在我们应用程序处理的顶层,记录 Prometheus 的错误和进程指标。您可以在这里
找到包含源代码和所有使用工具列表的存储库👨💻 :) 希望本文对您有所帮助,我很乐意收到任何反馈或问题 :)