Golang 中实现数据库事务的简洁方法

2025-06-07

Golang 中实现数据库事务的简洁方法

您好,欢迎回来!

在前面的课程中,我们学习了如何编写 golang 代码对简单银行数据库的每个单独的表执行 CRUD 操作。

但在实际应用中,我们经常需要执行一个事务,将来自多个表的某些操作组合起来。今天我们将学习一种在 Golang 中简洁地实现它的方法。

以下是:

数据库事务

在我们开始编码之前,让我们先讨论一下交易!

什么是 DB 事务?

嗯,基本上,它是一个单一的工作单元,通常由多个数据库操作组成。

什么是 db-tx

例如,在我们的简单银行中,我们想要10 USD从转账account 1account 2

tx-示例

该交易包含5个操作:

  1. 我们创建一个transfer金额等于10的记录。
  2. 我们创建了金额等于的entry记录,因为资金正在从该账户转出。account 1-10
  3. 我们entry为 创建另一条记录account 2,但金额等于10,因为资金正在转入该账户。
  4. 然后我们通过减去balance更新account 110
  5. 最后,我们通过添加来balance更新account 210

这就是我们在本文中要实现的交易。我们稍后会讲到。

为什么需要使用DB事务?

为什么-tx

主要有两个原因:

  1. 我们希望我们的工作单元可靠且一致,即使在系统出现故障的情况下。
  2. 我们希望在同时访问数据库的程序之间提供隔离。

ACID 属性

为了实现这两个目标,数据库事务必须满足ACID以下属性:

酸

  • AAtomicity,这意味着要么事务的所有操作都成功完成,要么整个事务失败,并且所有内容都回滚,数据库保持不变。
  • CConsistency,这意味着数据库状态在事务执行后应该保持有效。更准确地说,所有写入数据库的数据都必须根据预定义的规则(包括约束、级联和触发器)保持有效。
  • IIsolation,这意味着所有并发运行的事务不应相互影响。隔离级别有多种,用于定义一个事务所做的更改何时对其他事务可见。我们将在另一节课中详细讨论。
  • 最后一个属性是D,代表Durability。它的基本含义是,成功事务写入的所有数据必须保留在持久存储中,即使在系统发生故障的情况下也不会丢失。

如何运行 SQL DB 事务?

如何运行tx

这很简单:

  • 我们用该语句开始交易BEGIN
  • 然后我们编写一系列正常的 SQL 查询(或操作)。
  • 如果全部成功,我们COMMIT就将事务变为永久的,数据库将改变到一个新的状态。
  • 否则,如果任何查询失败,我们将ROLLBACK结束事务,因此该事务的先前查询所做的所有更改都将消失,并且数据库保持与事务之前的状态相同。

好了,现在我们对数据库事务有了一些基本的了解。让我们学习如何在 Golang 中实现它。

使用 Go 实现 DB 事务

我将store.go在该db/sqlc文件夹中创建一个新文件。在这个文件中,我们定义一个新的Store结构体。



type Store struct {
}


Enter fullscreen mode Exit fullscreen mode

该存储将提供单独运行数据库查询以及在事务内组合运行数据库查询的所有功能。

使用组合来扩展查询的功能

对于单独的查询,我们已经有了在以前的讲座中学习到Queries的结构。sqlc

但是,每个查询仅对一个特定表执行一个操作。因此Queries结构体不支持事务。因此,我们必须通过将其嵌入到Store结构体中来扩展其功能,如下所示:



type Store struct {
    *Queries
}


Enter fullscreen mode Exit fullscreen mode

这被称为composition,它是 Golang 中扩展结构体功能的首选方法,而不是inheritance。通过嵌入QueriesStore提供的所有单独查询函数都Queries将可供 使用Store

我们可以通过向这个新结构体添加更多函数来支持事务。为此,我们需要Store一个对象,sql.DB因为它是创建新数据库事务所必需的。



type Store struct {
    *Queries
    db *sql.DB
}


Enter fullscreen mode Exit fullscreen mode

创建新商店

好的,现在让我们添加一个函数来创建一个新Store对象。它将接受一个sql.DB作为输入,并返回一个Store。在函数内部,我们只需创建一个新Store对象并返回它即可。



