轻松制作 Saga 模式
这是系列文章的第二部分。想要了解补偿操作,请参阅“补偿操作,Sagas 完整早餐的一部分”。
用传奇故事来规划旅行,但不要带行李
上次你和家人去公园玩的时候,大家都在讨论 Saga 设计模式。现在你想知道它是什么,是否应该把它融入到你自己的分布式系统设计中,以及如何实现它。众所周知,软件设计的核心就是追逐时尚1潮流2。
传奇故事
如果您不确定 Saga 模式是否适合您的场景,请问自己:您的逻辑是否涉及多个步骤,其中一些步骤跨越机器、服务、分片或数据库,而这些步骤不适合部分执行?事实证明,这正是 Saga 的用武之地。也许您正在检查库存,从用户的信用卡扣款,然后完成订单。也许您正在管理供应链。Saga 模式之所以有用,是因为它本质上就像一个状态机,可以存储程序进度,防止多次信用卡扣款,在必要时进行恢复,并且确切地知道如何在断电时安全地恢复到一致的状态。
一个常见于生活中的例子,用来解释 Saga 模式如何弥补失败,那就是旅行计划。假设你渴望去西雅图杜瓦米什地区享受雨水。你需要买机票、预订酒店,还要买一张去雷尼尔山的带导游的背包旅行的票。这三项任务是紧密相连的:如果你买不到机票,就没理由再买其他的了。如果你买了机票却没地方住,你肯定会想取消机票预订(或者重新预订酒店,或者另找住处)。最后,如果你订不到那趟背包旅行,那你来西雅图就真的没什么理由了,所以你干脆把行程全部取消吧。(开玩笑!)
上图:旅行计划失败时的简单补偿模型。
现实世界中有许多“要么全部做,要么全部不做”的软件应用程序:如果您成功地向用户收取了一件商品的费用,但您的履行服务报告该商品缺货,如果您不退还费用,用户会很不高兴。如果您遇到相反的问题,不小心“免费”交付了物品,那么您就会破产。如果协调机器学习数据处理管道的机器崩溃了,但追随者机器继续处理数据,却无处报告其数据,那么您可能需要支付非常昂贵的计算资源费用3。在所有这些情况下,拥有某种“进度跟踪”和补偿代码来处理这些“全部做或什么都不做”的任务正是 saga 模式所提供的。用 saga 术语来说,这些“全有或全无”的任务称为长时间运行的事务。这并不一定意味着这样的操作会运行“很长时间”,只是它们在逻辑时间4中需要比在本地与单个数据库交互的操作更多的步骤。
如何构建一个传奇故事?
一个 Saga 由两部分组成:
- 如果您需要“撤消”某些操作(即补偿) ,则定义“后退”的行为
- 努力向前发展的行为(即保存状态以便在遇到失败时知道从哪里恢复)
本博客的热心读者会记得我最近写了一篇关于补偿动作的文章。正如您从上面看到的,补偿只是 Saga 设计模式的一半。上面提到的另一半本质上是整个系统的状态管理。补偿动作模式可以帮助您了解在单个步骤(或用时间术语来说,活动)失败时如何恢复。但如果整个系统崩溃了怎么办?您从哪里开始重新启动?由于并非每个步骤都可能附带补偿,因此您只能根据存储的补偿来做出最佳猜测。Saga 模式会跟踪您当前的位置,以便您可以继续向前推进。
那么如何在我自己的代码中实现 sagas 呢?
我很高兴你问这个问题。
向前倾
耳边低语
这道题有点棘手,因为使用 Temporal 运行代码会自动保存状态,并在任何层级失败时重试。这意味着,使用 Temporal 的 Saga 模式非常简单,只需编写代码,指定某个步骤( Activity )失败时需要采取的补偿即可。结束。
这种神奇的背后原因在于 Temporal 的设计,它会自动跟踪程序的进度,并在发生灾难性故障时从中断的地方继续执行。此外,Temporal 会在失败时重试活动,除了指定重试策略外,您无需添加任何代码,例如:
RetryOptions retryoptions = RetryOptions.newBuilder()
.setInitialInterval(Duration.ofSeconds(1))
.setMaximumInterval(Duration.ofSeconds(100))
.setBackoffCoefficient(2)
.setMaximumAttempts(500).build();
要了解有关此自动化魔法如何工作的更多信息,请继续关注我即将发布的有关编排和编排(实现 sagas 的两种常见方式)的文章。
因此,为了表达我的程序的高级逻辑,包括假期预订步骤以及我希望在失败时承担的补偿,它在伪代码中看起来像以下内容:
try:
registerCompensationInCaseOfFailure(cancelHotel)
bookHotel
registerCompensationInCaseOfFailure(cancelFlight)
bookFlight
registerCompensationInCaseOfFailure(cancelExcursion)
bookExcursion
catch:
run all compensation activities
在 Java 中,该类Saga
会为您跟踪补偿:
@Override
public void bookVacation(BookingInfo info) {
Saga saga = new Saga(new Saga.Options.Builder().build());
try {
saga.addCompensation(activities::cancelHotel, info.getClientId());
activities.bookHotel(info);
saga.addCompensation(activities::cancelFlight, info.getClientId());
activities.bookFlight(info);
saga.addCompensation(activities::cancelExcursion,
info.getClientId());
activities.bookExcursion(info);
} catch (TemporalFailure e) {
saga.compensate();
throw e;
}
}
在其他语言的 SDK 中,你可以轻松地自己编写addCompensation
和compensate
函数。以下是 Go 语言的版本:
func (s *Compensations) AddCompensation(activity any, parameters ...any) {
s.compensations = append(s.compensations, activity)
s.arguments = append(s.arguments, parameters)
}
func (s Compensations) Compensate(ctx workflow.Context, inParallel bool) {
if !inParallel {
// Compensate in Last-In-First-Out order, to undo in the reverse order that activies were applied.
for i := len(s.compensations) - 1; i >= 0; i-- {
errCompensation := workflow.ExecuteActivity(ctx, s.compensations[i], s.arguments[i]...).Get(ctx, nil)
if errCompensation != nil {
workflow.GetLogger(ctx).Error("Executing compensation failed", "Error", errCompensation)
}
}
} else {
selector := workflow.NewSelector(ctx)
for i := 0; i < len(s.compensations); i++ {
execution := workflow.ExecuteActivity(ctx, s.compensations[i], s.arguments[i]...)
selector.AddFuture(execution, func(f workflow.Future) {
if errCompensation := f.Get(ctx, nil); errCompensation != nil {
workflow.GetLogger(ctx).Error("Executing compensation failed", "Error", errCompensation)
}
})
}
for range s.compensations {
selector.Select(ctx)
}
}
}
步骤和补偿的高级 Go 代码看起来与 Java 版本非常相似:
func TripPlanningWorkflow(ctx workflow.Context, info BookingInfo) (err error) {
options := workflow.ActivityOptions{
StartToCloseTimeout: time.Second * 5,
RetryPolicy: &temporal.RetryPolicy{MaximumAttempts: 2},
}
ctx = workflow.WithActivityOptions(ctx, options)
var compensations Compensations
defer func() {
if err != nil {
// activity failed, and workflow context is canceled
disconnectedCtx, _ := workflow.NewDisconnectedContext(ctx)
compensations.Compensate(disconnectedCtx, true)
}
}()
compensations.AddCompensation(CancelHotel)
err = workflow.ExecuteActivity(ctx, BookHotel, info).Get(ctx, nil)
if err != nil {
return err
}
compensations.AddCompensation(CancelFlight)
err = workflow.ExecuteActivity(ctx, BookFlight, info).Get(ctx, nil)
if err != nil {
return err
}
compensations.AddCompensation(CancelExcursion)
err = workflow.ExecuteActivity(ctx, BookExcursion, info).Get(ctx, nil)
if err != nil {
return err
}
return err
}
上面这段高级代码序列被称为 Temporal工作流。并且,如前所述,通过使用 Temporal 运行,我们无需担心实现任何簿记功能,例如通过事件溯源来跟踪进度,也无需添加重试和重启逻辑,因为这些都是免费的。因此,在编写使用 Temporal 运行的代码时,您只需担心编写补偿,其余部分均免费提供。
幂等性
好吧,还有第二件事要“担心”。您可能还记得,sagas 由两部分组成,第一部分是我们之前编写的补偿。第二部分“努力向前发展”涉及在失败的情况下重试活动。让我们深入研究其中一个步骤,好吗?Temporal 完成了所有重试和跟踪整体进度的繁重工作,但是由于代码可以重试,因此您(程序员)需要确保每个 Temporal 活动都是幂等的bookFlight
。这意味着无论调用一次还是多次,观察到的结果都是相同的。更具体地说,设置某个字段的函数foo=3
是幂等的,因为foo
无论调用多少次,之后的结果都是 3。函数foo += 3
不是幂等的,因为的值foo
取决于调用函数的次数。非幂等性有时看起来更微妙:如果你的数据库允许重复记录,那么调用函数时,INSERT INTO foo (bar) VALUES (3)
它会在表中创建与调用次数相同的记录,因此它不是幂等的。发送电子邮件或转账等函数的简单实现默认情况下也不是幂等的。
如果您现在因为实际应用中的操作比 set 复杂得多而犹豫不决foo=3
,请放心。有一个解决方案。您可以使用一个独特的标识符(称为幂等键,有时也称为referenceId
或类似的名称)来唯一地标识特定事务,并确保酒店预订事务有效发生一次。可以根据您的应用需求定义此幂等键。在旅行计划应用程序中,clientId
中的一个字段BookingInfo
用于唯一地标识事务。
type BookingInfo struct {
Name string
ClientId string
Address string
CcInfo CreditCardInfo
Start date.Date
End date.Date
}
您可能还看到了clientId
用于在上面的 Java 工作流代码中注册补偿的内容:
saga.addCompensation(activities::cancelHotel, info.getClientId());
但是,使用clientId
作为键可以限制特定人员一次预订多个假期。这或许正是我们想要的。然而,某些业务应用程序可能会选择通过组合clientId
和 来构建幂等键workflowId
,以允许每个客户一次预订多个假期。如果您想要一个真正唯一的幂等键,可以将 UUID 传递给工作流。具体选择取决于您的应用程序需求。
许多处理资金的第三方 API已经为此目的接受了幂等键。如果您需要自己实现类似的功能,请使用原子写入来记录迄今为止已看到的幂等键,并且如果操作的幂等键位于“已看到”集合中,则不要执行该操作。
优势与复杂性
Saga 模式确实会增加代码的复杂性,因此切勿仅仅因为微服务就将其实现在代码中。但是,如果您需要完成一项涉及多个服务的任务(例如预订包含机票和酒店的行程),并且部分执行实际上并非成功,那么 Saga 模式将是您的好帮手。此外,如果您发现 Saga 模式变得特别笨重,也许是时候重新考虑如何划分微服务,并卷起袖子进行重构了。总而言之,Temporal 模式让在代码中实现 Saga 模式变得相对简单,因为您只需编写每个步骤所需的补偿即可。敬请关注我的下一篇文章,我将在其中深入探讨 Saga 和订阅场景,其中 Temporal 在降低 Saga 模式的复杂性方面尤为出色。
使用本文提到的代码的完整存储库可以在 GitHub 上找到:
如果您想查看使用 Temporal 的 sagas 其他教程,请查看以下资源:
此外,我的一位同事 Dominik Tornow 在YouTube上介绍了传奇故事。
在我们的课程、教程、文档和视频中了解有关 Temporal 的更多信息。
笔记
-
显然,不要仅仅因为某个东西很热门就重新设计你的系统。除非是一个新的 JavaScript 框架。那么,
npm install
尽快采用那个新包吧。😜 ↩ -
并不是说作者对这种情况有任何经验。咳嗽着支付了新车的价格😬↩
-
逻辑时间是分布式计算中的一个概念,用于描述分布式计算中不同机器上发生的事件的时序,因为机器可能没有物理同步的全局时钟。逻辑时间只是这些机器上发生的事件的因果顺序。对于长时间运行的事务,它基本上可以归结为在不同的机器上发生的许多“步骤” 。↩