如何在 Node.js 中使用事件驱动编程

2025-06-04

如何在 Node.js 中使用事件驱动编程

作者:Vlado Tesanovic ✏️

当今最常用的软件构建方法是采用请求/响应机制,底层采用分层架构(n 层),调用在各个层之间垂直传播。像MVC这样的模式已经非常流行,并且在某种程度上成为了人们学习和编写软件的标准。

虽然分层架构是最简单的架构,可以解决很多问题,但这并不意味着它是解决软件行业所有问题的灵丹妙药。有些软件可以使用不同的设计模式来编写,使其更具表现力。分层架构非常适合中小型项目。它的难点在于保持一切井然有序,不要构建过多的层级,否则最终会变成Baklava 代码

分层架构

分层架构

另外,我们有事件驱动编程,它主要用于前端开发,其中一个事件可以在系统中传播,并且许多参与者可以在捕获该事件后采取行动。数据流是单向的,无需编辑现有组件即可添加新功能。

虽然事件驱动编程在构建用户界面方面占据主导地位,但我们也可以用它来编写服务器端代码。好的用例是高度异步的系统,它们不需要服务器立即响应,并使用不同的通信渠道来发布请求的进度。

LogRocket 免费试用横幅

示范

在本教程中,我们不仅将分派事件来演示事件驱动编程,还将实现CQRS设计模式,将编辑数据(命令)的代码与用于读取数据(查询)的代码分开。

我们的应用程序的主要组成部分是:

  • 命令
  • 处理程序
  • 活动
  • 查询

命令是运行业务逻辑或调度新事件的操作。事件将用于调度其他命令。我们也可以拥有事件处理器。查询操作和查询处理器负责查询(读取)项目。