func NewStore(db *sql.DB) *Store {
    return &Store{
        db:      db,
        Queries: New(db),
    }
}


Enter fullscreen mode Exit fullscreen mode

db是输入sql.DB,它Queries是通过调用New()带有该 db 对象的函数创建的。正如我们在之前的课程中已经了解的那样,该New()函数是由 生成的。它创建并返回一个对象。sqlcQueries

执行通用数据库事务

接下来,我们将添加一个函数来Store执行通用数据库事务。



func (store *Store) execTx(ctx context.Context, fn func(*Queries) error) error {
    ...
}


Enter fullscreen mode Exit fullscreen mode

这个想法很简单:它以上下文和回调函数作为输入,然后它将启动一个新的数据库事务,Queries使用该事务创建一个新对象,使用创建的回调函数调用回调函数Queries,最后根据该函数返回的错误提交或回滚事务。

让我们来实现它吧!

首先,为了开始一个新的事务,我们调用store.db.BeginTx(),传入上下文,以及可选的sql.TxOptions



tx, err := store.db.BeginTx(ctx, &sql.TxOptions{})


Enter fullscreen mode Exit fullscreen mode

isolation level此选项允许我们为此交易设置自定义。



type TxOptions struct {
    Isolation IsolationLevel
    ReadOnly  bool
}


Enter fullscreen mode Exit fullscreen mode

如果我们没有明确设置它,那么将使用数据库服务器的默认隔离级别,这是read-committedPostgres 的情况。

我们将在另一节课中详细讨论这一点。现在,我们先传递nil这里参数,使用默认值。



func (store *Store) execTx(ctx context.Context, fn func(*Queries) error) error {
    tx, err := store.db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }

    q := New(tx)
    ...
}


Enter fullscreen mode Exit fullscreen mode

BeginTx()函数返回一个交易对象或一个错误。如果错误不为真nil,则立即返回。否则,我们将New()创建的交易作为参数调用该函数tx,并返回一个新Queries对象。

New()这和我们在函数中使用的函数是一样的。唯一的区别是,我们NewStore()传入的不是,而是对象。之所以能成功,是因为该函数接受一个接口,就像我们在上一节课中看到的那样:sql.DBsql.TxNew()DBTX



type DBTX interface {
    ExecContext(context.Context, string, ...interface{}) (sql.Result, error)
    PrepareContext(context.Context, string) (*sql.Stmt, error)
    QueryContext(context.Context, string, ...interface{}) (*sql.Rows, error)
    QueryRowContext(context.Context, string, ...interface{}) *sql.Row
}

func New(db DBTX) *Queries {
    return &Queries{db: db}
}


Enter fullscreen mode Exit fullscreen mode

好的,现在我们有了在事务中运行的查询,我们可以使用该查询调用输入函数,并返回错误。

如果错误不是nil,则我们需要通过调用来回滚事务tx.Rollback()。它也会返回回滚错误。

如果回滚错误也不是nil,那么我们必须报告 2 个错误。因此,我们应该fmt.Errorf()在返回之前使用 将它们合并为 1 个错误:



func (store *Store) execTx(ctx context.Context, fn func(*Queries) error) error {
    tx, err := store.db.BeginTx(ctx, &sql.TxOptions)
    if err != nil {
        return err
    }

    q := New(tx)
    err = fn(q)
    if err != nil {
        if rbErr := tx.Rollback(); rbErr != nil {
            return fmt.Errorf("tx err: %v, rb err: %v", err, rbErr)
        }
        return err
    }

    return tx.Commit()
}


Enter fullscreen mode Exit fullscreen mode

如果回滚成功,我们只返回原始交易错误。

最后,如果事务中的所有操作都成功,我们只需使用提交事务tx.Commit(),并将其错误返回给调用者。

函数就完成了execTx()。请注意,此函数未导出(以小写字母 e 开头),因为我们不希望外部包直接调用它。因此,我们将为每个特定的交易提供一个导出函数。

实施汇款交易

现在让我们继续添加一个新TransferTx()功能来执行我们在视频开头讨论的汇款交易示例。

