Postgres 中的事件存储

2025-06-09

Postgres 中的事件存储

任何基础设施的关键部分之一就是存储。与传统的关系模型相比,将事件存储在日志中非常简单。然而,当你体验到一款成功产品的幸运时,即使是日志式存储也必须不断发展才能跟上时代的步伐。

简单的实现

当我开始使用事件溯源时,我希望尽可能地简化。说实话,我无法理解许多常见事件溯源数据库实现的细节。所以我从一个简单的表开始。

CREATE TABLE Event
(
    SequenceNum bigserial NOT NULL,
    StreamId uuid NOT NULL,
    Data jsonb NOT NULL,
    Type text NOT NULL,
    Meta jsonb NOT NULL,
    LogDate timestamptz NOT NULL DEFAULT now(),
    PRIMARY KEY (SequenceNum)
);

CREATE INDEX idx_event_streamid ON Event (StreamId);
Enter fullscreen mode Exit fullscreen mode

我使用了序列化写入器。(翻译:所有请求都排队,一次处理一个,没有并发。)我使用了完全一致性。(翻译:保存事件和更新关系/文档模型在同一个数据库中的同一个事务中执行。)

这种配置对我们来说效果很好,性能良好(即使在 t2.micro 实例上也是如此),而且相对容易理解。如果这是一个内部系统,我可能会就此打住。但对于多租户应用来说,这种配置迟早会遇到扩展瓶颈。

💡附言:事件溯源非常棒。
当我们更改关系模型时,我们的“数据迁移”包括删除受影响的表,使用更新后的架构重新创建它们,然后将事件重播到这些表上。该过程在部署期间自动进行。我们无需编写迁移脚本。<3

扩大实施

事件存储的下一个版本将包含一些优化,以支持进一步的扩展功能。其中大部分内容在 7 年前的《构建事件存储》一书中已有描述。然而,当时我并没有完全理解其中的一些细微差别。为了避免搬起石头砸自己的脚,我选择先实现一个简单版本。现在,我正着手进行优化。

事件表

让我们从事件表本身开始。尽管更新后的表格几乎完全相同,但我还是会继续解释每个字段和索引的用途。

CREATE TABLE IF NOT EXISTS Event
(
    SequenceNum bigserial NOT NULL,
    StreamId uuid NOT NULL,
    Version int NOT NULL,
    Data jsonb NOT NULL,
    Type text NOT NULL,
    Meta jsonb NOT NULL,
    LogDate timestamptz NOT NULL DEFAULT now(),
    PRIMARY KEY (SequenceNum),
    UNIQUE (StreamId, Version),
    FOREIGN KEY (StreamId)
        REFERENCES Stream (StreamId)
);
Enter fullscreen mode Exit fullscreen mode

字段

唯一两个绝对必填的字段是StreamIdData。其他所有字段都支持附加功能。

SequenceNum
事件监听器使用它来跟踪它们在流中的当前位置。这里真正重要的是保留事件保存的顺序,而一个自增的大整数可以很好地做到这一点。
StreamId
流的标识符。我选择了 uuid,但还有很多其他可行的标识选择。
Version
此字段在之前的版本中不存在,但为了优化而添加了它。如果将流视为数组,则这是事件的索引号。将其与事件一起保存有助于避免手动计数。
Data
这是事件的序列化数据。我选择了 jsonb 格式,但还有其他选择。如果我有更多资源来开发自己的自省工具,为了提高速度,我可能会使用像 Protocol Buffers 这样的二进制序列化方法。
Type
这是事件的类型。存储此信息可以支持过滤……以避免在监听器未使用该事件时进行获取/反序列化。我也用它来进行反序列化。
Meta
这支持审计和追踪。我在这里输入了用户、执行权限、关联 ID 等信息。
LogDate
除了审计和追踪之外,它还可以用于跨分片排序。虽然在业务逻辑中使用 LogDate 很诱人,但它应该只用于基础设施用途。如果需要时间上下文,事件应该包含自己的时间戳。不过我承认我曾经用 LogDate 做过报告。:S