设想一个竞价系统,其中一个动作可以按照定义的顺序触发其他动作,并且我们希望它高度异步。最终我们将实现如下功能:

  • 检查出价是否最高
  • 向所有相关方(投标人和所有者)发送电子邮件
  • 在数据库中添加出价
  • 为该出价创建活动
  • 收到最新出价后,竞标过程将延长两个小时(竞标费拍卖

以下是我们系统流程图:

演示流程

通过CQRS模块的实现,每个事件都会产生一个或多个命令,每个命令都会触发一个新的事件。

这个事件驱动系统支持面向方面编程范式。这意味着你可以在不改变现有功能的情况下为软件添加额外功能。在我们的例子中,这意味着将新的命令命令处理程序事件链接起来。

执行

我们选择Nestjs来实现我们想象中的竞价系统所描述的解决方案。

Nestjs在其丰富的生态系统中提供了CQRS模块。该模块的主要构建块是三个可注入类:EventBusQueryBusCommandBus。顾名思义,每个类都可以触发事件、查询或命令。

阅读和编写此演示的代码需要学习并深入研究 Nestjs,因为其中涉及许多概念。Nestjs 是一个功能丰富的框架,它严重依赖于装饰器可观察对象,并带有模块系统(类似于 Angular 的模块系统)、依赖注入控制反转等。

我会尽量只强调代码中重要的部分,否则这篇文章会太长。在文章底部,你会找到一个Github 仓库的链接,里面包含了所有代码和可用的 demo。目录结构如下:

应用程序结构

我们将从主控制器(和主路由 /)调度BidEvent在 Nestjs 中,控制器就是路由处理程序。

@Controller()
export class AppController {
  constructor(private readonly eventBus: EventBus, private queryBus: QueryBus) {}

  @Get()
  async bid(): Promise<object> {

    // We are hard-coding values here
    // instead of collecting them from a request
    this.eventBus.publish(
      new BidEvent('4ccd1088-b5da-44e2-baa0-ee4e0a58659d', '0ac04f2a-4866-42de-9387-cf392f64cd52', 233),
    );

    return {
      status: 'PENDING',
    };
  }

  @Get('/audiences')
  async getAudiences() {
    const allAudiences = await this.queryBus.execute(new GetAuctionQuery());

    return allAudiences;
  }
}
Enter fullscreen mode Exit fullscreen mode

我们系统真正的强大之处在于BidSaga类。此类(服务)的职责是监听BidEvents并发送命令。熟悉rxjs并使用ngrx包编写效果的开发者会发现这段代码非常熟悉,易于阅读

@Injectable()
export class BidSaga {

  @Saga()
  createBid = (events$: Observable<any>): Observable<ICommand> => {
    return events$.pipe(
      ofType(BidEvent),
      map((event: BidEvent) => {
        return new BidCommand(event.bidUser, event.auctionID, event.bidAmount);
      }),
    );
  }

  @Saga()
  createBidSuccess = (events$: Observable<any>): Observable<ICommand> => {
    return events$.pipe(
      ofType(BidEventSuccess),
      flatMap((event: BidEventSuccess) => {

        return [
          new MailCommand(event.user.email, {
            title: 'You did it...',
            message: 'Congrats',
          }),
          new PostponeAuctionCommand(event.auctionID),
          // create activity command
        ];
      }),
    );
  }
}
Enter fullscreen mode Exit fullscreen mode

请注意,我们创建了bidTransactionGUID变量并将其传递给BidEvent,该值用于粘合命令和事件。

正如您在上面的代码中看到的,BidEvent将调度BidCommand。此外,在我们的代码中,BidHandler(用于BidCommand)将调度BidEventSuccessBidEventFail

export class AuctionModel extends AggregateRoot {
  constructor(private readonly auction: IAuctionInterface) {
    super();
  }

  postponeAuction() {
    // validation and etc.

    // postpone it, and return new auction object with postponed date
    const auction = { ...this.auction };

    this.apply(new AuctionEventsPostponed(auction));
  }

  bidOnAuction(userID: string, amount: number) {
    // validation and etc.
    try {

      // business logic
      // upon successful bidding, dispatch new event
      this.apply(new BidEventSuccess(this.auction.id, amount, { email: 'fake@email.com', id: userID }));

    } catch (e) {

      // dispatch bid event fail action
      this.apply(new BidEventFail(e));
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

上面显示的模型是通过 BidHandler 服务运行的。

BidEventSuccess 调度后,将启动新命令 - MailCommand 和 PostponeAuctionCommand。

@Injectable()
export class AuctionSaga {

  @Saga()
  createBid = (events$: Observable<any>): Observable<ICommand> => {
    return events$.pipe(
      ofType(AuctionEventsPostponed),
      flatMap((event: AuctionEventsPostponed) => {

        // send emails to all existing bidders
        const bidders = [
          new MailCommand('bidder1@emailid', {
            title: 'Someone made a bid',
            message: 'Hurry up',
          }),
          new MailCommand('bidder2@emailid', {
            title: 'Someone made a bid',
            message: 'Hurry up',
          }),
        ];

        return [
          ...bidders,
          // create activity
        ];
      }),
    );
  }
}
Enter fullscreen mode Exit fullscreen mode

正如我们在上面的示例中所见,一切都与调度命令以及将它们与新事件链接起来有关。新功能的创建意味着新命令的创建以及随后触发的新事件。

如果此过程中出现任何问题,我们可以发送带有 bidTransactionGUID 信息的清理命令来删除系统中与此出价相关的内容。

结论

如果将事件驱动编程范式应用于正确的场景和场合,它将为应用程序架构带来巨大的优势。如果你设想一个应用程序的流程由事件决定,那么这种编程方法就非常适合。

存储库:https://github.com/vladotesanovic/cqrs


编者注:觉得这篇文章有什么问题?您可以在这里找到正确版本

插件:LogRocket,一个用于 Web 应用的 DVR

 
LogRocket 仪表板免费试用横幅
 
LogRocket是一款前端日志工具,可让您重播问题,就像它们发生在您自己的浏览器中一样。无需猜测错误发生的原因,也无需要求用户提供屏幕截图和日志转储,LogRocket 让您重播会话,快速了解问题所在。它可与任何应用程序完美兼容,不受框架限制,并且提供插件来记录来自 Redux、Vuex 和 @ngrx/store 的额外上下文。
 
除了记录 Redux 操作和状态外,LogRocket 还记录控制台日志、JavaScript 错误、堆栈跟踪、带有标头 + 正文的网络请求/响应、浏览器元数据以及自定义日志。它还会对 DOM 进行插桩,以记录页面上的 HTML 和 CSS,即使是最复杂的单页应用程序,也能重现像素完美的视频。
 
免费试用


如何在 Node.js 中使用事件驱动编程一文首先出现在LogRocket 博客上。

文章来源:https://dev.to/bnevilleoneill/how-to-use-event-driven-programming-in-node-js-55mm
PREV
了解 JavaScript 数据结构
NEXT
如何使用 CSS 滚动捕捉