回想一下,它将创建一个新的转账记录,添加 2 个新的账户条目,并在单个数据库事务中更新 2 个账户的余额。

该函数的输入将是一个上下文和一个类型的参数对象TransferTxParams。它将返回一个TransferTxResult对象或一个错误。



func (store *Store) TransferTx(ctx context.Context, arg TransferTxParams) (TransferTxResult, error) {
}


Enter fullscreen mode Exit fullscreen mode

TransferTxParams结构包含在两个账户之间转账所需的所有输入参数:



type TransferTxParams struct {
    FromAccountID int64 `json:"from_account_id"`
    ToAccountID   int64 `json:"to_account_id"`
    Amount        int64 `json:"amount"`
    }


Enter fullscreen mode Exit fullscreen mode
  • FromAccountID是汇款账户的 ID。
  • ToAccountID是将要汇款的账户的 ID。
  • 最后一个字段是Amount要发送的金额。

TransferTxResult结构体包含转账交易的结果。它有 5 个字段:



type TransferTxResult struct {
    Transfer    Transfer `json:"transfer"`
    FromAccount Account  `json:"from_account"`
    ToAccount   Account  `json:"to_account"`
    FromEntry   Entry    `json:"from_entry"`
    ToEntry     Entry    `json:"to_entry"`
}


Enter fullscreen mode Exit fullscreen mode
  • 已创建的Transfer记录。
  • FromAccount减去其余额之后
  • 之后ToAccount其余额就被添加了。
  • FromEntry记录资金流出的账户FromAccount
  • ToEntry记录资金转入的账户ToAccount

好了,现在我们可以实现转账交易了。首先,我们创建一个空的结果。然后,我们调用store.execTx()之前编写的函数来创建并运行一个新的数据库交易。



func (store *Store) TransferTx(ctx context.Context, arg TransferTxParams) (TransferTxResult, error) {
    var result TransferTxResult

    err := store.execTx(ctx, func(q *Queries) error {
        ...
        return nil
    })

    return result, err
}


Enter fullscreen mode Exit fullscreen mode

我们传入上下文和回调函数。现在我们只需返回nil这个回调即可。最后,我们返回调用的结果和错误execTx()

现在让我们回过头来实现回调函数。基本上,我们可以使用查询对象q来调用它提供的任何单独的 CRUD 函数。

请记住,此查询对象是从 1 个数据库事务创建的,因此我们调用的所有提供的方法都将在该事务中运行。

好吧,让我们通过调用来创建转移记录q.CreateTransfer(),传入输入上下文和一个CreateTransferParams参数:



func (store *Store) TransferTx(ctx context.Context, arg TransferTxParams) (TransferTxResult, error) {
    ...

    err := store.execTx(ctx, func(q *Queries) error {
        var err error

        result.Transfer, err = q.CreateTransfer(ctx, CreateTransferParams{
            FromAccountID: arg.FromAccountID,
            ToAccountID:   arg.ToAccountID,
            Amount:        arg.Amount,
        })
        if err != nil {
            return err
        }

        ...

        return nil
    })

    return result, err
}


Enter fullscreen mode Exit fullscreen mode

输出传输将被保存到result.Transfer或 错误。如果错误不是nil,我们会立即返回。

这里你可以看到,我们result从回调函数内部访问了外部函数的变量。arg变量也类似。

这使得回调函数变成了一个closure。由于 Go 缺乏对泛型类型的支持,当我们想要从回调函数中获取结果时,通常使用闭包,因为回调函数本身不知道它应该返回的结果的确切类型。

好的,创建转账记录的第一步已经完成。下一步是添加 2 个账户条目:1 个用于FromAccount,1 个用于ToAccount

我们调用q.CreateAccountEntry(),传入上下文和CreateAccountEntryParams,其中AccountIDarg.FromAccountID,并且Amount-arg.Amount因为资金正在从这个账户中转出。