索引/键

SequenceNum主键
允许基于位置的有效查询。即WHERE SequenceNum > @LastSeenSeqNum
StreamId, Version唯一键
允许按版本顺序高效加载特定流。Postgres 可以使用此索引进行处理,ORDER BY Version无需额外的排序步骤。
StreamId外键
这主要是“训练轮”,用于在出现错误时确保数据完整性。一旦代码经过验证,就可以将其移除以提高性能。

⚠️来自未来的提示
:事实证明,bigserial此处使用 会给并发带来问题。由于 PG 序列的工作方式,有时会出现数字乱序的情况。例如,SequenceNum 为 7 的事件在 SequenceNum 为 6 之前写入。这可能会导致活动侦听器(通过 LISTEN/NOTIFY)错过事件。我最终不得不使用常规的bigint,并使用单独的单行表来跟踪已发出的内容。有关更深入的了解,请参阅注释。StreamId由于未使用下表,外键也被删除了。

流表

我们希望在事件存储中添加的功能之一是支持并发写入。并发写入意味着:我可以部署多个业务服务副本(生成事件的服务,也称为命令处理程序),而无需在它们之间进行锁定或协调。我们甚至可以在 Lambda 等“无服务器”架构上运行这些服务,以按需自动扩展计算资源。这是通过使用乐观并发 (Optimistic Concurrency) 实现的。您可以将其想象成 Git 中的合并冲突——两个不同的分支独立地对同一行代码进行了更改。在这种情况下,两个独立的用户尝试同时对同一实体进行更改。这就是我们用来检测这种情况的表。

⚠️未来展望
我对 Lambda 命令处理程序的分析是,它不是一个好主意。原因在于 Lambda 的初始加载性能糟糕透顶,影响了用户体验。即使我用脚本保持 Lambda 的运行,或者 AWS 提供了始终在线的 Lambda,当函数扩展以处理负载时,仍然会遇到启动问题。与常规服务不同,函数一次只能处理一个请求。它会排队等待请求数量达到一定数量后再进行扩展。我们认为这不适合我们面向用户的服务。

CREATE TABLE IF NOT EXISTS Stream
(
    StreamId uuid NOT NULL,
    Version int NOT NULL,
    Type text NOT NULL,
    PRIMARY KEY (StreamId)
);

CREATE INDEX IF NOT EXISTS idx_stream_type ON Stream (Type);
Enter fullscreen mode Exit fullscreen mode

该表的核心只是跟踪每个流的当前版本。以下是使用此表的具体流程。

  • 在执行业务逻辑之前,获取流的当前版本。
  • 执行业务逻辑,产生新的事件。
  • 仅当以下情况才保存事件(在事务中)
    • Stream 表中的版本与您开始时的版本相同
    • 然后将流表更新为最后保存的版本
    • 指示事件是否已保存。例如返回 true/false

这确保写入者知道何时尝试保存冲突事件,并且他们可以返回错误、重试或按照业务规则定义的其他冲突解决流程。我们可能只会默认报错,直到有其他理由采取其他措施。

⚠️未来注意事项:
这张表实际上并没有出现在我们的实现中。并发写入已经通过 StreamId + Version 的唯一键处理。并发写入将计算相同的版本,并且只有其中一个会成功。更多信息请参见评论。

并发服务与并发用户

至少,我们需要确保从开始处理业务逻辑到尝试保存事件,流没有发生任何变化。大多数情况下,流都会在处理之前加载——以重建域的当前状态。因此,我们可以使用上次加载事件的版本作为预期版本。然后在保存新事件时验证它是否仍然相同。这将检测并发服务写入同一流时是否存在冲突。

但是,我们可以更进一步。客户端可以向我们发送他们提交请求时正在处理的数据版本。在保存事件时使用此版本作为预期版本,不仅可以检测服务之间的写入冲突,还可以检测用户的更改何时会意外覆盖其他用户的更改。

那这是什么Type

