为何选择 Kafka?事件驱动架构开发者指南
什么是 Kafka?
Kafka是一个开源分布式事件流平台,旨在处理实时数据馈送。
Kafka 最初由 LinkedIn 开发,后来在 Apache 软件基金会下开源,现在广泛用于构建高吞吐量、容错和可扩展的数据管道、实时分析和事件驱动架构。
Kafka解决了什么问题?
在 Kafka 之前, RabbitMQ和ActiveMQ等传统消息队列被广泛使用,但它们在处理海量、高吞吐量实时数据流方面存在局限性。
Kafka 旨在通过以下方式解决这些问题:
- 大规模数据处理——Kafka 针对跨分布式系统提取、存储和分发大量数据流进行了优化。
- 容错——Kafka 在多个节点之间复制数据,确保即使代理发生故障,数据仍然可用。
- 持久性——消息保留在磁盘上,允许消费者在需要时重播事件。
- 支持事件驱动架构——它支持微服务之间的异步通信,使其成为现代云应用程序的理想选择。
何时使用 Kafka
当您需要以下条件时,Kafka 是正确的选择:
- 高吞吐量、实时数据处理——非常适合日志处理、金融交易和物联网数据流。
- 微服务解耦——Kafka 充当中介,允许微服务异步通信而无需直接依赖。
- 事件驱动系统——如果您的架构围绕对变化的反应(例如,用户事件触发多个下游操作),那么 Kafka 是一个不错的选择。
- 可靠的持久性消息传递——与可能丢失消息的传统消息队列不同,Kafka 会在可配置的时间段内保留消息,确保持久性和可重放性。
- 可扩展性和容错性——Kafka 的分布式特性使其能够水平扩展,同时通过复制保持容错能力。
Kafka 的工作原理
Kafka由几个关键组件组成:
1. 消息
消息是Kafka中最小的数据单位。
它可以是 JSON 对象、字符串或任何二进制数据。
消息可能有一个关联的键,它决定了消息将存储在哪个分区中。
2.主题
主题是生产者发送消息并由消费者读取消息的逻辑通道。主题有助于对消息进行分类(例如,日志、交易、订单)。
3. 制作人
生产者是向主题发布消息的 Kafka 客户端。消息可以通过三种方式发送:
- 发射后不管——生产者无需等待确认即可发送消息,确保最大速度,但存在数据丢失的风险。
- 同步发送——生产者等待 Kafka 的确认后再继续,确保可靠性但会增加延迟。
- 异步发送——生产者以异步方式批量发送消息,在速度和可靠性之间取得平衡。
Kafka 允许配置确认(ACK)来平衡一致性和性能:
- ACK 0 – 无需确认(最快但风险较大)。
- ACK 1 – 当领导代理收到消息时确认该消息(更快但不太安全)。
- ACK All – 仅当所有副本确认收到时才确认消息(较慢但最安全)。
生产者优化
- 消息压缩和批处理——Kafka 生产者可以在将消息发送到 Broker 之前对其进行批处理和压缩。这可以提高吞吐量并减少磁盘使用量,但会增加 CPU 开销。
- Avro 序列化器/反序列化器——使用 Avro 代替 JSON 需要预先定义模式,但它可以提高性能并减少存储消耗。
4.分区
Kafka 主题分为多个分区,从而实现并行处理和可扩展性。
分区中的消息是有序的且不可变的。
5.消费者
消费者从分区读取消息并使用偏移量跟踪其位置。
消费者可以重置偏移量来重新处理旧消息。
Kafka 消费者采用轮询模型,这意味着他们不断地向代理请求数据,而不是代理将数据推送给他们。
消费者优化
-
分区分配策略:
- 范围——消费者获得连续的分区。
- 循环——分区在消费者之间均匀分布。
- 粘性——尝试在重新平衡期间尽量减少变化。
- 合作粘性——类似于粘性,但允许合作重新平衡。
-
批量大小配置——消费者可以定义每个轮询周期应检索多少条记录或多少数据。
6.消费者组
消费者组是一组共同处理来自某个主题的消息的消费者。
Kafka 确保单个分区仅被组内的一个消费者使用,以保持秩序。
7. 偏移管理
当消费者读取消息时,它会更新其偏移量——最后处理的消息的位置。
- 自动提交——Kafka 会定期自动提交偏移量。
- 手动提交– 应用程序明确提交偏移量,同步或异步均可。
8.经纪人
代理是存储消息、分配偏移量并处理客户端请求的 Kafka 服务器。
多个代理组成Kafka 集群,以实现可扩展性和容错能力。
9.动物园管理员
Zookeeper 管理元数据、跟踪代理并处理领导者选举。
然而,较新的 Kafka 版本正在努力消除对 Zookeeper 的依赖。
示例:Kafka 实际应用
为了更好地理解 Kafka,让我们看一个简单的例子,其中一个生产者向一个主题发送消息,两个不同的消费者分别处理这些消息:一个模拟电子邮件通知服务,另一个将消息存储在数据库中。
设置 Kafka(docker-compose.yml)
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
container_name: zookeeper
restart: always
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka:latest
container_name: kafka
restart: always
depends_on:
- zookeeper
ports:
- "9092:9092"
- "29092:29092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka:29092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_INTERNAL://0.0.0.0:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
生产者代码(producer.js)
const { Kafka } = require("kafkajs");
const kafka = new Kafka({
clientId: "family-producer",
brokers: ["localhost:9092"],
});
const producer = kafka.producer();
async function sendMessage() {
await producer.connect();
console.log("🟢 Producer connected");
const message = {
id: Date.now(),
content: `Hi Mom! Time is ${new Date().getMinutes()}:${new Date().getSeconds()}`,
};
await producer.send({
topic: "family-topic",
messages: [{ value: JSON.stringify(message) }],
});
console.log(`📨 Sent: ${JSON.stringify(message)}`);
await producer.disconnect();
}
sendMessage();
电子邮件通知消费者(consumer.js)
const { Kafka } = require("kafkajs");
const kafka = new Kafka({
clientId: "family-email-consumer",
brokers: ["localhost:9092"],
});
const consumer = kafka.consumer({ groupId: "email-group" });
async function consumeMessages() {
await consumer.connect();
await consumer.subscribe({ topic: "family-topic", fromBeginning: true });
console.log("🟢 Email Consumer Connected");
await consumer.run({
eachMessage: async ({ message }) => {
const msg = JSON.parse(message.value.toString());
console.log(`📩 Notification Sent: "${msg.content}"`);
console.log(`📧 Email Sent: "${msg.content}" \n`);
},
});
}
consumeMessages();
数据库存储消费者(dbconsumer.js)
const { Kafka } = require("kafkajs");
const kafka = new Kafka({
clientId: "family-db-consumer",
brokers: ["localhost:9092"],
});
const consumer = kafka.consumer({ groupId: "db-group" });
async function consumeMessages() {
await consumer.connect();
await consumer.subscribe({ topic: "family-topic", fromBeginning: true });
console.log("🟢 DB Consumer Connected");
await consumer.run({
eachMessage: async ({ message }) => {
const msg = JSON.parse(message.value.toString());
console.log(`💾 Storing message in DB: "${msg.content}" \n`);
},
});
}
consumeMessages();
最后的想法
Kafka 是一个强大的工具,它改变了实时数据处理。
然而,虽然它提供了令人难以置信的可扩展性和耐用性,但评估它是否适合您的架构至关重要。
敬请期待!我会写一篇后续文章,比较Kafka 和 Redis,探讨它们的用例以及何时该选择其中之一。🚀
我一直在研究一种超级方便的工具,叫做LiveAPI。
LiveAPI可帮助您在几分钟内记录所有后端 API
使用 LiveAPI,您可以快速生成交互式 API 文档,允许用户直接从浏览器执行 API。
如果您厌倦了手动为 API 创建文档,这个工具可能会让您的生活更轻松。
资料来源:部分图片取自此处:1
鏂囩珷鏉ユ簮锛�https://dev.to/lovestaco/why-kafka-a-developer-friend-guide-to-event-driven-architecture-4ekf