func (store *Store) TransferTx(ctx context.Context, arg TransferTxParams) (TransferTxResult, error) {
    ...

    err := store.execTx(ctx, func(q *Queries) error {
        var err error

        result.Transfer, err = q.CreateTransfer(ctx, CreateTransferParams{
            FromAccountID: arg.FromAccountID,
            ToAccountID:   arg.ToAccountID,
            Amount:        arg.Amount,
        })
        if err != nil {
            return err
        }

        result.FromEntry, err = q.CreateEntry(ctx, CreateEntryParams{
            AccountID: arg.FromAccountID,
            Amount:    -arg.Amount,
        })
        if err != nil {
            return err
        }

        ...

        return nil
    })

    return result, err
}


Enter fullscreen mode Exit fullscreen mode

和以前一样,如果错误不为零,我们就返回它,这样交易就会回滚。

我们用类似的方法为到账账户创建账户分录。但这次,它是result.ToEntryAccountIDarg.ToAccountID,并且Amount是,arg.Amount因为资金正在转入这个账户。



func (store *Store) TransferTx(ctx context.Context, arg TransferTxParams) (TransferTxResult, error) {
    ...

    err := store.execTx(ctx, func(q *Queries) error {
        var err error

        result.Transfer, err = q.CreateTransfer(ctx, CreateTransferParams{
            FromAccountID: arg.FromAccountID,
            ToAccountID:   arg.ToAccountID,
            Amount:        arg.Amount,
        })
        if err != nil {
            return err
        }

        result.FromEntry, err = q.CreateEntry(ctx, CreateEntryParams{
            AccountID: arg.FromAccountID,
            Amount:    -arg.Amount,
        })
        if err != nil {
            return err
        }

        result.ToEntry, err = q.CreateEntry(ctx, CreateEntryParams{
            AccountID: arg.ToAccountID,
            Amount:    arg.Amount,
        })
        if err != nil {
            return err
        }

        return nil

        // TODO: update accounts' balance
    })

    return result, err
}


Enter fullscreen mode Exit fullscreen mode

我们已经完成了账户条目的创建。最后一步是更新账户余额,这会更加复杂,因为它涉及到locking如何防止潜在的deadlock……

所以我觉得值得用一堂课来详细讲解。我们先在这里添加一个 TODO 注释,下次再回来实现它。

测试汇款交易

现在假设我们的转账交易已完成,包含 1 条转账记录,并创建了 2 个账户条目。我们需要进行测试,以确保它能够按预期工作。

我要创建一个新store_test.go文件。它db和我们的在同一个包里store.go。然后我们为这个TransferTx()函数定义一个新的单元测试。



func TestTransferTx(t *testing.T) {
    ...
}


Enter fullscreen mode Exit fullscreen mode

首先我们需要创建一个新Store对象。该NewStore()函数需要一个sql.DB对象。

如果您还记得的话,在上一讲中,我们已经通过以下函数调用sql.DB在文件中创建了一个对象:main_test.gosql.Open()



func TestMain(m *testing.M) {
    conn, err := sql.Open(dbDriver, dbSource)
    ...
}


Enter fullscreen mode Exit fullscreen mode

因此为了重用它,这里我们不会将结果分配给连接变量,而是声明一个新的全局变量:testDB,并将命令的结果存储sql.Open()在其中。



var testQueries *Queries
var testDB *sql.DB

func TestMain(m *testing.M) {
    var err error
    testDB, err = sql.Open(dbDriver, dbSource)
    if err != nil {
        log.Fatal("cannot connect to db:", err)
    }

    testQueries = New(testDB)

    os.Exit(m.Run())
}


Enter fullscreen mode Exit fullscreen mode

好的,现在我们可以回到单元测试并将testDB对象传递到NewStore()函数中以创建一个新的Store



func TestTransferTx(t *testing.T) {
    store := NewStore(testDB)

    account1 := createRandomAccount(t)
    account2 := createRandomAccount(t)

    ...
}


Enter fullscreen mode Exit fullscreen mode

接下来,我们使用上一节课中编写的函数创建两个随机账户createRandomAccount()。我们将从账户 1 向账户 2 转账。

从我的经验来看,编写数据库事务是一件必须非常小心的事情。它可能很容易编写,但如果不仔细处理并发,也很容易变成一场噩梦。

因此,确保我们的交易运行良好的最佳方法是使用多个并发的 GO 例程来运行它。

假设我想运行n = 5并发转账交易,每个交易都会将一笔amount10从账户 1 转移到账户 2。因此,我将使用一个简单的 for 循环进行n迭代:



