如何在 Node.js 中使用事件驱动编程
作者:Vlado Tesanovic ✏️
当今最常用的软件构建方法是采用请求/响应机制,底层采用分层架构(n 层),调用在各个层之间垂直传播。像MVC这样的模式已经非常流行,并且在某种程度上成为了人们学习和编写软件的标准。
虽然分层架构是最简单的架构,可以解决很多问题,但这并不意味着它是解决软件行业所有问题的灵丹妙药。有些软件可以使用不同的设计模式来编写,使其更具表现力。分层架构非常适合中小型项目。它的难点在于保持一切井然有序,不要构建过多的层级,否则最终会变成Baklava 代码。
分层架构
另外,我们有事件驱动编程,它主要用于前端开发,其中一个事件可以在系统中传播,并且许多参与者可以在捕获该事件后采取行动。数据流是单向的,无需编辑现有组件即可添加新功能。
虽然事件驱动编程在构建用户界面方面占据主导地位,但我们也可以用它来编写服务器端代码。好的用例是高度异步的系统,它们不需要服务器立即响应,并使用不同的通信渠道来发布请求的进度。
示范
在本教程中,我们不仅将分派事件来演示事件驱动编程,还将实现CQRS设计模式,将编辑数据(命令)的代码与用于读取数据(查询)的代码分开。
我们的应用程序的主要组成部分是:
- 命令
- 处理程序
- 活动
- 查询
命令是运行业务逻辑或调度新事件的操作。事件将用于调度其他命令。我们也可以拥有事件处理器。查询操作和查询处理器负责查询(读取)项目。
设想一个竞价系统,其中一个动作可以按照定义的顺序触发其他动作,并且我们希望它高度异步。最终我们将实现如下功能:
- 检查出价是否最高
- 向所有相关方(投标人和所有者)发送电子邮件
- 在数据库中添加出价
- 为该出价创建活动
- 收到最新出价后,竞标过程将延长两个小时(竞标费拍卖)
以下是我们系统流程图:
通过CQRS模块的实现,每个事件都会产生一个或多个命令,每个命令都会触发一个新的事件。
这个事件驱动系统支持面向方面编程范式。这意味着你可以在不改变现有功能的情况下为软件添加额外功能。在我们的例子中,这意味着将新的命令和命令处理程序与事件链接起来。
执行
我们选择Nestjs来实现我们想象中的竞价系统所描述的解决方案。
Nestjs在其丰富的生态系统中提供了CQRS模块。该模块的主要构建块是三个可注入类:EventBus、QueryBus和CommandBus。顾名思义,每个类都可以触发事件、查询或命令。
阅读和编写此演示的代码需要学习并深入研究 Nestjs,因为其中涉及许多概念。Nestjs 是一个功能丰富的框架,它严重依赖于装饰器、可观察对象,并带有模块系统(类似于 Angular 的模块系统)、依赖注入、控制反转等。
我会尽量只强调代码中重要的部分,否则这篇文章会太长。在文章底部,你会找到一个Github 仓库的链接,里面包含了所有代码和可用的 demo。目录结构如下:
我们将从主控制器(和主路由 /)调度BidEvent。在 Nestjs 中,控制器就是路由处理程序。
@Controller()
export class AppController {
constructor(private readonly eventBus: EventBus, private queryBus: QueryBus) {}
@Get()
async bid(): Promise<object> {
// We are hard-coding values here
// instead of collecting them from a request
this.eventBus.publish(
new BidEvent('4ccd1088-b5da-44e2-baa0-ee4e0a58659d', '0ac04f2a-4866-42de-9387-cf392f64cd52', 233),
);
return {
status: 'PENDING',
};
}
@Get('/audiences')
async getAudiences() {
const allAudiences = await this.queryBus.execute(new GetAuctionQuery());
return allAudiences;
}
}
我们系统真正的强大之处在于BidSaga类。此类(服务)的职责是监听BidEvents并发送命令。熟悉rxjs并使用ngrx包编写效果的开发者会发现这段代码非常熟悉,易于阅读。
@Injectable()
export class BidSaga {
@Saga()
createBid = (events$: Observable<any>): Observable<ICommand> => {
return events$.pipe(
ofType(BidEvent),
map((event: BidEvent) => {
return new BidCommand(event.bidUser, event.auctionID, event.bidAmount);
}),
);
}
@Saga()
createBidSuccess = (events$: Observable<any>): Observable<ICommand> => {
return events$.pipe(
ofType(BidEventSuccess),
flatMap((event: BidEventSuccess) => {
return [
new MailCommand(event.user.email, {
title: 'You did it...',
message: 'Congrats',
}),
new PostponeAuctionCommand(event.auctionID),
// create activity command
];
}),
);
}
}
请注意,我们创建了bidTransactionGUID变量并将其传递给BidEvent,该值用于粘合命令和事件。
正如您在上面的代码中看到的,BidEvent将调度BidCommand。此外,在我们的代码中,BidHandler(用于BidCommand)将调度BidEventSuccess或BidEventFail。
export class AuctionModel extends AggregateRoot {
constructor(private readonly auction: IAuctionInterface) {
super();
}
postponeAuction() {
// validation and etc.
// postpone it, and return new auction object with postponed date
const auction = { ...this.auction };
this.apply(new AuctionEventsPostponed(auction));
}
bidOnAuction(userID: string, amount: number) {
// validation and etc.
try {
// business logic
// upon successful bidding, dispatch new event
this.apply(new BidEventSuccess(this.auction.id, amount, { email: 'fake@email.com', id: userID }));
} catch (e) {
// dispatch bid event fail action
this.apply(new BidEventFail(e));
}
}
}
上面显示的模型是通过 BidHandler 服务运行的。
BidEventSuccess 调度后,将启动新命令 - MailCommand 和 PostponeAuctionCommand。
@Injectable()
export class AuctionSaga {
@Saga()
createBid = (events$: Observable<any>): Observable<ICommand> => {
return events$.pipe(
ofType(AuctionEventsPostponed),
flatMap((event: AuctionEventsPostponed) => {
// send emails to all existing bidders
const bidders = [
new MailCommand('bidder1@emailid', {
title: 'Someone made a bid',
message: 'Hurry up',
}),
new MailCommand('bidder2@emailid', {
title: 'Someone made a bid',
message: 'Hurry up',
}),
];
return [
...bidders,
// create activity
];
}),
);
}
}
正如我们在上面的示例中所见,一切都与调度命令以及将它们与新事件链接起来有关。新功能的创建意味着新命令的创建以及随后触发的新事件。
如果此过程中出现任何问题,我们可以发送带有 bidTransactionGUID 信息的清理命令来删除系统中与此出价相关的内容。
结论
如果将事件驱动编程范式应用于正确的场景和场合,它将为应用程序架构带来巨大的优势。如果你设想一个应用程序的流程由事件决定,那么这种编程方法就非常适合。
存储库:https://github.com/vladotesanovic/cqrs
编者注:觉得这篇文章有什么问题?您可以在这里找到正确版本。
插件:LogRocket,一个用于 Web 应用的 DVR
LogRocket是一款前端日志工具,可让您重播问题,就像它们发生在您自己的浏览器中一样。无需猜测错误发生的原因,也无需要求用户提供屏幕截图和日志转储,LogRocket 让您重播会话,快速了解问题所在。它可与任何应用程序完美兼容,不受框架限制,并且提供插件来记录来自 Redux、Vuex 和 @ngrx/store 的额外上下文。
除了记录 Redux 操作和状态外,LogRocket 还记录控制台日志、JavaScript 错误、堆栈跟踪、带有标头 + 正文的网络请求/响应、浏览器元数据以及自定义日志。它还会对 DOM 进行插桩,以记录页面上的 HTML 和 CSS,即使是最复杂的单页应用程序,也能重现像素完美的视频。
免费试用。
如何在 Node.js 中使用事件驱动编程一文首先出现在LogRocket 博客上。
文章来源:https://dev.to/bnevilleoneill/how-to-use-event-driven-programming-in-node-js-55mm