发布于 2026-01-05 4 阅读
0

流处理如何让你的事件驱动架构更出色?什么是事件驱动架构?消息队列简介 流处理简介 你应该使用流处理还是消息队列?基于 Kafka 的参考架构 后续步骤

流处理如何让你的事件驱动架构更出色

什么是事件驱动架构?

消息队列简介

流处理简介

应该使用流处理还是消息队列?

使用 Kafka 的参考架构

后续步骤

如果您是一位架构师或开发人员,正在考虑事件驱动架构,那么流处理可能正是您所需要的,它可以使您的应用程序更快、更具可扩展性、更解耦。

本文是事件驱动架构系列文章的第三篇,我们将简要回顾第一篇文章的内容。第一篇文章概述了事件驱动架构的优势、一些可选方案以及一些常见的模式和反模式。我们还将回顾第二篇文章,该文章更详细地介绍了消息队列,并使用 Redis 和 RSMQ 部署了一个快速入门的消息队列。

本文还将深入探讨流处理。我们将讨论您可能选择流处理作为架构的原因、它的一些优缺点,以及一个使用Apache Kafka 的快速部署参考架构。

什么是事件驱动架构?

流处理是一种事件驱动架构。在事件驱动架构中,当一个组件执行了其他组件可能感兴趣的工作时,该组件(称为生产者)会生成一个事件——记录所执行的操作。其他组件(称为消费者)会消费这些事件,以便根据事件执行各自的任务。

消费者和生产者之间的这种解耦为事件驱动型架构带来了诸多好处:

  • 异步——组件之间的通信是异步的,避免了同步、单体架构造成的任何瓶颈。
  • 解耦——组件之间无需相互了解,可以独立进行开发、测试、部署和扩展。
  • 易于扩展——由于组件是解耦的,因此可以更容易地将瓶颈问题追踪到单个组件,并快速扩展。

事件驱动架构主要有两种类型:消息队列和流处理。让我们深入了解一下它们的区别。

消息队列简介

在消息队列这种最初的事件驱动架构中,生产者将消息放入一个指向特定消费者的队列中。该消息会一直保留在队列中(通常按先进先出的顺序),直到消费者将其取出,此时消息会被删除。

消息队列适用于那些你确切知道事件发生后需要执行什么操作的系统。当问题发生时,生产者会向队列发送一条消息,这条消息的目标消费者是队列中的消费者。消费者从队列中获取消息,然后执行下一步操作。一旦下一步操作完成,该事件就会从队列中永久移除。在消息队列中,流程通常由队列自身决定,因此出现了“智能代理/哑消费者”的概念,这意味着代理(队列)知道将消息发送到哪里,而消费者只是做出响应。

流处理简介

在流处理中,消息并非定向发送给特定接收者,而是发布到特定主题,供所有感兴趣的消费者访问。任何感兴趣的接收者都可以订阅该主题并阅读消息。由于消息必须对所有消费者可见,因此从流中读取的消息不会被删除。

生产者和代理不需要也不想知道消息的后续处理结果,或者消息的去向。生产者只需将消息发送给代理,代理发布消息,然后生产者和代理继续执行其他操作。感兴趣的消费者收到消息后即可完成处理。正是由于这种进一步的解耦,事件流系统能够随着项目的演进而轻松扩展。

无论整个系统如何运作,消费者都可以被添加和删除,并且可以更改其处理方式和处理内容。生产者和代理无需了解这些更改,因为这些服务是解耦的。这通常被称为“哑代理/智能消费者”——代理(流)仅仅是代理,并不了解路由。消息处理中的消费者才是智能组件;它们知道要监听哪些消息。

此外,消费者可以同时检索多条消息,并且由于消息不会被删除,消费者可以回放一系列过往消息。例如,新消费者可以回溯读取其部署之前的旧消息。

流处理已成为许多事件驱动系统的首选方案。与消息队列相比,它具有诸多优势,例如支持多个消费者、事件重放以及滑动窗口统计。总而言之,它能显著提升系统的灵活性。

应该使用流处理还是消息队列?

以下是每种产品的几个使用场景:

消息队列

消息队列,例如RabbitMQActiveMQ,非常流行。消息队列在路由逻辑复杂或已知的系统中,或者需要保证每条消息只被送达一次时,尤其有用。

消息队列的典型应用场景是繁忙的电商网站,这类网站需要高可用性、高响应率,并且路由逻辑已知且不易更改。在这些限制条件下,消息队列能够提供异步通信和服务解耦的强大优势,同时保持架构的简洁性。

其他用例通常涉及系统依赖性或约束,例如系统的前端和后端是用不同的语言编写的,或者需要集成到遗留基础设施中。

流处理