func TestTransferTx(t *testing.T) {
    store := NewStore(testDB)

    account1 := createRandomAccount(t)
    account2 := createRandomAccount(t)

    n := 5
    amount := int64(10)

    // run n concurrent transfer transaction
    for i := 0; i < n; i++ {
        go func() {
            result, err := store.TransferTx(context.Background(), TransferTxParams{
                FromAccountID: account1.ID,
                ToAccountID:   account2.ID,
                Amount:        amount,
            })

            ...
        }()
    }

    ...
}


Enter fullscreen mode Exit fullscreen mode

在循环内部,我们使用go关键字来启动一个新的例程。在 Go 例程内部,我们store.TransferTx()用一个背景上下文和一个TransferTxParams对象调用函数,其中FromAccountIDis account1.IDToAccountIDisaccount2.IDAmountis10就像我们amount = 10上面声明的那样。

此函数返回结果或错误。我们不能直接testify/require在这里检查它们,因为此函数运行在与我们函数所在的不同的 Go 例程中TestTransferTx,所以无法保证在条件不满足时它会停止整个测试。

验证错误和结果的正确方法是将它们发送回我们的测试正在运行的主 go 例程,然后从那里检查它们。

为此,我们可以使用通道。通道旨在连接并发的 Go 例程,并允许它们彼此安全地共享数据,而无需显式锁定。

在我们的例子中,我们需要一个通道来接收错误,以及另一个通道来接收 TransferTxResult。我们使用 make 关键字来创建通道。



func TestTransferTx(t *testing.T) {
    ...

    // run n concurrent transfer transaction
    errs := make(chan error)
    results := make(chan TransferTxResult)

    for i := 0; i < n; i++ {
        go func() {
            result, err := store.TransferTx(context.Background(), TransferTxParams{
                FromAccountID: account1.ID,
                ToAccountID:   account2.ID,
                Amount:        amount,
            })

            errs <- err
            results <- result
        }()
    }
}


Enter fullscreen mode Exit fullscreen mode

现在,在 goroutine 内部,我们可以使用这个箭头运算符err向通道发送数据。通道应该在箭头运算符的左侧,而要发送的数据应该在箭头运算符的右侧。errs<-

同样,我们发送resultresults通道。然后,我们将从外部检查这些错误和结果。

我们只需运行一个 for 循环即可n。为了从通道接收错误,我们使用相同的箭头运算符,但这次,通道位于箭头的右侧,而用于存储接收数据的变量位于左侧。



func TestTransferTx(t *testing.T) {
    ...

    // run n concurrent transfer transaction
    ...

    // check results
    for i := 0; i < n; i++ {
        err := <-errs
        require.NoError(t, err)

        result := <-results
        require.NotEmpty(t, result)

        ...
    }
}


Enter fullscreen mode Exit fullscreen mode

这里我们要求没有错误,这意味着接收到err的应该是nil。同样,我们resultresults通道接收并检查是否result为空对象。

由于result其中包含多个对象,让我们逐一验证一下。首先从以下内容开始result.Transfer



func TestTransferTx(t *testing.T) {
    ...

    // check results
    for i := 0; i < n; i++ {
        err := <-errs
        require.NoError(t, err)

        result := <-results
        require.NotEmpty(t, result)

        // check transfer
        transfer := result.Transfer
        require.NotEmpty(t, transfer)
        require.Equal(t, account1.ID, transfer.FromAccountID)
        require.Equal(t, account2.ID, transfer.ToAccountID)
        require.Equal(t, amount, transfer.Amount)
        require.NotZero(t, transfer.ID)
        require.NotZero(t, transfer.CreatedAt)

        ...
    }
}


Enter fullscreen mode Exit fullscreen mode

我们要求这个transfer对象不为空。那么FromAccountID的字段transfer应该等于account1.IDToAccountID的字段transfer应该等于account2.ID,并且transfer.Amount应该等于输入amount

字段不应该为零,因为它是一个自增字段。最后,它ID应该为零值,因为我们希望数据库填充默认值,即当前时间戳。transfertransfer.CreatedAt

