Golang 中实现数据库事务的简洁方法
您好,欢迎回来!
在前面的课程中,我们学习了如何编写 golang 代码对简单银行数据库的每个单独的表执行 CRUD 操作。
但在实际应用中,我们经常需要执行一个事务,将来自多个表的某些操作组合起来。今天我们将学习一种在 Golang 中简洁地实现它的方法。
以下是:
- YouTube 上完整系列播放列表的链接
- 以及它的Github 仓库
数据库事务
在我们开始编码之前,让我们先讨论一下交易!
什么是 DB 事务?
嗯,基本上,它是一个单一的工作单元,通常由多个数据库操作组成。
例如,在我们的简单银行中,我们想要10 USD
从转账account 1
到account 2
。
该交易包含5个操作:
- 我们创建一个
transfer
金额等于10的记录。 - 我们创建了金额等于的
entry
记录,因为资金正在从该账户转出。account 1
-10
- 我们
entry
为 创建另一条记录account 2
,但金额等于10
,因为资金正在转入该账户。 - 然后我们通过减去
balance
来更新。account 1
10
- 最后,我们通过添加来
balance
更新。account 2
10
这就是我们在本文中要实现的交易。我们稍后会讲到。
为什么需要使用DB事务?
主要有两个原因:
- 我们希望我们的工作单元可靠且一致,即使在系统出现故障的情况下。
- 我们希望在同时访问数据库的程序之间提供隔离。
ACID 属性
为了实现这两个目标,数据库事务必须满足ACID
以下属性:
A
是Atomicity
,这意味着要么事务的所有操作都成功完成,要么整个事务失败,并且所有内容都回滚,数据库保持不变。C
是Consistency
,这意味着数据库状态在事务执行后应该保持有效。更准确地说,所有写入数据库的数据都必须根据预定义的规则(包括约束、级联和触发器)保持有效。I
是Isolation
,这意味着所有并发运行的事务不应相互影响。隔离级别有多种,用于定义一个事务所做的更改何时对其他事务可见。我们将在另一节课中详细讨论。- 最后一个属性是
D
,代表Durability
。它的基本含义是,成功事务写入的所有数据必须保留在持久存储中,即使在系统发生故障的情况下也不会丢失。
如何运行 SQL DB 事务?
这很简单:
- 我们用该语句开始交易
BEGIN
。 - 然后我们编写一系列正常的 SQL 查询(或操作)。
- 如果全部成功,我们
COMMIT
就将事务变为永久的,数据库将改变到一个新的状态。 - 否则,如果任何查询失败,我们将
ROLLBACK
结束事务,因此该事务的先前查询所做的所有更改都将消失,并且数据库保持与事务之前的状态相同。
好了,现在我们对数据库事务有了一些基本的了解。让我们学习如何在 Golang 中实现它。
使用 Go 实现 DB 事务
我将store.go
在该db/sqlc
文件夹中创建一个新文件。在这个文件中,我们定义一个新的Store
结构体。
type Store struct {
}
该存储将提供单独运行数据库查询以及在事务内组合运行数据库查询的所有功能。
使用组合来扩展查询的功能
对于单独的查询,我们已经有了在以前的讲座中学习到Queries
的结构。sqlc
但是,每个查询仅对一个特定表执行一个操作。因此Queries
结构体不支持事务。因此,我们必须通过将其嵌入到Store
结构体中来扩展其功能,如下所示:
type Store struct {
*Queries
}
这被称为composition
,它是 Golang 中扩展结构体功能的首选方法,而不是inheritance
。通过嵌入Queries
,Store
提供的所有单独查询函数都Queries
将可供 使用Store
。
我们可以通过向这个新结构体添加更多函数来支持事务。为此,我们需要Store
一个对象,sql.DB
因为它是创建新数据库事务所必需的。
type Store struct {
*Queries
db *sql.DB
}
创建新商店
好的,现在让我们添加一个函数来创建一个新Store
对象。它将接受一个sql.DB
作为输入,并返回一个Store
。在函数内部,我们只需创建一个新Store
对象并返回它即可。
func NewStore(db *sql.DB) *Store {
return &Store{
db: db,
Queries: New(db),
}
}
这db
是输入sql.DB
,它Queries
是通过调用New()
带有该 db 对象的函数创建的。正如我们在之前的课程中已经了解的那样,该New()
函数是由 生成的。它创建并返回一个对象。sqlc
Queries
执行通用数据库事务
接下来,我们将添加一个函数来Store
执行通用数据库事务。
func (store *Store) execTx(ctx context.Context, fn func(*Queries) error) error {
...
}
这个想法很简单:它以上下文和回调函数作为输入,然后它将启动一个新的数据库事务,Queries
使用该事务创建一个新对象,使用创建的回调函数调用回调函数Queries
,最后根据该函数返回的错误提交或回滚事务。
让我们来实现它吧!
首先,为了开始一个新的事务,我们调用store.db.BeginTx()
,传入上下文,以及可选的sql.TxOptions
。
tx, err := store.db.BeginTx(ctx, &sql.TxOptions{})
isolation level
此选项允许我们为此交易设置自定义。
type TxOptions struct {
Isolation IsolationLevel
ReadOnly bool
}
如果我们没有明确设置它,那么将使用数据库服务器的默认隔离级别,这是read-committed
Postgres 的情况。
我们将在另一节课中详细讨论这一点。现在,我们先传递nil
这里参数,使用默认值。
func (store *Store) execTx(ctx context.Context, fn func(*Queries) error) error {
tx, err := store.db.BeginTx(ctx, nil)
if err != nil {
return err
}
q := New(tx)
...
}
该BeginTx()
函数返回一个交易对象或一个错误。如果错误不为真nil
,则立即返回。否则,我们将New()
创建的交易作为参数调用该函数tx
,并返回一个新Queries
对象。
New()
这和我们在函数中使用的函数是一样的。唯一的区别是,我们NewStore()
传入的不是,而是对象。之所以能成功,是因为该函数接受一个接口,就像我们在上一节课中看到的那样:sql.DB
sql.Tx
New()
DBTX
type DBTX interface {
ExecContext(context.Context, string, ...interface{}) (sql.Result, error)
PrepareContext(context.Context, string) (*sql.Stmt, error)
QueryContext(context.Context, string, ...interface{}) (*sql.Rows, error)
QueryRowContext(context.Context, string, ...interface{}) *sql.Row
}
func New(db DBTX) *Queries {
return &Queries{db: db}
}
好的,现在我们有了在事务中运行的查询,我们可以使用该查询调用输入函数,并返回错误。
如果错误不是nil
,则我们需要通过调用来回滚事务tx.Rollback()
。它也会返回回滚错误。
如果回滚错误也不是nil
,那么我们必须报告 2 个错误。因此,我们应该fmt.Errorf()
在返回之前使用 将它们合并为 1 个错误:
func (store *Store) execTx(ctx context.Context, fn func(*Queries) error) error {
tx, err := store.db.BeginTx(ctx, &sql.TxOptions)
if err != nil {
return err
}
q := New(tx)
err = fn(q)
if err != nil {
if rbErr := tx.Rollback(); rbErr != nil {
return fmt.Errorf("tx err: %v, rb err: %v", err, rbErr)
}
return err
}
return tx.Commit()
}
如果回滚成功,我们只返回原始交易错误。
最后,如果事务中的所有操作都成功,我们只需使用提交事务tx.Commit()
,并将其错误返回给调用者。
函数就完成了execTx()
。请注意,此函数未导出(以小写字母 e 开头),因为我们不希望外部包直接调用它。因此,我们将为每个特定的交易提供一个导出函数。
实施汇款交易
现在让我们继续添加一个新TransferTx()
功能来执行我们在视频开头讨论的汇款交易示例。
回想一下,它将创建一个新的转账记录,添加 2 个新的账户条目,并在单个数据库事务中更新 2 个账户的余额。
该函数的输入将是一个上下文和一个类型的参数对象TransferTxParams
。它将返回一个TransferTxResult
对象或一个错误。
func (store *Store) TransferTx(ctx context.Context, arg TransferTxParams) (TransferTxResult, error) {
}
该TransferTxParams
结构包含在两个账户之间转账所需的所有输入参数:
type TransferTxParams struct {
FromAccountID int64 `json:"from_account_id"`
ToAccountID int64 `json:"to_account_id"`
Amount int64 `json:"amount"`
}
FromAccountID
是汇款账户的 ID。ToAccountID
是将要汇款的账户的 ID。- 最后一个字段是
Amount
要发送的金额。
该TransferTxResult
结构体包含转账交易的结果。它有 5 个字段:
type TransferTxResult struct {
Transfer Transfer `json:"transfer"`
FromAccount Account `json:"from_account"`
ToAccount Account `json:"to_account"`
FromEntry Entry `json:"from_entry"`
ToEntry Entry `json:"to_entry"`
}
- 已创建的
Transfer
记录。 FromAccount
减去其余额之后。- 之后
ToAccount
其余额就被添加了。 FromEntry
记录资金流出的账户FromAccount
。- 并
ToEntry
记录资金转入的账户ToAccount
。
好了,现在我们可以实现转账交易了。首先,我们创建一个空的结果。然后,我们调用store.execTx()
之前编写的函数来创建并运行一个新的数据库交易。
func (store *Store) TransferTx(ctx context.Context, arg TransferTxParams) (TransferTxResult, error) {
var result TransferTxResult
err := store.execTx(ctx, func(q *Queries) error {
...
return nil
})
return result, err
}
我们传入上下文和回调函数。现在我们只需返回nil
这个回调即可。最后,我们返回调用的结果和错误execTx()
。
现在让我们回过头来实现回调函数。基本上,我们可以使用查询对象q
来调用它提供的任何单独的 CRUD 函数。
请记住,此查询对象是从 1 个数据库事务创建的,因此我们调用的所有提供的方法都将在该事务中运行。
好吧,让我们通过调用来创建转移记录q.CreateTransfer()
,传入输入上下文和一个CreateTransferParams
参数:
func (store *Store) TransferTx(ctx context.Context, arg TransferTxParams) (TransferTxResult, error) {
...
err := store.execTx(ctx, func(q *Queries) error {
var err error
result.Transfer, err = q.CreateTransfer(ctx, CreateTransferParams{
FromAccountID: arg.FromAccountID,
ToAccountID: arg.ToAccountID,
Amount: arg.Amount,
})
if err != nil {
return err
}
...
return nil
})
return result, err
}
输出传输将被保存到result.Transfer
或 错误。如果错误不是nil
,我们会立即返回。
这里你可以看到,我们result
从回调函数内部访问了外部函数的变量。arg
变量也类似。
这使得回调函数变成了一个closure
。由于 Go 缺乏对泛型类型的支持,当我们想要从回调函数中获取结果时,通常使用闭包,因为回调函数本身不知道它应该返回的结果的确切类型。
好的,创建转账记录的第一步已经完成。下一步是添加 2 个账户条目:1 个用于FromAccount
,1 个用于ToAccount
。
我们调用q.CreateAccountEntry()
,传入上下文和CreateAccountEntryParams
,其中AccountID
是arg.FromAccountID
,并且Amount
是-arg.Amount
因为资金正在从这个账户中转出。
func (store *Store) TransferTx(ctx context.Context, arg TransferTxParams) (TransferTxResult, error) {
...
err := store.execTx(ctx, func(q *Queries) error {
var err error
result.Transfer, err = q.CreateTransfer(ctx, CreateTransferParams{
FromAccountID: arg.FromAccountID,
ToAccountID: arg.ToAccountID,
Amount: arg.Amount,
})
if err != nil {
return err
}
result.FromEntry, err = q.CreateEntry(ctx, CreateEntryParams{
AccountID: arg.FromAccountID,
Amount: -arg.Amount,
})
if err != nil {
return err
}
...
return nil
})
return result, err
}
和以前一样,如果错误不为零,我们就返回它,这样交易就会回滚。
我们用类似的方法为到账账户创建账户分录。但这次,它是result.ToEntry
,AccountID
是arg.ToAccountID
,并且Amount
是,arg.Amount
因为资金正在转入这个账户。
func (store *Store) TransferTx(ctx context.Context, arg TransferTxParams) (TransferTxResult, error) {
...
err := store.execTx(ctx, func(q *Queries) error {
var err error
result.Transfer, err = q.CreateTransfer(ctx, CreateTransferParams{
FromAccountID: arg.FromAccountID,
ToAccountID: arg.ToAccountID,
Amount: arg.Amount,
})
if err != nil {
return err
}
result.FromEntry, err = q.CreateEntry(ctx, CreateEntryParams{
AccountID: arg.FromAccountID,
Amount: -arg.Amount,
})
if err != nil {
return err
}
result.ToEntry, err = q.CreateEntry(ctx, CreateEntryParams{
AccountID: arg.ToAccountID,
Amount: arg.Amount,
})
if err != nil {
return err
}
return nil
// TODO: update accounts' balance
})
return result, err
}
我们已经完成了账户条目的创建。最后一步是更新账户余额,这会更加复杂,因为它涉及到locking
如何防止潜在的deadlock
……
所以我觉得值得用一堂课来详细讲解。我们先在这里添加一个 TODO 注释,下次再回来实现它。
测试汇款交易
现在假设我们的转账交易已完成,包含 1 条转账记录,并创建了 2 个账户条目。我们需要进行测试,以确保它能够按预期工作。
我要创建一个新store_test.go
文件。它db
和我们的在同一个包里store.go
。然后我们为这个TransferTx()
函数定义一个新的单元测试。
func TestTransferTx(t *testing.T) {
...
}
首先我们需要创建一个新Store
对象。该NewStore()
函数需要一个sql.DB
对象。
如果您还记得的话,在上一讲中,我们已经通过以下函数调用sql.DB
在文件中创建了一个对象:main_test.go
sql.Open()
func TestMain(m *testing.M) {
conn, err := sql.Open(dbDriver, dbSource)
...
}
因此为了重用它,这里我们不会将结果分配给连接变量,而是声明一个新的全局变量:testDB
,并将命令的结果存储sql.Open()
在其中。
var testQueries *Queries
var testDB *sql.DB
func TestMain(m *testing.M) {
var err error
testDB, err = sql.Open(dbDriver, dbSource)
if err != nil {
log.Fatal("cannot connect to db:", err)
}
testQueries = New(testDB)
os.Exit(m.Run())
}
好的,现在我们可以回到单元测试并将testDB
对象传递到NewStore()
函数中以创建一个新的Store
:
func TestTransferTx(t *testing.T) {
store := NewStore(testDB)
account1 := createRandomAccount(t)
account2 := createRandomAccount(t)
...
}
接下来,我们使用上一节课中编写的函数创建两个随机账户createRandomAccount()
。我们将从账户 1 向账户 2 转账。
从我的经验来看,编写数据库事务是一件必须非常小心的事情。它可能很容易编写,但如果不仔细处理并发,也很容易变成一场噩梦。
因此,确保我们的交易运行良好的最佳方法是使用多个并发的 GO 例程来运行它。
假设我想运行n = 5
并发转账交易,每个交易都会将一笔amount
钱10
从账户 1 转移到账户 2。因此,我将使用一个简单的 for 循环进行n
迭代:
func TestTransferTx(t *testing.T) {
store := NewStore(testDB)
account1 := createRandomAccount(t)
account2 := createRandomAccount(t)
n := 5
amount := int64(10)
// run n concurrent transfer transaction
for i := 0; i < n; i++ {
go func() {
result, err := store.TransferTx(context.Background(), TransferTxParams{
FromAccountID: account1.ID,
ToAccountID: account2.ID,
Amount: amount,
})
...
}()
}
...
}
在循环内部,我们使用go
关键字来启动一个新的例程。在 Go 例程内部,我们store.TransferTx()
用一个背景上下文和一个TransferTxParams
对象调用函数,其中FromAccountID
is account1.ID
、ToAccountID
isaccount2.ID
和Amount
is10
就像我们amount = 10
上面声明的那样。
此函数返回结果或错误。我们不能直接testify/require
在这里检查它们,因为此函数运行在与我们函数所在的不同的 Go 例程中TestTransferTx
,所以无法保证在条件不满足时它会停止整个测试。
验证错误和结果的正确方法是将它们发送回我们的测试正在运行的主 go 例程,然后从那里检查它们。
为此,我们可以使用通道。通道旨在连接并发的 Go 例程,并允许它们彼此安全地共享数据,而无需显式锁定。
在我们的例子中,我们需要一个通道来接收错误,以及另一个通道来接收 TransferTxResult。我们使用 make 关键字来创建通道。
func TestTransferTx(t *testing.T) {
...
// run n concurrent transfer transaction
errs := make(chan error)
results := make(chan TransferTxResult)
for i := 0; i < n; i++ {
go func() {
result, err := store.TransferTx(context.Background(), TransferTxParams{
FromAccountID: account1.ID,
ToAccountID: account2.ID,
Amount: amount,
})
errs <- err
results <- result
}()
}
}
现在,在 goroutine 内部,我们可以使用这个箭头运算符err
向通道发送数据。通道应该在箭头运算符的左侧,而要发送的数据应该在箭头运算符的右侧。errs
<-
同样,我们发送result
到results
通道。然后,我们将从外部检查这些错误和结果。
我们只需运行一个 for 循环即可n
。为了从通道接收错误,我们使用相同的箭头运算符,但这次,通道位于箭头的右侧,而用于存储接收数据的变量位于左侧。
func TestTransferTx(t *testing.T) {
...
// run n concurrent transfer transaction
...
// check results
for i := 0; i < n; i++ {
err := <-errs
require.NoError(t, err)
result := <-results
require.NotEmpty(t, result)
...
}
}
这里我们要求没有错误,这意味着接收到err
的应该是nil
。同样,我们result
从results
通道接收并检查是否result
为空对象。
由于result
其中包含多个对象,让我们逐一验证一下。首先从以下内容开始result.Transfer
:
func TestTransferTx(t *testing.T) {
...
// check results
for i := 0; i < n; i++ {
err := <-errs
require.NoError(t, err)
result := <-results
require.NotEmpty(t, result)
// check transfer
transfer := result.Transfer
require.NotEmpty(t, transfer)
require.Equal(t, account1.ID, transfer.FromAccountID)
require.Equal(t, account2.ID, transfer.ToAccountID)
require.Equal(t, amount, transfer.Amount)
require.NotZero(t, transfer.ID)
require.NotZero(t, transfer.CreatedAt)
...
}
}
我们要求这个transfer
对象不为空。那么FromAccountID
的字段transfer
应该等于account1.ID
,ToAccountID
的字段transfer
应该等于account2.ID
,并且transfer.Amount
应该等于输入amount
。
字段不应该为零,因为它是一个自增字段。最后,它ID
不应该为零值,因为我们希望数据库填充默认值,即当前时间戳。transfer
transfer.CreatedAt
现在为了确保数据库中确实创建了转账记录,我们应该调用来store.GetTransfer()
查找 ID 等于的记录transfer.ID
:
func TestTransferTx(t *testing.T) {
...
// check results
for i := 0; i < n; i++ {
// check transfer
...
_, err = store.GetTransfer(context.Background(), transfer.ID)
require.NoError(t, err)
...
}
}
这里可以看到,因为Queries
对象是嵌入在里面的Store
,所以GetTransfer()
Query的功能对于Store也是可用的。
如果转移确实存在,这个函数就不应该返回错误,所以我们要求这里不出现错误。
接下来,我们将检查结果的账户分录。首先,FromEntry
:
func TestTransferTx(t *testing.T) {
...
// check results
for i := 0; i < n; i++ {
// check transfer
...
// check entries
fromEntry := result.FromEntry
require.NotEmpty(t, fromEntry)
require.Equal(t, account1.ID, fromEntry.AccountID)
require.Equal(t, -amount, fromEntry.Amount)
require.NotZero(t, fromEntry.ID)
require.NotZero(t, fromEntry.CreatedAt)
_, err = store.GetEntry(context.Background(), fromEntry.ID)
require.NoError(t, err)
...
}
}
和之前一样,我们检查它是否为空。账户余额ID
应为,account1.ID
且Amount
条目的 应等于,-amount
因为有资金支出。最后,条目的ID
和created
字段不应为零。
我们还尝试从数据库中获取条目以确保它确实已被创建。
检查条目的方法类似。因此,我只需复制此代码块,并将这些变量和字段名称更改为toEntry
。
func TestTransferTx(t *testing.T) {
...
// check results
for i := 0; i < n; i++ {
// check transfer
...
// check entries
...
toEntry := result.ToEntry
require.NotEmpty(t, toEntry)
require.Equal(t, account2.ID, toEntry.AccountID)
require.Equal(t, amount, toEntry.Amount)
require.NotZero(t, toEntry.ID)
require.NotZero(t, toEntry.CreatedAt)
_, err = store.GetEntry(context.Background(), toEntry.ID)
require.NoError(t, err)
// TODO: check accounts' balance
}
}
账户ID
应该是account2.ID
而不是account1.ID
。Amount
应该是amount
而不是 ,-amount
因为有钱进去。
最后,我们应该toEntry
从数据库中获取记录,而不是fromEntry
。
现在请记住,我们还应该检查账户余额。但是由于我们尚未实现更新账户余额的部分,因此我们暂时在这里添加一个 TODO 注释,我们将在下一节课中完成它。
好的,现在测试已经准备好了,让我们尝试运行它。
通过了!太棒了!
让我们运行整个包测试。
全部通过!覆盖率大概80%,很不错了。
今天关于如何在 Golang 中实现数据库事务的讲座就到此结束。希望你喜欢。
您可以在等待下一节课的同时尝试自己实现更新账户余额的功能。
祝您编码愉快,下篇文章再见!
如果您喜欢这篇文章,请订阅我们的 Youtube 频道并在 Twitter 上关注我们,以便将来获取更多教程。
如果你想加入我目前在 Voodoo 的优秀团队,请查看我们的职位空缺。你可以远程办公,也可以在巴黎/阿姆斯特丹/伦敦/柏林/巴塞罗那现场办公,但需获得签证担保。
文章来源:https://dev.to/techschoolguru/a-clean-way-to-implement-database-transaction-in-golang-2ba