流处理适用于消息消费者较为复杂的系统,例如:

  • 网站活动追踪。繁忙的网站上的活动会产生大量消息。利用信息流,您可以创建一系列实时信息流,包括页面浏览量、点击量、搜索量等等,并允许各种用户监控、报告和处理这些数据。
  • 日志聚合。利用流,可以将日志文件转换为集中式的日志消息流,方便用户使用。您还可以计算指标的滑动窗口统计信息,例如每秒或每分钟的平均值。这可以显著减少输出数据量,从而提高基础架构的效率。
  • 联网也会产生大量消息。流可以处理大量消息,并以高度可扩展和高性能的方式将其发布给大量消费者。
  • 事件溯源。正如前文所述,流可用于实现事件溯源,其中更新和删除操作永远不会直接对数据执行;而是将实体的状态变化保存为一系列事件。
  • 消息传递。Twitter 和 LinkedIn 等复杂且高可用的消息传递平台使用流(Kafka)来驱动指标、向新闻源传递消息等等。

使用 Kafka 的参考架构

在之前的文章中,我们部署了一个快速搭建的消息队列来学习队列的相关知识。现在让我们做一个类似的流处理示例。

流处理架构有很多选择,包括以下几种:

  • Apache Kafka
  • Apache Spark
  • Apache Beam/Google Cloud Data Flow
  • Spring Cloud 数据流

我们将使用Heroku 上的 Apache Kafka 参考架构。Heroku是一个云平台即服务 (PaaS),它提供Kafka 作为附加组件。Heroku 的云平台让部署流处理系统变得轻松便捷,无需自行托管或运行。由于 Heroku 提供了一个Terraform 脚本,可以一步完成所有必要的代码和配置部署,因此它是学习流处理的快速简便方法。

这里我们不会详细介绍部署步骤,因为参考架构页面已经详细说明了这些步骤。不过,该页面部署了一个示例电子商务系统,展示了流处理的主要组件和优势。浏览或购买产品的点击操作会被记录为事件并发送到 Kafka。

电子商务示例

以下是edm-relay的一个关键代码片段,它负责向 Kafka 流发送消息。向 Kafka 发布事件非常简单,只需调用生产者 API 插入一个 JSON 对象即可。

app.post('/produceClickMessage', function (req, res) {
   try {
     const topic = `${process.env.KAFKA_PREFIX}${req.body.topic}`;
     console.log(`topic: ${topic}`);
     producer.produce(
       topic,
       null,
       // Message to send. Must be a buffer
       Buffer.from(JSON.stringify(req.body)),
       // for keyed messages, we also specify the key - note that this field is optional
       null,
       // you can send a timestamp here. If your broker version supports it,
       // it will get added. Otherwise, we default to 0
       Date.now(),
     );
   } catch (err) {
     console.error('A problem occurred when sending our message');
     throw err;
   }
   res.status(200).send("{\"message\":\"Success!\"}")
 });
Enter fullscreen mode Exit fullscreen mode

实时仪表盘会接收点击事件流并显示分析结果。这对于商业分析非常有用,可以用来探索最受欢迎的产品、不断变化的趋势等等。

EDM仪表盘

以下是edm-stream中订阅该主题的代码:

.on('ready', (id, metadata) => {
   consumer.subscribe(kafkaTopics);  
   consumer.consume();
   consumer.on('error', err => {
     console.log(`Error in Kafka consumer: ${err.stack}`);
   });
   console.log('Kafka consumer ready.' + JSON.stringify(metadata));
   clearTimeout(connectTimoutId);
 })
Enter fullscreen mode Exit fullscreen mode

然后通过为每条消息调用事件处理程序来消费流中的消息:

 .on('data', function(data) {
   const message = data.value.toString()
   console.log(message, `Offset: ${data.offset}`, `partition: ${data.partition}`, `consumerId: edm/${process.env.DYNO || 'localhost'}`);
   socket.sockets.emit('event', message);
   consumer.commitMessage(data);
 })

Enter fullscreen mode Exit fullscreen mode

这个参考架构不仅仅是用来买咖啡的;它是任何想要追踪点击量并在实时仪表盘中生成报告的 Web 应用的起点。它是开源的,所以您可以随意尝试并根据自己的需求进行修改。

Kafka 示例实现

流处理不仅可以解耦组件,使它们易于独立构建、测试、部署和扩展,而且还通过在组件之间创建“哑”代理来增加另一层解耦。

后续步骤

如果您还没有阅读过,请阅读本系列的其他文章,了解事件驱动架构的优势以及如何使用 Redis 和 RSMQ 部署示例消息队列

文章来源:https://dev.to/heroku/how-stream-processing-makes-your-event-driven-architecture-even-better-5ehg