Type流的类型。例如,流的类型可能是“Ordering”,而流的某个事件的类型可能是“OrderPlaced”。它是完全可选的,但可以用来帮助生成快照(如下所述)。它还可以支持将事件筛选到特定类型的流。基于“类型”的索引应该不会影响写入性能,因为对表的更新仅基于“版本”。

快照表

当流变得非常大(可能超过 1000 个事件?)时,加载和重放流以获取当前状态可能会变得非常慢。常见的缓解方法是使用快照。我们不必每次都从头开始重建领域模型,而是偶尔重建领域模型的状态到最新版本并将其保存到数据库中。之后,为了加载流,我们首先获取快照,然后仅获取自快照版本以来的事件。下表支持此方法。

CREATE TABLE IF NOT EXISTS Snapshot
(
    StreamId uuid NOT NULL,
    Version int NOT NULL,
    Data jsonb NOT NULL,
    Revision int NOT NULL,
    PRIMARY KEY (StreamId)
);
Enter fullscreen mode Exit fullscreen mode

当数据流“过大”(这应该由请求时间等指标决定)时,应用程序代码应该创建快照。快照还应该不时地重新创建(例如在超过 1000 个事件之后),以及在快照的结构发生更改时重新创建——请参阅下面的修订。

什么是修订?

领域模型会随着时间推移而变化。有时,这些变化会导致上一个快照无法反序列化回新的领域模型。避免这种情况的一个相对简单的方法是将一个修订版本常量保存在领域代码的某个位置,并在进行结构性更改时递增该修订版本。加载快照时,如果修订版本不兼容,查询可以确保忽略它。

⚡ 我讨厌在开发过程中添加容易忘记的步骤——比如在结构更改时增加修订号。如果你有更好的处理方法,我很乐意听听!

快照如何保存?

或许最好的方法是建立一个单独的快照服务,并按计划运行。它可以使用流表来识别包含大量事件的流。它还可以检查现有的快照版本,以查找需要更新的版本。构建事件存储中介绍了如何同时完成这两项检查的查询。此外,还可以检查快照修订版本,以重新创建快照以查看结构更改。

现在,需要快照的流已经确定。下一步是调用相应的领域模型代码,将流重放至当前状态。然后,该状态将保存到快照表中。Stream.Type 可用于确定要调用哪个流重放代码。如果您未选择在 Stream 表中包含 Type 列,那么您也可以读取流的第一个事件。通常,这些事件可以指示流的类型。

您可能不需要快照。

对于许多类型的应用程序来说,设计良好的流在其生命周期内不会累积大量事件。使用快照当然没有坏处,但实现快照需要开发时间。而且快照可以随时添加。

我们取得了什么成就?

这些变化使使用事件存储的服务能够以无共享的方式进行扩展。它们甚至可以是无状态的,并在无服务器平台上使用。随着流大小的增长,它们还能提供一致的加载性能。

那么数据库本身的扩展怎么样?

我们还没有真正解决事件数据库本身的水平扩展问题。在多租户应用中,很容易在TenantId这些表中添加一个,并使用诸如Citus Data之类的分片解决方案。

我目前的多租户方案是基于模式隔离。转换成分片方案并不难,我最终可能会这么做。不过我非常喜欢模式隔离,所以我正在研究一些想法,使用一个简单的目录(例如 S3 上的一个文件)将租户映射到数据库实例。

更新时间:2019-01-23

下面的评论值得一读,尤其是与@damiensawyer的讨论。我最终得出结论,唯一需要的表是事件表。然后,如果需要,可以稍后添加快照表。毕竟,流表并非必需。事实上,它使实现变得相当复杂——我不得不创建一个不太小的存储过程来在附加事件的过程中保持流的更新。所以我认为这不值得。

鏂囩珷鏉ユ簮锛�https://dev.to/kspeakman/event-storage-in-postgres-4dk2
PREV
如何使用 i18next 翻译你的 React.js 应用
NEXT
Elm 0.19 让我们崩溃了💔