现在为了确保数据库中确实创建了转账记录,我们应该调用来store.GetTransfer()查找 ID 等于的记录transfer.ID



func TestTransferTx(t *testing.T) {
    ...

    // check results
    for i := 0; i < n; i++ {
        // check transfer
        ...

        _, err = store.GetTransfer(context.Background(), transfer.ID)
        require.NoError(t, err)

        ...
    }
}



Enter fullscreen mode Exit fullscreen mode

这里可以看到,因为Queries对象是嵌入在里面的Store,所以GetTransfer()Query的功能对于Store也是可用的。

如果转移确实存在,这个函数就不应该返回错误,所以我们要求这里不出现错误。

接下来,我们将检查结果的账户分录。首先,FromEntry



func TestTransferTx(t *testing.T) {
    ...

    // check results
    for i := 0; i < n; i++ {
        // check transfer
        ...

        // check entries
        fromEntry := result.FromEntry
        require.NotEmpty(t, fromEntry)
        require.Equal(t, account1.ID, fromEntry.AccountID)
        require.Equal(t, -amount, fromEntry.Amount)
        require.NotZero(t, fromEntry.ID)
        require.NotZero(t, fromEntry.CreatedAt)

        _, err = store.GetEntry(context.Background(), fromEntry.ID)
        require.NoError(t, err)

        ...
    }
}


Enter fullscreen mode Exit fullscreen mode

和之前一样,我们检查它是否为空。账户余额ID应为,account1.IDAmount条目的 应等于,-amount因为有资金支出。最后,条目的IDcreated字段不应为零。

我们还尝试从数据库中获取条目以确保它确实已被创建。

检查条目的方法类似。因此,我只需复制此代码块,并将这些变量和字段名称更改为toEntry



func TestTransferTx(t *testing.T) {
    ...

    // check results
    for i := 0; i < n; i++ {
        // check transfer
        ...

        // check entries
        ...

        toEntry := result.ToEntry
        require.NotEmpty(t, toEntry)
        require.Equal(t, account2.ID, toEntry.AccountID)
        require.Equal(t, amount, toEntry.Amount)
        require.NotZero(t, toEntry.ID)
        require.NotZero(t, toEntry.CreatedAt)

        _, err = store.GetEntry(context.Background(), toEntry.ID)
        require.NoError(t, err)

        // TODO: check accounts' balance
    }
}


Enter fullscreen mode Exit fullscreen mode

账户ID应该是account2.ID而不是account1.IDAmount应该是amount而不是 ,-amount因为有钱进去。

最后,我们应该toEntry从数据库中获取记录,而不是fromEntry

现在请记住,我们还应该检查账户余额。但是由于我们尚未实现更新账户余额的部分,因此我们暂时在这里添加一个 TODO 注释,我们将在下一节课中完成它。

好的,现在测试已经准备好了,让我们尝试运行它。

测试通过

通过了!太棒了!

让我们运行整个包测试。

全部通过

全部通过!覆盖率大概80%,很不错了。

今天关于如何在 Golang 中实现数据库事务的讲座就到此结束。希望你喜欢。

您可以在等待下一节课的同时尝试自己实现更新账户余额的功能。

祝您编码愉快,下篇文章再见!


如果您喜欢这篇文章,请订阅我们的 Youtube 频道在 Twitter 上关注我们,以便将来获取更多教程。


如果你想加入我目前在 Voodoo 的优秀团队,请查看我们的职位空缺。你可以远程办公,也可以在巴黎/阿姆斯特丹/伦敦/柏林/巴塞罗那现场办公,但需获得签证担保。

文章来源:https://dev.to/techschoolguru/a-clean-way-to-implement-database-transaction-in-golang-2ba
PREV
A complete overview of SSL/TLS and its cryptographic system 1. What is SSL/TLS? 2. The history of SSL/TLS 3. Where is TLS being used? 4. Why do we need TLS? 5. How does TLS work? 6. Why TLS uses both symmetric and asymmetric cryptography? 7. Symmetric cryptography 8. Elliptic-Curve Cryptography 9. Asymmetric cryptography 10. TLS 1.3 handshake protocol
NEXT
开发人员的终极工具🛠