使用 Micronaut、Kafka 和 Debezium 构建事件驱动的应用程序
简介
在本文中,我们将使用 Micronaut 框架在 Java 中构建一个事件驱动的应用程序。
简要介绍事件驱动架构
就本文的目的而言,可以理解为:
- 事件是组件状态变化的表示,例如:用户更新了他的姓名,新消息保存在数据库中,与“用户发送了一条新消息”相同,与“用户 A 向用户 B 发送了一条新消息”相同,等等。
- 事件驱动架构是一种基于对事件的反应的架构。
在这种架构中,我们有三个关键参与者:
- 生产者:创建事件,或者换句话说,发送消息通知某事发生的人;这些消息可能包含与处理事件相关的信息的有效负载;
- 经纪人:将消息从生产者传递给消费者
- 消费者:处理(响应)消息以执行特定任务的人。
请注意,单个服务/应用程序可能有不同类型的参与者,例如:使用新消息事件并产生未读消息通知事件的聊天后端应用程序。
这些架构的性质使得它们非常适合用于异步处理和/或高负载的应用程序。
有许多资源涉及事件驱动架构的各个细节和方面,包括用例、优点、缺点等。我会在文章中找到它们的链接。
真实示例应用程序
我当时正试图构思一个事件驱动架构的“真实示例应用”,它涵盖了此类架构的基本方面,并且足够简单易行。这时,我想起了《设计数据密集型应用程序》中用来描述系统“负载”概念的一个例子:Twitter 上推文的“扇出”。
我把它看作是我想要的例子。也许没有我想要的那么简单,但没关系……我想,以后再告诉我吧,哈哈哈。
那么,“扇出”到底是什么呢?Twitter 主要有两个功能:
- 发布一条推文:用户向其关注者发布一条新消息(平均每秒 4.6k 个请求,峰值超过每秒 12k 个请求)
- 主页时间线:用户查看其关注的用户最近发布的推文(每秒 30 万次请求)
Twitter 的扩展问题不是由于推文,因为它可以处理 12k 个写入请求,而是由于将其推文“扇出”到用户时间线,因为每个用户关注许多人,并且每个用户都被许多人关注。
扇出(Fan-out)一词源自电子工程,指的是连接到另一个逻辑门输出的逻辑门输入的数量。输出需要提供足够的电流来驱动所有连接的输入。在事务处理系统中,我们用它来描述为了处理一个传入请求,我们需要向其他服务发出多少个请求。
考虑以下关系模式:
实现这些操作最简单的方法可能是
- 发布推文:只需将新推文插入全局推文表即可。
- 主页时间线:查找用户关注的所有用户,并查看他们最近的推文。例如:
SELECT tweets.*,
users.*
FROM tweets
JOIN users ON tweets.sender_id = users.id
JOIN follows ON follows.followee_id = users.id
WHERE follows.follower_id = CURRENT_USER
ORDER BY tweets.timestamp
LIMIT TIMELINE_SIZE;
虽然简单,但考虑到扇出问题,这种方法并不像我们希望的那么快。
另一种方法(顺便说一下,这是Twitter 在努力跟上时间线加载后转而采用的方法)是为每个用户保留其主页时间线的缓存:
- 发布推文:查找关注推文作者的所有人,并将新推文插入他们的主页时间线缓存中。
- 主页时间线:仅从缓存中读取。
第二种方法是有效的,因为我们的时间线阅读量几乎是推文帖子的两倍,所以在发布时做额外的工作是可以的。
我们将要构建的架构
我们将构建一个实现第二种方法的后端系统。
该系统应该允许用户 (1) 发布推文和 (2) 检索他们的主页时间线。为简单起见,我们不处理用户创建以及身份验证/授权,而是假设由其他组件/服务处理,以便我们可以专注于 (1) 和 (2)。
下图说明了我们系统的组件以及数据在它们之间的流动方式:
- Tweets API 允许我们的用户发布/编辑/删除推文
- 我们的数据库将是一个 PostgreSQL 实例
- 我们将使用 Debezium 捕获变更数据,并使用 Kafka Connect 将推文事件发布到 Kafka,我们将使用 Kafka 作为代理。
- 我们将使用“发件箱模式”来实现
- 推文传递工作者,当 Kafka 消费新的推文事件时,它将更新用户主页时间线缓存
- 时间线缓存可能只是内存中的缓存,例如 Redis 实例,时间线请求会发送到 Tweets API,然后,Tweets API 会读取缓存以检索时间线推文数据并将其返回给用户
完整项目
您可以在此处找到完整的代码。
执行
我们将使用 Micronaut 框架。
设置
依赖项
为了在本地设置我们的外部依赖项(如 Postgres、Debezium、Kafka 和 Redis),我们将使用 Docker Compose。
version: '2.2'
services:
postgres:
container_name: postgres
image: 'postgres:11.2-alpine'
ports:
- 5432:5432
environment:
- POSTGRES_DB=tweets
- POSTGRES_PASSWORD=pass
healthcheck:
test: ["CMD-SHELL", "pg_isready -U postgres"]
interval: 5s
timeout: 3s
retries: 7
redis:
container_name: redis
image: 'redis:6.0-alpine'
hostname: redis
ports:
- '6379:6379'
zookeeper:
container_name: zookeeper
image: 'confluentinc/cp-zookeeper:4.0.3'
ports:
- "2181:2181"
environment:
- ZOOKEEPER_CLIENT_PORT=2181
kafka:
container_name: kafka
image: 'confluentinc/cp-kafka:4.0.3'
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_BROKER_ID: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
depends_on:
- zookeeper
schema-registry:
container_name: schema-registry
image: 'confluentinc/cp-schema-registry:4.0.3'
environment:
- SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
- SCHEMA_REGISTRY_HOST_NAME=schema-registry
- SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081
ports:
- "8081:8081"
wait-for-dependencies:
image: dadarek/wait-for-dependencies
container_name: wait-for-dependencies
scale: 0
command: redis:6379 schema-registry:8081 kafka:9092 zookeeper:2181 postgres:5432
数据库设置
我喜欢用https://dbdiagram.io创建我的数据库架构图。它快捷简单,并且提供将架构导出到 SQL 的选项。🆒
该模式极大地简化了“真实世界应用程序”的使用,但却包含了我们实现系统和举例说明一些概念所需要的内容。
我们有用户表,以及关注关系表,顾名思义,它记录用户关注的用户,以及全局推文表(这里的另一个暗示性名称:P)。
迁移
对数据库的任何更改都称为迁移。我们将这些更改保存到版本控制文件中,以便充分利用版本控制带来的所有好处:跟踪数据库模式随时间的变化,使我们能够从头开始重建数据库,清晰地了解数据库的当前状态,并在需要时更轻松地撤消操作(回滚)等等。
这些概念借鉴自Flyway(我们将使用该工具来管理迁移),您可以在此处阅读有关它们的更多信息,其中包含图示示例等。
这是从 dbdiagram 导出的 SQL 文件,我们将使用它作为V1__Init.sql
迁移:
CREATE TABLE "users" (
"id" BIGSERIAL UNIQUE,
"username" varchar UNIQUE,
PRIMARY KEY ("id", "username")
);
CREATE TABLE "follows" (
"follower_id" bigint,
"followee_id" bigint,
PRIMARY KEY ("follower_id", "followee_id")
);
CREATE TABLE "tweets" (
"id" SERIAL PRIMARY KEY,
"sender_id" bigint,
"text" text,
"timestamp" timestamp
);
ALTER TABLE "follows" ADD FOREIGN KEY ("follower_id") REFERENCES "users" ("id");
ALTER TABLE "follows" ADD FOREIGN KEY ("followee_id") REFERENCES "users" ("id");
ALTER TABLE "tweets" ADD FOREIGN KEY ("sender_id") REFERENCES "users" ("id");
Flyway 设置
我们可以选择在应用容器内运行 Flyway,但我们不会这么做。相反,我们会在 Flyway 自己的容器中运行它。这样做是为了避免并发问题:假设我们有几十个应用实例启动,每个实例都运行 Flyway 来验证和更新迁移,想象一下如果迁移过程中出现问题会造成多么混乱。
这样做很简单,我们将使用Flyway Docker 镜像来完成。
将迁移文件放置在postgres/migrations
并将迁移容器设置添加到docker-compose.yml
:
migration:
container_name: migration
image: 'flyway/flyway:6.0.2-alpine'
command: -url=jdbc:postgresql://postgres:5432/tweets -user=postgres -password=pass migrate
volumes:
- ./postgres/migrations:/flyway/sql:Z
depends_on:
postgres:
condition: service_healthy
然后我们可以连接到本地 postgres 实例并检查表是否已创建:
$ psql -h localhost -d tweets -U postgres
Password for user postgres:
psql (12.6, server 11.2)
Type "help" for help.
tweets=# \dt
List of relations
Schema | Name | Type | Owner
--------+-----------------------+-------+----------
public | flyway_schema_history | table | postgres
public | follows | table | postgres
public | tweets | table | postgres
public | users | table | postgres
(4 rows)
创建推文
Micronaut 指南非常详细,在我开发这个示例 API 时提供了很大的帮助。它包含大量示例,很好地涵盖了该框架的日常使用。
数据访问层 (DAL)
这一层将负责创建一个抽象,用于操作数据库或其他持久化机制中的数据。我们将通过创建一些组件来实现这一点:
- 模型:为被操作的实体及其关系创建抽象。我们将使用 Hibernate 来创建这些抽象。
- 数据传输对象(DTO):封装将在层之间传输的数据的类(我知道这不是Martin Fowler 的定义......)
- 数据访问对象 (DAO):这些是持久化机制的抽象。这将公开数据操作,而无需暴露数据库细节。因此,我们能够轻松地更新和检索数据。我们将使用 JPA 来提供这些操作。
模型
在这里,我们使用 Lombok 和 Java Persistence 注释来创建我们的 Tweet 模型,它具有默认构造函数、getter、setter 以及指定数据库模式、表及其字段。
由于该字段是自动生成的,因此我们还使用了@EqualsAndHashCode
with@EqualsAndHashCode.Exclude
来id
使我们的测试在比较模型时更容易。id
package com.georgeoliveira.tweets.common.tweets.models;
import java.sql.Timestamp;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table;
import lombok.AccessLevel;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.experimental.FieldDefaults;
@Entity
@Table(schema = "public", name = "tweets")
@Getter
@Setter
@RequiredArgsConstructor
@FieldDefaults(level = AccessLevel.PRIVATE)
@EqualsAndHashCode
public class Tweet {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@EqualsAndHashCode.Exclude
Long id;
Long senderId;
String text;
Timestamp timestamp;
}
关于 Hibernate 的最佳资源就是这个博客。
DTO(数据传输对象)
我们将使用 Lombok 构建器来创建我们的 Tweet DTO:
package com.georgeoliveira.tweets.common.tweets.dtos;
import java.time.LocalDateTime;
import lombok.Builder;
import lombok.Value;
@Value
@Builder(toBuilder = true)
public class TweetDto {
Long id;
Long senderId;
String text;
LocalDateTime timestamp;
}
映射器
映射器是在类之间进行转换的组件:
package com.georgeoliveira.tweets.common.tweets.mappers;
import com.georgeoliveira.tweets.common.tweets.dtos.TweetDto;
import com.georgeoliveira.tweets.common.tweets.models.Tweet;
import java.sql.Timestamp;
import java.time.ZoneOffset;
public class TweetMapper {
public static TweetDto fromModel(Tweet tweet) {
return TweetDto.builder()
.id(tweet.getId())
.senderId(tweet.getSenderId())
.text(tweet.getText())
.timestamp(tweet.getTimestamp().toInstant().atOffset(ZoneOffset.UTC).toLocalDateTime())
.build();
}
public static Tweet fromDto(TweetDto tweetDto) {
Tweet tweet = new Tweet();
tweet.setId(tweetDto.getId());
tweet.setSenderId(tweetDto.getSenderId());
tweet.setText(tweetDto.getText());
tweet.setTimestamp(Timestamp.from(tweetDto.getTimestamp().toInstant(ZoneOffset.UTC)));
return tweet;
}
}
DAO(数据访问对象)
在这里我们将使用 JPA 注释并扩展JpaRepository
我们的模型的接口:
package com.georgeoliveira.tweets.common.tweets.dal.dao;
import com.georgeoliveira.tweets.common.tweets.models.Tweet;
import io.micronaut.data.annotation.Repository;
import io.micronaut.data.jpa.repository.JpaRepository;
@Repository
public interface TweetsDao extends JpaRepository<Tweet, Long> {}
为了添加更多的数据库访问抽象,我们更改了这个 DAO 文件,例如:假设我们要添加一种方法来检索特定发件人的所有推文,我们可以添加一个方法签名:
List<Tweet> findAllBySenderId(Long senderId);
有关 Micronaut Data 可用可能性的更多详细信息请参见此处
达拉斯
在我们的 DAL 抽象中,我们提供了一种将推文保存到数据库中的方法。
package com.georgeoliveira.tweets.common.tweets.dal;
import com.georgeoliveira.tweets.common.tweets.dal.dao.TweetsDao;
import com.georgeoliveira.tweets.common.tweets.dtos.TweetDto;
import com.georgeoliveira.tweets.common.tweets.mappers.TweetMapper;
import com.georgeoliveira.tweets.common.tweets.models.Tweet;
import javax.inject.Inject;
import javax.inject.Singleton;
import javax.transaction.Transactional;
@Singleton
public class TweetsDal {
@Inject TweetsDao tweetsDao;
@Transactional
public TweetDto persistTweet(TweetDto tweetDto) {
Tweet tweet = TweetMapper.fromDto(tweetDto);
Tweet persistedTweet = tweetsDao.save(tweet);
return TweetMapper.fromModel(persistedTweet);
}
}
事务确保我们的方法执行是原子的。经典的例子是:
UserTransaction utx = entityManager.getTransaction();
try {
utx.begin();
businessLogic();
utx.commit();
} catch(Exception ex) {
utx.rollback();
throw ex;
}
该begin()
调用启动一个事务,此后所有操作都被视为原子操作。当commit()
发生时,信息将被持久化,事务结束。如果 内部发生任何错误businessLogic()
,catc()
则会触发流程并进行回滚,确保没有任何内容被持久化,因此事务也结束。
我们使用它是为了从为我们处理交易管理的Micronaut@Transactional
中受益。HibernateTransactionManager
API
服务
我们的 Tweets 服务实例使用 DAL 来持久化使用来自 POST 请求主体的数据构建的推文:
package com.georgeoliveira.tweets.api.services;
import com.georgeoliveira.tweets.api.dtos.PostTweetRequestDto;
import com.georgeoliveira.tweets.api.mappers.TweetRequestMapper;
import com.georgeoliveira.tweets.common.tweets.dal.TweetsDal;
import com.georgeoliveira.tweets.common.tweets.dtos.TweetDto;
import javax.inject.Inject;
import javax.inject.Singleton;
@Singleton
public class TweetsService {
@Inject TweetsDal tweetsDal;
public TweetDto publishTweet(PostTweetRequestDto request) {
TweetDto tweetDto = TweetRequestMapper.fromPostRequest(request);
TweetDto persistedTweetDto = tweetsDal.persistTweet(tweetDto);
return persistedTweetDto;
}
}
该机构的格式如下:
package com.georgeoliveira.tweets.api.dtos;
import io.micronaut.core.annotation.Introspected;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Positive;
import lombok.Builder;
import lombok.Value;
@Value
@Builder(toBuilder = true)
@Introspected
public class PostTweetRequestDto {
@NotNull @NotBlank Long senderId;
@NotNull @NotBlank String text;
@NotNull @Positive Long timestamp;
}
再次,我们使用 Lombok 为此类创建一个构建器,并为其字段创建一些验证约束。
它的映射器是:
package com.georgeoliveira.tweets.api.mappers;
import com.georgeoliveira.tweets.api.dtos.PostTweetRequestDto;
import com.georgeoliveira.tweets.common.tweets.dtos.TweetDto;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
public class TweetRequestMapper {
public static TweetDto fromPostRequest(PostTweetRequestDto postTweetRequestDto) {
return TweetDto.builder()
.id(null)
.senderId(postTweetRequestDto.getSenderId())
.text(postTweetRequestDto.getText())
.timestamp(
LocalDateTime.from(
Instant.ofEpochMilli(postTweetRequestDto.getTimestamp()).atZone(ZoneOffset.UTC)))
.build();
}
}
控制器
我们使用该@Controller
注解创建带有 POST/tweets
路由的 Tweets 控制器。我们还使用该@Validated
注解来确保对字段的约束得到应用。
package com.georgeoliveira.tweets.api.controllers;
import com.georgeoliveira.tweets.api.dtos.PostTweetRequestDto;
import com.georgeoliveira.tweets.api.services.TweetsService;
import com.georgeoliveira.tweets.common.tweets.dtos.TweetDto;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.annotation.Body;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Post;
import io.micronaut.validation.Validated;
import java.util.Objects;
import javax.inject.Inject;
import javax.validation.Valid;
@Validated
@Controller("/tweets")
public class TweetsController {
@Inject TweetsService tweetsService;
@Post
public HttpResponse<Long> postTweet(@Valid @Body PostTweetRequestDto request) {
TweetDto tweet = tweetsService.publishTweet(request);
if (Objects.nonNull(tweet)) {
return HttpResponse.created(tweet.getId());
}
return HttpResponse.notFound();
}
}
该控制器使用我们的服务来发布推文。
我们可以通过运行以下命令来构建项目:
$ ./gradlew build -x test
然后,通过执行以下操作来运行它:
$ java -jar build/libs/tweets-all.jar
最后,通过执行以下操作来调用 API:
$ curl -L -X POST 'http://localhost:8080/tweets' -H 'Content-Type: application/json' --data-raw '{
"sender_id": 12356,
"text": "xalala",
"timestamp": 1619626150979
}'
请注意,由于 Tweets 表引用了 Users 表,因此senderId
必须是有效用户,因此,首先我们必须手动填充 Users 表:
$ psql -h localhost -d tweets -U postgres
Password for user postgres:
psql (12.6, server 11.2)
Type "help" for help.
tweets=# INSERT INTO users(username) VALUES ('my_username');
INSERT 0 1
tweets=# SELECT id, username FROM users WHERE username = 'my_username';
id | username
----+-------------
1 | my_username
(1 row)
发布推文事件
现在我们能够处理发布推文的请求,那么我们如何触发推文事件并发布到 Kafka 呢?
协议模式
我们将定义一个通用的“事件”模式,Debezium 将使用该模式将触发系统数据流的事件发布到 Kafka。此模式将使用 Apache Avro 定义:
protocols/avro/events/key.avro
{
"type": "record",
"name": "Key",
"namespace": "com.georgeoliveira.events",
"fields": [
{
"name": "aggregate_id",
"type": [
"null",
"string"
],
"default": null
}
]
}
protocols/avro/events/value.avro
{
"type": "record",
"name": "Value",
"namespace": "com.georgeoliveira.events",
"fields": [
{
"name": "event_id",
"doc": "A valid V4 UUID. Each event has a unique id.",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "aggregate_id",
"doc": "The id of the whole aggregate that had any of the nested entities or the root entity edited.",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "type",
"doc": "The type of the event, eg. \"create\" or \"update\".",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "aggregate",
"doc": "The whole aggregate that had any of the nested entities or the root entity edited.",
"type": [
"null",
"string"
],
"default": null
}
]
}
https://avro.apache.org/docs/current/spec.html
字段注释为您解释了每个字段的实用性和重要性,但让我们扩展对aggregate
和type
字段的讨论。
它将aggregate
包含处理事件所需的相关数据的有效负载。例如,如果我们正在开发一个聊天应用,事件是“发送新消息”,那么aggregate
JSON 格式可能是这样的:
{
"sender_id": ...,
"recipient_id": ...,
"message": ....
}
我们对推文事件的聚合将具有推文有效负载:
{
"id": ...,
"sender_id": ...,
"text": ...,
"timestamp": ...
}
我们使用该type
字段来存储发生的事件的类型,以便不同的消费者可以决定如何处理它,例如,该post_tweet
事件可能触发交付处理,但edit_tweet
可能不会。
为了为聚合有效负载创建模式,我们定义了一个协议缓冲区消息类型来表示一条推文:
protocols/proto/tweets/tweet.proto
syntax = "proto3";
package com.georgeoliveira.tweets.proto;
option java_outer_classname = "TweetProtobuf";
message Tweet {
int64 id = 1;
int64 sender_id = 2;
string text = 3;
int64 timestamp = 4;
}
因此,每当发布事件时,我们都会确保该aggregate
字段具有消费者期望的模式。此外,这些模式可以进行版本控制,以便我们跟踪其演变。
协议文件夹可以是作为子模块包含在我们的系统中的git存储库,这样它的开发就可以独立于应用程序继续进行,其他相关组件也可以包含它们。
发件箱模式
我们已经定义了事件模式、protobuffers 等等,但现在你肯定在想“我们到底该怎么发布事件??”。好吧,这就是发件箱模式的用武之地。
在这个模式中,对于系统中每个实体,如果其更改会触发任何行为,我们都会为其创建一个数据库表,称为“发件箱表”。在我们的示例中,我们希望每发布一条新推文,都会触发传递流程,因此我们需要为 tweets 表中的实体“Tweet”创建一个“推文发件箱表”。
该表必须是通用的,以便它具有与Value
我们之前定义的 avro 模式相同的模式,如我们的第二次迁移所示V2__AddTweetsOutboxTable.sql
:
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE TABLE tweets_outbox (
event_id UUID DEFAULT uuid_generate_v4(),
aggregate_id TEXT,
type TEXT,
aggregate JSONB,
created_at TIMESTAMP WITHOUT TIME ZONE DEFAULT NOW()
);
除了发件箱之外,我们还需要一个变更数据捕获 (CDC) 应用来监控发件箱表的变更,并将新条目以消息的形式发布到代理。这样,每当我们想要发布事件时,只需将其插入到发件箱表中即可。
我们将使用的 CDC 应用是 Debezium。我们将使用 Debezium PostgreSQL 连接器作为 Kafka 连接器,以便将消息发布到 Kafka。为此,我们需要:
- 配置我们的 PostgreSQL 实例以允许来自 Debezium 的连接。
- 使用 Debezium PostgreSQL 连接器插件设置 Kafka 连接器
PostgreSQL 设置
我们要做的第一件事就是在我们的 PG 实例上启用逻辑解码:
逻辑解码是将数据库表中的所有持久更改提取为连贯、易于理解的格式的过程,无需详细了解数据库的内部状态即可对其进行解释。
我们通过安装名为wal2json的插件来实现这一点,具体操作请遵循其 README 的说明,其中描述了如何启用逻辑复制。
了解这些步骤非常重要,以防您需要在工作或其他情况下在托管或 VM 实例中执行这些步骤。
出于本文的目的,我们将使用 Debezium 提供的 Postgres Docker 镜像,该镜像已附带这些安装和配置:https://github.com/debezium/docker-images/tree/master/postgres/12-alpine。
这个镜像仓库也很好的总结了所有需要的配置。
https://www.postgresql.org/docs/12/logicaldecoding-explanation.html
Kafka Connect 设置
本文解释了手动设置和运行带有 Debezium 插件的 Kafka Connect 的步骤,本文无法完整地概括这些步骤。
出于本文的目的,我们将使用 Debezium Connect Docker 镜像https://github.com/debezium/docker-images/tree/master/connect/1.5:
connect:
image: 'debezium/connect:1.5'
ports:
- "8083:8083"
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=connect_configs
- OFFSET_STORAGE_TOPIC=connect_offsets
- STATUS_STORAGE_TOPIC=connect_statuses
depends_on:
- kafka
- postgres
欲了解更多详情,请参阅本文。
创建 Debezium 连接器
接下来,我们需要设置一个连接器来监听推文表的变更。为此,我们将使用 Debezium 的 REST API:
curl -d @"connector-config.json" \
-H "Content-Type: application/json" \
-X POST http://connect:8083/connectors
其中connector-config.json
:
{
"name": "tweets_outbox_connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"slot.name": "tweets_outbox_connector",
"transforms": "unwrap,ValueToKey,SetKeySchema,SetValueSchema",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.ValueToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.ValueToKey.fields": "aggregate_id",
"transforms.SetValueSchema.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
"transforms.SetValueSchema.schema.name": "com.georgeoliveira.events.Value",
"transforms.SetKeySchema.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Key",
"transforms.SetKeySchema.schema.name": "com.georgeoliveira.events.Key",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081/",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081/",
"plugin.name": "wal2json_rds",
"database.server.name": "postgres",
"database.dbname": "tweets",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "pass",
"schema.include.list": "public",
"table.include.list": "public.tweets_outbox"
}
}
我们可以通过运行以下命令来查看连接器状态
$ curl http://connect:8083/connectors/tweets_outbox_connector/status
需要注意的是,如果 Debezium 连接的 Postgres 或 Kafka 出现问题,连接器将停止工作,并且不会自行恢复到“正在运行”状态。我们需要手动重新创建它。
要删除连接器,除了调用 Debezium API 中的 DELETE 方法外,还可以执行以下操作:
curl -X DELETE http://debezium.debezium/connectors/tweets_outbox_connector -v
我们需要删除 postgres 复制槽:
select pg_drop_replication_slot('tweets_outbox_connector');
为了测试发件箱事件是否正在发布,让我们在发件箱表中插入一些数据并尝试从 kafka 中使用:
$ psql -h localhost -d tweets -U postgres
Password for user postgres:
psql (12.6)
Type "help" for help.
tweets=# insert into tweets_outbox(aggregate_id, type, aggregate) values ('1', 'test', '{"test":"this is a test"}');
INSERT 0 1
tweets=#
使用kafkacat
Kafka 主题进行消费postgres.public.tweets_outbox
,我们得到如下结果:
$ kafkacat -C -b kafka:9092 -t postgres.public.tweets_outbox
H71dbed94-c3db-410e-b7a0-7b0a120e8617test4{"test": "this is a test"}����߹�
% Reached end of topic postgres.public.tweets_outbox [0] at offset 1
发布推文事件
每当发布一条推文时,我们都会通过在发件箱表中插入相关数据来发布此事件。
我们也可以使用数据库触发器来发布此事件。在我们这个非常简单的推文示例中,触发器运行良好,但对于更复杂的模式及其关系,事情会变得更加复杂,设置这些触发器规则可能会很麻烦。因此,更简单的方法是,只要我们想插入数据,就可以使用它自己的 DAL 层。
这个 DAL 结构与我们为推文定义的结构非常相似。
推文发件箱模型
package com.georgeoliveira.tweets.common.tweets.outbox.models;
import com.vladmihalcea.hibernate.type.json.JsonBinaryType;
import java.sql.Timestamp;
import java.util.UUID;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table;
import lombok.AccessLevel;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.experimental.FieldDefaults;
import org.hibernate.annotations.Type;
import org.hibernate.annotations.TypeDef;
import org.hibernate.annotations.TypeDefs;
@Entity
@Table(schema = "public", name = "tweets_outbox")
@Getter
@Setter
@RequiredArgsConstructor
@FieldDefaults(level = AccessLevel.PRIVATE)
@EqualsAndHashCode
@TypeDefs({@TypeDef(name = "jsonb", typeClass = JsonBinaryType.class)})
public class TweetOutbox {
@Id
@Column(name = "event_id")
@GeneratedValue(strategy = GenerationType.AUTO)
@EqualsAndHashCode.Exclude
UUID eventId;
@Column(name = "aggregate_id")
String aggregateId;
@Column(name = "type")
String type;
@Type(type = "jsonb")
@Column(name = "aggregate", columnDefinition = "jsonb")
String aggregate;
@Column(name = "created_at", insertable = false)
@EqualsAndHashCode.Exclude
Timestamp createdAt;
}
映射器:
package com.georgeoliveira.tweets.common.tweets.outbox.mappers;
import com.georgeoliveira.campaigns.proto.TweetProtobuf;
import com.georgeoliveira.tweets.common.tweets.dtos.TweetDto;
import com.georgeoliveira.tweets.common.tweets.outbox.dtos.EventType;
import com.georgeoliveira.tweets.common.tweets.outbox.models.TweetOutbox;
import com.googlecode.protobuf.format.JsonFormat;
import java.time.ZoneOffset;
public class TweetOutboxMapper {
public static TweetOutbox toOutboxModel(TweetDto tweetDto, EventType eventType) {
TweetProtobuf.Tweet tweetProto = toProto(tweetDto);
String aggregate = toAggregate(tweetProto);
TweetOutbox tweetOutbox = new TweetOutbox();
tweetOutbox.setType(eventType.toString());
tweetOutbox.setAggregate(aggregate);
tweetOutbox.setAggregateId(String.valueOf(tweetDto.getId()));
return tweetOutbox;
}
private static TweetProtobuf.Tweet toProto(TweetDto tweetDto) {
return TweetProtobuf.Tweet.newBuilder()
.setId(tweetDto.getId())
.setSenderId(tweetDto.getSenderId())
.setText(tweetDto.getText())
.setTimestamp(tweetDto.getTimestamp().toInstant(ZoneOffset.UTC).toEpochMilli())
.build();
}
private static String toAggregate(TweetProtobuf.Tweet tweetProto) {
JsonFormat jsonFormat = new JsonFormat();
return jsonFormat.printToString(tweetProto);
}
}
由于我们使用了之前定义的 Protobuffer,因此新增了一些步骤。首先,我们将推文的 DTO 映射到 Protobuffer,然后将该 Protobuffer 映射到 JSON 字符串,最后将其设置为aggregate
发件箱的字段。
推文发件箱 DAO
package com.georgeoliveira.tweets.common.tweets.outbox.dal.dao;
import com.georgeoliveira.tweets.common.tweets.outbox.models.TweetOutbox;
import io.micronaut.data.annotation.Repository;
import io.micronaut.data.jpa.repository.JpaRepository;
import java.util.UUID;
@Repository
public interface TweetsOutboxDao extends JpaRepository<TweetOutbox, UUID> {}
推文发件箱 DAL
package com.georgeoliveira.tweets.common.tweets.outbox.dal;
import com.georgeoliveira.tweets.common.tweets.dtos.TweetDto;
import com.georgeoliveira.tweets.common.tweets.outbox.dal.dao.TweetsOutboxDao;
import com.georgeoliveira.tweets.common.tweets.outbox.dtos.EventType;
import com.georgeoliveira.tweets.common.tweets.outbox.mappers.TweetOutboxMapper;
import com.georgeoliveira.tweets.common.tweets.outbox.models.TweetOutbox;
import javax.inject.Inject;
import javax.inject.Singleton;
@Singleton
public class TweetsOutboxDal {
@Inject TweetsOutboxDao tweetsOutboxDao;
public void sendToOutbox(TweetDto tweetDto, EventType eventType) {
TweetOutbox tweetOutbox = TweetOutboxMapper.toOutboxModel(tweetDto, eventType);
tweetsOutboxDao.save(tweetOutbox);
}
}
服务
我们需要修改我们的推文服务,以便每当发布推文时将推文事件发送到发件箱:
package com.georgeoliveira.tweets.api.services;
import com.georgeoliveira.tweets.api.dtos.PostTweetRequestDto;
import com.georgeoliveira.tweets.api.mappers.TweetRequestMapper;
import com.georgeoliveira.tweets.common.tweets.dal.TweetsDal;
import com.georgeoliveira.tweets.common.tweets.dtos.TweetDto;
import com.georgeoliveira.tweets.common.tweets.outbox.dal.TweetsOutboxDal;
import com.georgeoliveira.tweets.common.tweets.outbox.dtos.EventType;
import javax.inject.Inject;
import javax.inject.Singleton;
@Singleton
public class TweetsService {
@Inject TweetsDal tweetsDal;
@Inject
TweetsOutboxDal tweetsOutboxDal;
public TweetDto publishTweet(PostTweetRequestDto request) {
TweetDto tweetDto = TweetRequestMapper.fromPostRequest(request);
TweetDto persistedTweetDto = tweetsDal.persistTweet(tweetDto);
tweetsOutboxDal.sendToOutbox(persistedTweetDto, EventType.PUBLISH_TWEET);
return persistedTweetDto;
}
}
处理推文事件
现在我们能够发布推文并让它触发推文事件发布到 kafka,我们将实现架构的事件驱动部分,即接收这些事件然后执行操作的部分。
在我们的例子中,我们将:
- 使用来自 Kafka 的推文事件
- 将这些事件添加到作者关注者的时间线
为此,我们需要:
- 一个用于推特事件的 Kafka 消费者
- 时间线的抽象
- 持久化时间线的方法
我们可以使用我们的 postgres 实例来保存时间线,但我们对某种缓存感兴趣,从中我们可以快速检索这些数据,这似乎非常适合Redis。
我们将分两部分实现这一点:
- 消费者有两个组成部分:
- 监听器:连接到 Kafka 并接收来自主题的事件
- 处理器:将处理侦听器接收到的事件
- 时间线的 DAL,它将是我们 Redis 实例的抽象。此 DAL 将包含我们已为该项目创建的其他 DAL 中的相同组件。
时间线 DAL
该组件是 Redis 的抽象,我们将使用它来保存用户的时间线。
协议模式
为了做到这一点,我们需要定义一个模式来确定如何将数据存储到 Redis 中。
对于密钥,我们只需使用user_id
。
对于值部分,我们将使用 protobuffer 的字节数组表示:
syntax = "proto3";
package com.georgeoliveira.tweets.proto;
option java_outer_classname = "TimelineProtobuf";
import "tweets/tweet.proto";
message Timeline {
int64 user_id = 1;
repeated com.georgeoliveira.tweets.proto.Tweet tweets = 2;
}
数据对象
我们将使用 Lombok 构建器来创建时间线 DTO:
package com.georgeoliveira.tweets.common.timelines.dtos;
import com.georgeoliveira.tweets.common.tweets.dtos.TweetDto;
import java.util.List;
import lombok.Builder;
import lombok.Value;
@Builder(toBuilder = true)
@Value
public class TimelineDto {
Long userId;
List<TweetDto> tweetsList;
}
映射器
这是我们的映射器,它TimelineDto
从列表创建TweetDto
并映射TimelineDto
到字节数组。
package com.georgeoliveira.tweets.common.timelines.mappers;
import com.georgeoliveira.tweets.common.timelines.dtos.TimelineDto;
import com.georgeoliveira.tweets.common.tweets.dtos.TweetDto;
import com.georgeoliveira.tweets.common.tweets.mappers.TweetMapper;
import com.georgeoliveira.tweets.proto.TimelineProtobuf;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
public class TimelineMapper {
public static TimelineDto fromList(Long userId, List<TweetDto> tweetsList) {
return TimelineDto.builder().userId(userId).tweetsList(tweetsList).build();
}
public static Pair<Long, byte[]> toUserIdTimelineByteArrayPair(TimelineDto timelineDto) {
return Pair.of(timelineDto.getUserId(), toTimelineByteArray(timelineDto));
}
private static byte[] toTimelineByteArray(TimelineDto timelineDto) {
return toProto(timelineDto).toByteArray();
}
private static TimelineProtobuf.Timeline toProto(TimelineDto timelineDto) {
return TimelineProtobuf.Timeline.newBuilder()
.setUserId(timelineDto.getUserId())
.addAllTweets(
timelineDto
.getTweetsList()
.stream()
.map(tweetDto -> TweetMapper.toProto(tweetDto))
.collect(Collectors.toList()))
.build();
}
}
去中心化自治组织
要访问 Redis,我们将使用Lettuce Client。
首先,我们将定义一个[TimelineCommands
接口](https://lettuce.io/core/release/reference/index.html#redis-command-interfaces),它定义了我们将用来与 Redis 交互的方法:
package com.georgeoliveira.tweets.common.timelines.dal.dao;
import io.lettuce.core.dynamic.Commands;
import io.lettuce.core.dynamic.annotation.Command;
public interface TimelineCommands extends Commands {
@Command("SET")
void set(String userId, byte[] timelineByteArray);
@Command("GET")
byte[] get(String userId);
}
然后,我们将定义一个工厂,该工厂将使用连接到我们的 Redis 的接口实例来设置单例:
package com.georgeoliveira.tweets.common.timelines.dal.dao.factories;
import com.georgeoliveira.tweets.common.timelines.dal.dao.TimelineCommands;
import io.lettuce.core.RedisClient;
import io.lettuce.core.dynamic.RedisCommandFactory;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Value;
import javax.inject.Singleton;
@Factory
public class TimelineCommandsFactory {
@Value("${redis.host}")
private String REDIS_HOST;
@Singleton
TimelineCommands timelineCommands() {
RedisClient redisClient = RedisClient.create(REDIS_HOST);
RedisCommandFactory commandFactory = new RedisCommandFactory(redisClient.connect());
return commandFactory.getCommands(TimelineCommands.class);
}
}
达拉斯
最后,我们包装其他组件的 DAL 是:
package com.georgeoliveira.tweets.common.timelines.dal;
import com.georgeoliveira.tweets.common.timelines.dal.dao.TimelineCommands;
import com.georgeoliveira.tweets.common.timelines.dtos.TimelineDto;
import com.georgeoliveira.tweets.common.timelines.mappers.TimelineMapper;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.commons.lang3.tuple.Pair;
@Singleton
public class TimelinesDal {
@Inject TimelineCommands timelineCommandsDao;
public void persistTimeline(TimelineDto timelineDto) {
Pair<Long, byte[]> userIdTimelinePair =
TimelineMapper.toUserIdTimelineByteArrayPair(timelineDto);
timelineCommandsDao.set(
String.valueOf(userIdTimelinePair.getLeft()), userIdTimelinePair.getRight());
}
}
消费推文事件
处理器
处理器是实现本文开头描述的扇出方法流程的组件:
发布推文:查找关注推文作者的所有人,并将新推文插入他们的主页时间线缓存中。
让我们将此流程分解为:
- 查找关注推文作者的所有人
- 在他们的时间线上插入新推文
对于第一部分,我们需要对 User 表进行抽象,以便它可以检索用户关注者:
这是我们的User
模型:
package com.georgeoliveira.tweets.common.users.models;
import java.util.ArrayList;
import java.util.List;
import javax.persistence.Entity;
import javax.persistence.FetchType;
import javax.persistence.Id;
import javax.persistence.JoinColumn;
import javax.persistence.JoinTable;
import javax.persistence.OneToMany;
import javax.persistence.Table;
import lombok.AccessLevel;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.experimental.FieldDefaults;
@Entity
@Table(schema = "public", name = "users")
@Getter
@Setter
@RequiredArgsConstructor
@FieldDefaults(level = AccessLevel.PRIVATE)
@EqualsAndHashCode
public class User {
@Id @EqualsAndHashCode.Exclude Long id;
String username;
@OneToMany(fetch = FetchType.EAGER)
@JoinTable(
name = "follows",
joinColumns = {@JoinColumn(name = "followee_id")},
inverseJoinColumns = {@JoinColumn(name = "follower_id")})
List<User> followers = new ArrayList<>();
}
请注意followers
映射follows
关联表的字段,该字段允许我们检索用户的关注者。另请注意,我们只映射了对我们正在构建的这个特定流程有用的内容,而许多用于建模“真实用户”的特征则被遗漏了。
这是我们为用户提供的 DTO:
package com.georgeoliveira.tweets.common.users.dtos;
import java.util.List;
import lombok.Builder;
import lombok.Value;
@Value
@Builder(toBuilder = true)
public class UserDto {
Long id;
String username;
List<UserDto> followers;
}
映射器:
package com.georgeoliveira.tweets.common.users.mappers;
import com.georgeoliveira.tweets.common.users.dtos.UserDto;
import com.georgeoliveira.tweets.common.users.models.User;
import java.util.stream.Collectors;
public class UserMapper {
public static UserDto fromModel(User user) {
return UserDto.builder()
.id(user.getId())
.username(user.getUsername())
.followers(
user.getFollowers().stream().map(UserMapper::fromModel).collect(Collectors.toList()))
.build();
}
}
DAO 也是如此:
package com.georgeoliveira.tweets.common.users.dal.dao;
import com.georgeoliveira.tweets.common.users.models.User;
import io.micronaut.data.annotation.Repository;
import io.micronaut.data.jpa.repository.JpaRepository;
@Repository
public interface UsersDao extends JpaRepository<User, Long> {}
最后,DAL:
package com.georgeoliveira.tweets.common.users.dal;
import com.georgeoliveira.tweets.common.users.dal.dao.UsersDao;
import com.georgeoliveira.tweets.common.users.dtos.UserDto;
import com.georgeoliveira.tweets.common.users.mappers.UserMapper;
import java.util.Collections;
import java.util.List;
import javax.inject.Inject;
import javax.inject.Singleton;
@Singleton
public class UsersDal {
@Inject UsersDao usersDao;
public List<UserDto> getUserFollowers(Long userId) {
return usersDao
.findById(userId)
.map(UserMapper::fromModel)
.map(UserDto::getFollowers)
.orElse(Collections.emptyList());
}
}
对于流程的第二部分,我们已经定义了一个将时间线保存到缓存的 DAL,但我们仍然需要实现一种从特定用户的时间线中检索推文的方法。
为此,我们将TweetsDal
通过添加方法修改我们的组件getTimelineForUser
:
public List<TweetDto> getTimelineForUser(Long userId, Long timelineSize) {
return tweetsDao
.findTimelineTweetsByUserId(userId, timelineSize)
.stream()
.map(TweetMapper::fromModel)
.collect(Collectors.toList());
}
并使用本机查询从时间线检索推文的 DAO :
@Query(
value =
"SELECT * FROM tweets t JOIN follows f ON f.followee_id = t.sender_id WHERE f.follower_id = :userId ORDER BY timestamp DESC LIMIT :limit",
nativeQuery = true)
List<Tweet> findTimelineTweetsByUserId(Long userId, Long limit);
通过这些方法,我们可以检索用户时间线的推文列表,并TimelineDto
使用我们之前定义的映射器进行构建。
最后,处理器是:
package com.georgeoliveira.tweets.worker.processors;
import com.georgeoliveira.tweets.common.timelines.dal.TimelinesDal;
import com.georgeoliveira.tweets.common.timelines.dtos.TimelineDto;
import com.georgeoliveira.tweets.common.timelines.mappers.TimelineMapper;
import com.georgeoliveira.tweets.common.tweets.dal.TweetsDal;
import com.georgeoliveira.tweets.common.tweets.dtos.TweetDto;
import com.georgeoliveira.tweets.common.users.dal.UsersDal;
import com.georgeoliveira.tweets.common.users.dtos.UserDto;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.inject.Singleton;
@Singleton
public class TweetsProcessor implements Consumer<TweetDto> {
@Inject TimelinesDal timelinesDal;
@Inject TweetsDal tweetsDal;
@Inject UsersDal usersDal;
@Override
public void accept(TweetDto tweetDto) {
List<UserDto> authorFollowers = usersDal.getUserFollowers(tweetDto.getSenderId());
List<TimelineDto> timelineDtos =
authorFollowers
.stream()
.map(UserDto::getId)
.map(
userId ->
TimelineMapper.fromList(userId, tweetsDal.getTimelineForUser(userId, 100L)))
.collect(Collectors.toList());
timelineDtos.forEach(timelineDto -> timelinesDal.persistTimeline(timelineDto));
}
}
听众
该组件监听推文事件,并针对每个事件触发处理器流程:
package com.georgeoliveira.tweets.worker.listeners;
import com.georgeoliveira.events.Key;
import com.georgeoliveira.events.Value;
import com.georgeoliveira.tweets.common.tweets.dtos.TweetDto;
import com.georgeoliveira.tweets.common.tweets.mappers.TweetMapper;
import com.georgeoliveira.tweets.worker.processors.TweetsProcessor;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.OffsetReset;
import io.micronaut.configuration.kafka.annotation.Topic;
import java.io.IOException;
import javax.inject.Inject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@KafkaListener(offsetReset = OffsetReset.EARLIEST)
public class TweetsListener {
@Inject TweetsProcessor tweetsProcessor;
@Topic("${topics.tweets}")
void listen(ConsumerRecord<Key, Value> event) throws IOException {
TweetDto tweetDto = TweetMapper.fromRecord(event);
tweetsProcessor.accept(tweetDto);
}
}
请注意,该offsetReset
策略定义为OffsetReset.EARLIEST
使监听器能够确保消费者从该主题最早可用的记录开始读取。有关偏移量管理的更多详细信息,请参阅此处。
检索时间线
让我们定义一条允许我们检索时间线推文的路线,正如我们之前在扇出方法的第二点中所说的那样:
主页时间线:仅从缓存中读取。
🤷
要“仅从缓存中读取”,需要一些组件。其结构与我们构建推文 API 时使用的结构相同。
服务
我们首先需要添加一个fromByteArray
方法TimelineMapper
:
public static Optional<TimelineDto> fromByteArray(byte[] timelineByteArray) {
try {
TimelineProtobuf.Timeline timelineProto =
TimelineProtobuf.Timeline.parseFrom(timelineByteArray);
TimelineDto timelineDto = fromProto(timelineProto);
return Optional.of(timelineDto);
} catch (InvalidProtocolBufferException | NullPointerException e) {
return Optional.empty();
}
}
我们将添加到的新方法将使用此方法TimelinesDal
:
public Optional<TimelineDto> getUserTimeline(Long userId) {
byte[] timelineByteArray = timelineCommandsDao.get(String.valueOf(userId));
return TimelineMapper.fromByteArray(timelineByteArray);
}
最后,服务将使用这个新方法:
package com.georgeoliveira.tweets.api.services;
import com.georgeoliveira.tweets.common.timelines.dal.TimelinesDal;
import com.georgeoliveira.tweets.common.timelines.dtos.TimelineDto;
import com.georgeoliveira.tweets.common.tweets.dtos.TweetDto;
import java.util.Collections;
import java.util.List;
import javax.inject.Inject;
import javax.inject.Singleton;
@Singleton
public class TimelinesService {
@Inject TimelinesDal timelinesDal;
public List<TweetDto> getUserTimelineTweets(Long userId) {
return timelinesDal
.getUserTimeline(userId)
.map(TimelineDto::getTweetsList)
.orElse(Collections.emptyList());
}
}
控制器
控制器使用该服务从请求的用户那里检索时间线的推文列表:
package com.georgeoliveira.tweets.api.controllers;
import com.georgeoliveira.tweets.api.services.TimelinesService;
import com.georgeoliveira.tweets.common.tweets.dtos.TweetDto;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.HttpStatus;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import io.micronaut.http.annotation.PathVariable;
import java.util.List;
import javax.inject.Inject;
@Controller("/timelines")
public class TimelinesController {
@Inject TimelinesService timelinesService;
@Get("/{userId}")
HttpResponse<List<TweetDto>> getUserTimeline(@PathVariable Long userId) {
List<TweetDto> tweetDtoList = timelinesService.getUserTimelineTweets(userId);
if (tweetDtoList.isEmpty()) {
return HttpResponse.noContent();
}
return HttpResponse.status(HttpStatus.FOUND).body(tweetDtoList);
}
}
让我们通过构建项目来看一下它的工作原理:
$ ./gradlew build -x test
然后运行它:
$ java -jar build/libs/tweets-all.jar
然后设置另一个用户来关注我们之前创建的用户:
tweets=# insert into users(username) values('cool_user');
INSERT 0 1
tweets=# insert into follows(follower_id, followee_id) values(2, 1);
INSERT 0 1
然后发布来自用户的推文1
:
$ curl -L -X POST 'http://localhost:8080/tweets' -H 'Content-Type: application/json' --data-raw '{
"sender_id": 1,
"text": "my new tweet",
"timestamp": 1619626150979
}
最后检索用户的时间线2
:
$ curl http://localhost:8080/timelines/2
[{"id":2,"text":"my new tweet","timestamp":[2021,4,28,16,9,10,979000000],"senderId":1}]
结论
我们刚刚使用 Micronaut、Kafka 和 Debezium 创建了一个事件驱动的应用程序,其中采用了“真实世界”应用程序中应用的许多模式。
由于我们选择使用 Kafka 作为代理,我们的项目也因此变得更加复杂。对于小型应用程序来说,这或许没问题,但对于“更大”、更复杂的应用程序来说,最好考虑创建自己的事件存储。
文章来源:https://dev.to/foolonthehill/build-a-event-driven-app-with-micronaut-kafka-and-debezium-11be