数据库事务锁以及如何处理死锁
锁定并不像你想象的那么简单!
在上一节课中,我们学习了如何实现一个简单的转账交易。然而,我们还没有更新账户余额,因为它比较复杂,需要谨慎处理并发事务以避免死锁。
因此,在本讲座中,我们将实现此功能,以了解有关数据库锁定以及如何处理死锁情况的更多信息。
以下是:
- YouTube 上完整系列播放列表的链接
- 以及它的Github 仓库
测试驱动开发
今天我将使用一种不同的实现方法,即测试驱动开发(TDD)。其思路是:我们先编写测试,让现有代码失效。然后逐步改进代码,直到测试通过。
好的,这是我们在上一个视频中进行的测试:
func TestTransferTx(t *testing.T) {
store := NewStore(testDB)
account1 := createRandomAccount(t)
account2 := createRandomAccount(t)
// run n concurrent transfer transactions
n := 5
amount := int64(10)
errs := make(chan error)
results := make(chan TransferTxResult)
for i := 0; i < n; i++ {
go func() {
result, err := store.TransferTx(context.Background(), TransferTxParams{
FromAccountID: account1.ID,
ToAccountID: account2.ID,
Amount: amount,
})
errs <- err
results <- result
}()
}
// check results
for i := 0; i < n; i++ {
err := <-errs
require.NoError(t, err)
result := <-results
require.NotEmpty(t, result)
// check transfer
transfer := result.Transfer
require.NotEmpty(t, transfer)
require.Equal(t, account1.ID, transfer.FromAccountID)
require.Equal(t, account2.ID, transfer.ToAccountID)
require.Equal(t, amount, transfer.Amount)
require.NotZero(t, transfer.ID)
require.NotZero(t, transfer.CreatedAt)
_, err = store.GetTransfer(context.Background(), transfer.ID)
require.NoError(t, err)
// check entries
fromEntry := result.FromEntry
require.NotEmpty(t, fromEntry)
require.Equal(t, account1.ID, fromEntry.AccountID)
require.Equal(t, -amount, fromEntry.Amount)
require.NotZero(t, fromEntry.ID)
require.NotZero(t, fromEntry.CreatedAt)
_, err = store.GetEntry(context.Background(), fromEntry.ID)
require.NoError(t, err)
toEntry := result.ToEntry
require.NotEmpty(t, toEntry)
require.Equal(t, account2.ID, toEntry.AccountID)
require.Equal(t, amount, toEntry.Amount)
require.NotZero(t, toEntry.ID)
require.NotZero(t, toEntry.CreatedAt)
_, err = store.GetEntry(context.Background(), toEntry.ID)
require.NoError(t, err)
// TODO: check accounts' balance
}
}
它创建了 5 个 goroutine 来执行 5 个并发的转账交易,每个交易都会将相同金额的钱从账户 1 转移到账户 2。然后它遍历结果列表来检查创建的转账和入账对象。
现在要完成这个测试,我们需要检查输出账户及其余额。
让我们从账户开始。首先是fromAccount
,资金支出的地方。我们检查它不应该为空。并且它ID
应该等于account1.ID
。
类似地toAccount
,资金流入其中。账户对象不应为空。并且其值ID
应等于account2.ID
。
func TestTransferTx(t *testing.T) {
...
// check results
for i := 0; i < n; i++ {
...
// check accounts
fromAccount := result.FromAccount
require.NotEmpty(t, fromAccount)
require.Equal(t, account1.ID, fromAccount.ID)
toAccount := result.ToAccount
require.NotEmpty(t, toAccount)
require.Equal(t, account2.ID, toAccount.ID)
// TODO: check accounts' balance
}
}
接下来我们将检查账户余额。我们计算diff1
输入account1.Balance
和输出之间的差额fromAccount.Balance
。这diff1
就是从账户1流出的金额。
diff2
类似地,我们计算输出toAccount.Balance
和输入之间的差额account2.Balance
。这diff2
就是进入账户2的金额。
func TestTransferTx(t *testing.T) {
...
// check results
for i := 0; i < n; i++ {
...
// check accounts' balance
diff1 := account1.Balance - fromAccount.Balance
diff2 := toAccount.Balance - account2.Balance
require.Equal(t, diff1, diff2)
require.True(t, diff1 > 0)
require.True(t, diff1%amount == 0) // 1 * amount, 2 * amount, 3 * amount, ..., n * amount
}
}
如果交易正常进行,那么diff1
和diff2
应该相同,并且它们应该是一个正数。
此外,这个差额应该能够被amount
每次交易中转移的资金量整除。原因是,账户1的余额在第一笔交易后会减少1倍金额,在第二笔交易后会减少2倍金额,在第三笔交易后会减少3倍金额,以此类推。
因此,如果我们计算k = diff1 / amount
,那么k
必须是1和之间的整数n
,其中n
是执行的交易数。
func TestTransferTx(t *testing.T) {
...
// check results
existed := make(map[int]bool)
for i := 0; i < n; i++ {
...
// check accounts' balance
...
k := int(diff1 / amount)
require.True(t, k >= 1 && k <= n)
require.NotContains(t, existed, k)
existed[k] = true
}
}
此外,k
对于每笔交易,必须是唯一的,这意味着k
第一笔交易应为 1,第二笔交易应为 2,第三笔交易应为 3,依此类推,直到k
等于n
。
为了检查这一点,我们需要声明一个名为 的新变量,existed
其类型为map[int]bool
。然后在循环中检查existed
map 是否包含k
。之后,我们将 设置existed[k]
为true
。
最后,在 for 循环之后,我们应该检查两个账户的最终更新余额。
store.GetAccount()
首先,我们通过调用后台上下文和账户 1 的 id从数据库中获取更新后的账户 1。ID
此查询不应该返回错误。我们以相同的方式从数据库中获取更新后的账户 2。
func TestTransferTx(t *testing.T) {
...
// check results
existed := make(map[int]bool)
for i := 0; i < n; i++ {
...
}
// check the final updated balance
updatedAccount1, err := store.GetAccount(context.Background(), account1.ID)
require.NoError(t, err)
updatedAccount2, err := store.GetAccount(context.Background(), account2.ID)
require.NoError(t, err)
require.Equal(t, account1.Balance-int64(n)*amount, updatedAccount1.Balance)
require.Equal(t, account2.Balance+int64(n)*amount, updatedAccount2.Balance)
}
现在n
交易后,账户 1 的余额必须减少n * amount
。所以我们要求updatedAccount1.Balance
等于该值。amount
是 类型,所以我们需要在进行乘法运算之前int64
将其转换n
为。int64
我们对 执行相同的操作updatedAccount2.Balance
,只不过它的值应该增加 i 而n * amount
不是减少。
就这样!测试完成了。但在运行之前,我要写一些日志,以便更清楚地查看结果。
首先,让我们打印出交易前账户的余额。然后在所有交易执行完成后,打印更新后的余额。我还想查看每笔交易后的结果余额,所以在 for 循环中添加一个日志。
好的,这是我们的最终测试:
func TestTransferTx(t *testing.T) {
store := NewStore(testDB)
account1 := createRandomAccount(t)
account2 := createRandomAccount(t)
fmt.Println(">> before:", account1.Balance, account2.Balance)
n := 5
amount := int64(10)
errs := make(chan error)
results := make(chan TransferTxResult)
// run n concurrent transfer transaction
for i := 0; i < n; i++ {
go func() {
result, err := store.TransferTx(context.Background(), TransferTxParams{
FromAccountID: account1.ID,
ToAccountID: account2.ID,
Amount: amount,
})
errs <- err
results <- result
}()
}
// check results
existed := make(map[int]bool)
for i := 0; i < n; i++ {
err := <-errs
require.NoError(t, err)
result := <-results
require.NotEmpty(t, result)
// check transfer
transfer := result.Transfer
require.NotEmpty(t, transfer)
require.Equal(t, account1.ID, transfer.FromAccountID)
require.Equal(t, account2.ID, transfer.ToAccountID)
require.Equal(t, amount, transfer.Amount)
require.NotZero(t, transfer.ID)
require.NotZero(t, transfer.CreatedAt)
_, err = store.GetTransfer(context.Background(), transfer.ID)
require.NoError(t, err)
// check entries
fromEntry := result.FromEntry
require.NotEmpty(t, fromEntry)
require.Equal(t, account1.ID, fromEntry.AccountID)
require.Equal(t, -amount, fromEntry.Amount)
require.NotZero(t, fromEntry.ID)
require.NotZero(t, fromEntry.CreatedAt)
_, err = store.GetEntry(context.Background(), fromEntry.ID)
require.NoError(t, err)
toEntry := result.ToEntry
require.NotEmpty(t, toEntry)
require.Equal(t, account2.ID, toEntry.AccountID)
require.Equal(t, amount, toEntry.Amount)
require.NotZero(t, toEntry.ID)
require.NotZero(t, toEntry.CreatedAt)
_, err = store.GetEntry(context.Background(), toEntry.ID)
require.NoError(t, err)
// check accounts
fromAccount := result.FromAccount
require.NotEmpty(t, fromAccount)
require.Equal(t, account1.ID, fromAccount.ID)
toAccount := result.ToAccount
require.NotEmpty(t, toAccount)
require.Equal(t, account2.ID, toAccount.ID)
// check balances
fmt.Println(">> tx:", fromAccount.Balance, toAccount.Balance)
diff1 := account1.Balance - fromAccount.Balance
diff2 := toAccount.Balance - account2.Balance
require.Equal(t, diff1, diff2)
require.True(t, diff1 > 0)
require.True(t, diff1%amount == 0) // 1 * amount, 2 * amount, 3 * amount, ..., n * amount
k := int(diff1 / amount)
require.True(t, k >= 1 && k <= n)
require.NotContains(t, existed, k)
existed[k] = true
}
// check the final updated balance
updatedAccount1, err := store.GetAccount(context.Background(), account1.ID)
require.NoError(t, err)
updatedAccount2, err := store.GetAccount(context.Background(), account2.ID)
require.NoError(t, err)
fmt.Println(">> after:", updatedAccount1.Balance, updatedAccount2.Balance)
require.Equal(t, account1.Balance-int64(n)*amount, updatedAccount1.Balance)
require.Equal(t, account2.Balance+int64(n)*amount, updatedAccount2.Balance)
}
让我们运行它吧!
在第 行失败了83
,我们预期fromAccount
不为空。但目前它当然是空的,因为我们还没有实现这个功能。
那么让我们回到store.go
文件来实现它吧!
更新账户余额(错误方式)
更改帐户余额的一个简单直观的方法是首先从数据库中获取该帐户,然后在其余额中添加或减去一定金额,然后将其更新回数据库。
然而,如果没有合适的锁定机制,这通常会出错。我来教你怎么做!
首先,我们调用q.GetAccount()
来获取fromAccount
记录并将其赋值给account1
变量。如果err
不是nil
,则返回它。
func (store *Store) TransferTx(ctx context.Context, arg TransferTxParams) (TransferTxResult, error) {
var result TransferTxResult
err := store.execTx(ctx, func(q *Queries) error {
...
// move money out of account1
account1, err := q.GetAccount(ctx, arg.FromAccountID)
if err != nil {
return err
}
result.FromAccount, err = q.UpdateAccount(ctx, UpdateAccountParams{
ID: arg.FromAccountID,
Balance: account1.Balance - arg.Amount,
})
if err != nil {
return err
}
}
return result, err
}
否则,我们调用q.UpdateAccount()
来更新此账户的余额。账户 ID 应该是arg.FromAccountID
,余额将更改为 ,account1.Balance - arg.Amount
因为有资金从此账户转出。
更新后的账户记录将保存到result.FromAccount
。如果出现错误,则直接返回。
在此之后,我们已将资金转出fromAccount
。现在我们可以做类似的事情,将这些资金转入toAccount
。
func (store *Store) TransferTx(ctx context.Context, arg TransferTxParams) (TransferTxResult, error) {
var result TransferTxResult
err := store.execTx(ctx, func(q *Queries) error {
...
// move money out of account1
...
// move money into account2
account2, err := q.GetAccount(ctx, arg.ToAccountID)
if err != nil {
return err
}
result.ToAccount, err = q.UpdateAccount(ctx, UpdateAccountParams{
ID: arg.ToAccountID,
Balance: account2.Balance + arg.Amount,
})
if err != nil {
return err
}
}
return result, err
}
这里,账户 ID 应该是arg.ToAccountID
。结果将存储在 中result.ToAccount
。新的余额应该是account2.Balance + arg.Amount
,因为资金将转入此账户。
好的,实现完成了。但是,我要告诉你,这是错误的。让我们重新运行测试看看结果如何!
测试仍然失败。但这次错误出现在第 94 行,我们在此比较了账户 1 的支出金额和账户 2 的入账金额。
在日志中,我们可以看到第一笔交易是正确的。账户 1 的余额减少了10
,从380
到370
。而账户 2 的余额增加了相同的金额,从390
到400
。
但它在第二个交易中无法正常工作。账户 2 的余额增加了10
更多,达到410
。而账户 1 的余额保持不变,为370
。
为了理解原因,让我们看一下GetAccount
查询:
-- name: GetAccount :one
SELECT * FROM accounts
WHERE id = $1 LIMIT 1;
这只是一个正常的SELECT
,所以它不会阻止其他事务读取相同的Account
记录。
因此,2 个并发交易可以获取账户 1 的相同值,原始余额为380
。这也解释了为什么它们370
执行后,余额都更新为 。
无锁查询
为了演示这种情况,让我们psql
在 2 个不同的终端选项卡中启动控制台并运行 2 个并行事务。
在第一个事务中,让我们运行一个正常SELECT
查询来获取帐户记录ID = 1
。
SELECT * FROM accounts WHERE id = 1;
该账户余额为748美元。
现在我将在另一个事务中运行此查询。
如您所见,相同的账户记录立即返回,且未被阻止。这不是我们想要的。因此,让我们回滚这两个事务,并学习如何修复它。
带锁查询
我将开始 2 个新事务。但这次,我们将FOR UPDATE
在语句末尾添加子句SELECT
。
SELECT * FROM accounts WHERE id = 1 FOR UPDATE;
现在第一个事务仍然立即获取记录。但是当我们在第二个事务上运行此查询时:
它被阻塞并且必须等待第一个事务 COMMIT 或 ROLLBACK。
让我们回到该交易并将账户余额更新为 500:
UPDATE accounts SET balance = 500 WHERE id = 1;
更新之后,第二个事务仍然被阻塞。但是,一旦我们提交第一个事务:
我们可以看到第二笔交易立即解锁,并获取了最新更新的账户余额 500 欧元。这正是我们想要实现的!
锁定更新账户余额
让我们回到account.sql
文件,并添加一个新查询以获取要更新的帐户:
-- name: GetAccountForUpdate :one
SELECT * FROM accounts
WHERE id = $1 LIMIT 1
FOR UPDATE;
然后我们打开终端并运行make sqlc
重新生成代码。现在在account.sql.go
文件中,GetAccountForUpdate()
生成了一个新的函数。
const getAccountForUpdate = `-- name: GetAccountForUpdate :one
SELECT id, owner, balance, currency, created_at FROM accounts
WHERE id = $1 LIMIT 1
FOR UPDATE
`
func (q *Queries) GetAccountForUpdate(ctx context.Context, id int64) (Account, error) {
row := q.db.QueryRowContext(ctx, getAccountForUpdate, id)
var i Account
err := row.Scan(
&i.ID,
&i.Owner,
&i.Balance,
&i.Currency,
&i.CreatedAt,
)
return i, err
}
我们可以在转账交易中使用它。在这里,为了获取第一个账户,我们调用q.GetAccountForUpdate()
而不是q.GetAccount()
。我们执行相同的操作来获取第二个账户。
func (store *Store) TransferTx(ctx context.Context, arg TransferTxParams) (TransferTxResult, error) {
var result TransferTxResult
err := store.execTx(ctx, func(q *Queries) error {
...
// move money out of account1
account1, err := q.GetAccountForUpdate(ctx, arg.FromAccountID)
if err != nil {
return err
}
result.FromAccount, err = q.UpdateAccount(ctx, UpdateAccountParams{
ID: arg.FromAccountID,
Balance: account1.Balance - arg.Amount,
})
if err != nil {
return err
}
// move money into account2
account2, err := q.GetAccountForUpdate(ctx, arg.ToAccountID)
if err != nil {
return err
}
result.ToAccount, err = q.UpdateAccount(ctx, UpdateAccountParams{
ID: arg.ToAccountID,
Balance: account2.Balance + arg.Amount,
})
if err != nil {
return err
}
}
return result, err
}
好了,现在我们期望它能正常工作。让我们重新运行测试。
不幸的是,它仍然失败了。这次的错误是deadlock detected
。那么我们该怎么办呢?
别担心!我会教你如何调试这种死锁情况。
调试死锁
为了弄清楚为什么会发生死锁,我们需要打印出一些日志来查看哪个事务正在调用哪个查询以及调用顺序。
为此,我们必须为每个交易分配一个名称,并TransferTx()
通过上下文参数将其传递给函数。
现在,在这个测试的 for 循环中,我将创建一个txName
变量来存储交易的名称。我们使用fmt.Sprintf()
函数和计数器i
来创建不同的名称:tx 1
、tx 2
、tx 3
等等。
然后在 go routine 内部,我们不会传递后台上下文,而是传递一个带有事务名称的新上下文。
func TestTransferTx(t *testing.T) {
...
// run n concurrent transfer transaction
for i := 0; i < n; i++ {
txName := fmt.Sprintf("tx %d", i+1)
go func() {
ctx := context.WithValue(context.Background(), txKey, txName)
result, err := store.TransferTx(ctx, TransferTxParams{
FromAccountID: account1.ID,
ToAccountID: account2.ID,
Amount: amount,
})
errs <- err
results <- result
}()
}
// check results
...
}
为了将事务名称添加到上下文中,我们调用context.WithValue()
,传入后台上下文作为其父级,以及一对键值,其中值是事务名称。
文档中提到,上下文键不应为字符串类型或任何内置类型,以避免包之间发生冲突。通常情况下,我们应该struct{}
为上下文键定义一个类型变量。
所以我要在文件txKey
中添加一个新变量store.go
,因为稍后我们必须使用这个键从TransferTx()
函数的输入上下文中获取交易名称。
var txKey = struct{}{}
func (store *Store) TransferTx(ctx context.Context, arg TransferTxParams) (TransferTxResult, error) {
...
}
...
这里,第二个括号struct{}{}
表示我们正在创建一个新的类型空对象struct{}
。
现在,在TransferTx()
函数中,上下文将保存交易名称。我们可以通过调用 来从上下文中ctx.Value()
获取值。txKey
现在我们有了事务名称,可以用它写一些日志了。让我们打印出这个事务名称和第一个操作:create transfer
。然后对其余操作执行相同的操作:
func (store *Store) TransferTx(ctx context.Context, arg TransferTxParams) (TransferTxResult, error) {
var result TransferTxResult
err := store.execTx(ctx, func(q *Queries) error {
var err error
txName := ctx.Value(txKey)
fmt.Println(txName, "create transfer")
result.Transfer, err = q.CreateTransfer(ctx, CreateTransferParams{
FromAccountID: arg.FromAccountID,
ToAccountID: arg.ToAccountID,
Amount: arg.Amount,
})
if err != nil {
return err
}
fmt.Println(txName, "create entry 1")
result.FromEntry, err = q.CreateEntry(ctx, CreateEntryParams{
AccountID: arg.FromAccountID,
Amount: -arg.Amount,
})
if err != nil {
return err
}
fmt.Println(txName, "create entry 2")
result.ToEntry, err = q.CreateEntry(ctx, CreateEntryParams{
AccountID: arg.ToAccountID,
Amount: arg.Amount,
})
if err != nil {
return err
}
// move money out of account1
fmt.Println(txName, "get account 1")
account1, err := q.GetAccountForUpdate(ctx, arg.FromAccountID)
if err != nil {
return err
}
fmt.Println(txName, "update account 1")
result.FromAccount, err = q.UpdateAccount(ctx, UpdateAccountParams{
ID: arg.FromAccountID,
Balance: account1.Balance - arg.Amount,
})
if err != nil {
return err
}
// move money into account2
fmt.Println(txName, "get account 2")
account2, err := q.GetAccountForUpdate(ctx, arg.ToAccountID)
if err != nil {
return err
}
fmt.Println(txName, "update account 2")
result.ToAccount, err = q.UpdateAccount(ctx, UpdateAccountParams{
ID: arg.ToAccountID,
Balance: account2.Balance + arg.Amount,
})
if err != nil {
return err
}
})
return result, err
}
好的,现在日志已添加,我们可以重新运行测试以查看进展情况。
但为了更容易调试,我们不应该运行太多并发事务。所以我将把它改为n
而2
不是5
。
func TestTransferTx(t *testing.T) {
...
n := 2
amount := int64(10)
errs := make(chan error)
results := make(chan TransferTxResult)
// run n concurrent transfer transaction
...
}
那么我们来运行测试吧!
瞧,僵局依然存在。不过这次,我们有了详细的日志记录。
正如您在这里看到的:
- 事务 2 运行了它的前两个操作:
create transfer
和create entry 1
。 - 然后事务 1 开始运行其
create transfer
操作。 - 事务 2 返回并继续运行接下来的 2 个操作:
create entry 2
和get account 1
。 - 最后,事务 1 轮到并运行接下来的 4 个操作:
create entry 1
,,,和。create entry 2
get account 1
update account 1
- 此时,我们陷入了僵局。
现在我们确切地知道了发生了什么。我们要做的是找出事情发生的原因。
在 psql 控制台中复制死锁
这里我在 TablePlus 中打开了simple_bank
数据库。目前,它有两个账户,原始余额相同,均为100 USD
。
我还准备了汇款交易,其中包含 SQL 查询列表,这些查询应按照我们在 Golang 代码中实现的方式运行:
BEGIN;
SELECT * FROM accounts WHERE id = 1;
INSERT INTO transfers (from_account_id, to_account_id, amount) VALUES (1, 2, 10) RETURNING *;
INSERT INTO entries (account_id, amount) VALUES (1, -10) RETURNING *;
INSERT INTO entries (account_id, amount) VALUES (2, 10) RETURNING *;
SELECT * FROM accounts WHERE id = 1 FOR UPDATE;
UPDATE accounts SET balance = 90 WHERE id = 1 RETURNING *;
SELECT * FROM accounts WHERE id = 2 FOR UPDATE;
UPDATE accounts SET balance = 110 WHERE id = 2 RETURNING *;
ROLLBACK;
- 交易从
BEGIN
声明开始。 - 首先,我们来记录
INSERT
一下transfer
从account 1
到account 2
的新amount
情况10
。 - 然后我们创下了
INSERT
新的entry
纪录。account 1
amount
-10
- 另
INSERT
一项entry
记录account 2
为。amount
+10
- 接下来我们
SELECT
account 1
进行更新。 - 我们
UPDATE
将其balance
设为100-10
,即90
美元。 - 同样,我们
SELECT
account 2
进行更新。 - 我们将
UPDATE
其余额设为100+10
,相当于110
美元。 ROLLBACK
最后,当发生死锁时我们会执行该操作。
现在就像我们之前所做的那样,我将打开终端并运行 2 个 psql 控制台以并行执行 2 个事务。
让我们用 开始第一个事务BEGIN
。然后打开另一个选项卡并访问 psql 控制台。用 开始第二个事务BEGIN
。
现在,我们应该按照日志中的步骤操作。首先,transaction 2
应该运行它的前两个查询来创建transfer
和entry 1
记录:
插入成功!现在我们必须转到transaction 1
并运行第一个查询来创建transfer
记录。
现在返回transaction 2
并运行其第 3 个查询来创建entry 2
,以及第 4 个查询来获取account 1
更新。
现在我们看到这个查询被阻塞了。它正在等待transaction 2
提交或回滚后才能继续。
这听起来很奇怪,因为事务 2 只在表中创建了一条记录,transfers
而我们从accounts
表中获取了一条记录。为什么INSERT
事务 1 中的记录会阻塞SELECT
事务 1 中的记录呢?
为了确认这一点,让我们打开这个关于锁监控的Postgres Wiki 页面。
SELECT blocked_locks.pid AS blocked_pid,
blocked_activity.usename AS blocked_user,
blocking_locks.pid AS blocking_pid,
blocking_activity.usename AS blocking_user,
blocked_activity.query AS blocked_statement,
blocking_activity.query AS current_statement_in_blocking_process
FROM pg_catalog.pg_locks blocked_locks
JOIN pg_catalog.pg_stat_activity blocked_activity ON blocked_activity.pid = blocked_locks.pid
JOIN pg_catalog.pg_locks blocking_locks
ON blocking_locks.locktype = blocked_locks.locktype
AND blocking_locks.database IS NOT DISTINCT FROM blocked_locks.database
AND blocking_locks.relation IS NOT DISTINCT FROM blocked_locks.relation
AND blocking_locks.page IS NOT DISTINCT FROM blocked_locks.page
AND blocking_locks.tuple IS NOT DISTINCT FROM blocked_locks.tuple
AND blocking_locks.virtualxid IS NOT DISTINCT FROM blocked_locks.virtualxid
AND blocking_locks.transactionid IS NOT DISTINCT FROM blocked_locks.transactionid
AND blocking_locks.classid IS NOT DISTINCT FROM blocked_locks.classid
AND blocking_locks.objid IS NOT DISTINCT FROM blocked_locks.objid
AND blocking_locks.objsubid IS NOT DISTINCT FROM blocked_locks.objsubid
AND blocking_locks.pid != blocked_locks.pid
JOIN pg_catalog.pg_stat_activity blocking_activity ON blocking_activity.pid = blocking_locks.pid
WHERE NOT blocked_locks.granted;
这个长而复杂的查询使我们能够查找被阻止的查询以及阻止它们的原因。因此,让我们将其复制并在 TablePlus 中运行。
如您所见,被阻止的语句是SELECT FROM accounts FOR UPDATE
。 阻止它的语句是INSERT INTO transfers
。 因此,这两个不同表上的查询确实可以相互阻塞。
让我们深入挖掘一下为什么SELECT
查询必须等待查询INSERT
。
如果我们回到Postgres Wiki并向下滚动一点,我们将看到另一个查询,它允许我们列出数据库中的所有锁。
我将稍微修改这个查询,因为我想查看更多信息:
SELECT
a.datname,
a.application_name,
l.relation::regclass,
l.transactionid,
l.mode,
l.locktype,
l.GRANTED,
a.usename,
a.query,
a.pid
FROM
pg_stat_activity a
JOIN pg_locks l ON
l.pid = a.pid
ORDER BY
a.pid;
- 该
a.datname
字段将显示数据库名称。 - 我们来添加一下
a.application_name
,看看锁来自哪个应用程序。 - regclass
l.relation
实际上是表的名称, L.transactionid
是锁所属事务的ID。L.mod
是锁的模型。- 我们还要添加一下
l.lock_type
以查看锁的类型。 L.granted
告诉我们锁是否被授予。a.usename
是运行查询的用户名。a.query
是持有或尝试获取锁的查询。- 该查询开始的时间
a.query_start
或其时间age
不是很重要,所以我将删除它们。 - 最后一个字段是
a.pid
,它是运行事务的进程 ID。
如您所见,我们从pg_state_activity
表中选择别名为,并在进程 ID 列上与表别名为a
连接。pg_locks
l
这是按查询开始时间排序的,但实际上我认为按进程ID排序更好,因为我们有两个不同的进程,它们分别运行着两个psql控制台,并执行着两个并行事务。这样更容易看出哪个锁属于哪个事务。
好的,让我们运行它吧!
这里我们可以看到一些来自TablePlus
应用程序的锁,这些锁与我们的期望无关。我们关心的只是来自psql
控制台的锁。
因此我要添加一个 WHERE 子句来仅获取应用程序名称等于的锁psql
。
数据库名称也不重要,因为simple_bank
在我们的例子中它总是存在的。所以我也会删除a.datname
它。
好的,让我们再次运行这个查询:
SELECT
a.application_name,
l.relation::regclass,
l.transactionid,
l.mode,
l.locktype,
l.GRANTED,
a.usename,
a.query,
a.pid
FROM
pg_stat_activity a
JOIN pg_locks l ON
l.pid = a.pid
WHERE
a.application_name = 'psql'
ORDER BY
a.pid;
现在我们可以看到,只有 1 个锁尚未被授予。它来自对SELECT FROM accounts
进程 ID 的查询3053
。
未授予的原因是,它试图获取ShareLock
类型为 的锁transactionid
,其中事务 ID 为2442
。而此事务 ID 锁正由执行查询的exclusively
另一个进程 ID持有。3047
INSERT INTO transfers
但是为什么一个SELECT FROM accounts
表需要从运行该表的其他事务中获取锁呢INSERT INTO transfers
?
好吧,如果我们看一下数据库模式,我们可以看到帐户和转账表之间的唯一联系是外键约束:
ALTER TABLE "entries" ADD FOREIGN KEY ("account_id") REFERENCES "accounts" ("id");
ALTER TABLE "transfers" ADD FOREIGN KEY ("from_account_id") REFERENCES "accounts" ("id");
ALTER TABLE "transfers" ADD FOREIGN KEY ("to_account_id") REFERENCES "accounts" ("id");
表的from_account_id
和to_account_id
列transfers
引用表id
的 列accounts
。因此,UPDATE
账户 ID 上的任何内容都会影响此外键约束。
这就是为什么当我们选择一个账户进行更新时,它需要获取锁以防止冲突并确保数据的一致性。
话虽如此,现在如果我们继续在事务 1 上运行其余查询来创建entry 1
、创建entry 2
和选择account 1
更新:
我们将会遇到死锁,因为这个查询还必须等待来自事务 2 的锁,而事务 2 也在等待来自事务 1 的锁。
这清楚地解释了死锁是如何发生的。但是该如何解决呢?
修复死锁(不好的方法)
我们知道,死锁是由外键约束引起的,因此避免死锁的一个简单方法就是删除这些约束。
让我们尝试注释掉init_schema.up.sql
文件中的这些语句:
-- ALTER TABLE "entries" ADD FOREIGN KEY ("account_id") REFERENCES "accounts" ("id");
-- ALTER TABLE "transfers" ADD FOREIGN KEY ("from_account_id") REFERENCES "accounts" ("id");
-- ALTER TABLE "transfers" ADD FOREIGN KEY ("to_account_id") REFERENCES "accounts" ("id");
make migratedown
然后在终端中运行以删除数据库模式。然后运行make migrateup
以重新创建没有外键约束的新数据库模式。
好了,现在我们再次运行测试,它会通过,因为约束已经消失,所以在选择账户进行更新时不需要加锁。没有锁就意味着没有死锁。
然而,这不是最好的解决方案,因为我们不想失去保持数据一致性的良好约束。
因此,让我们恢复这些更改,运行make migratedown
,然后make migrateup
再次运行以恢复这些约束。现在测试将再次因死锁而失败。
让我们学习一个更好的方法来解决这个问题。
修复死锁[更好的方法]
我们已经知道,只需要事务锁,因为 Postgres 担心事务 1 将更新account ID
,这会影响表的外键约束transfers
。
然而,如果我们查看UpdateAccount
查询,我们就会发现它只会改变账户余额。
-- name: UpdateAccount :one
UPDATE accounts
SET balance = $2
WHERE id = $1
RETURNING *;
帐户 ID 永远不会改变,因为它是帐户表的主键。
因此,如果我们可以告诉 Postgres 我正在选择此帐户进行更新,但不会触及其主键,那么 Postgres 将不需要获取事务锁,因此不会出现死锁。
幸运的是,这非常容易做到。在GetAccountForUpdate
查询中,SELECT FOR UPDATE
我们只需要更清楚地说明即可:SELECT FOR NO KEY UPDATE
-- name: GetAccountForUpdate :one
SELECT * FROM accounts
WHERE id = $1 LIMIT 1
FOR NO KEY UPDATE;
这将告诉 Postgres 我们不更新ID
帐户表的键或列。
现在我们make sqlc
在终端中运行来重新生成这个查询的 golang 代码。
const getAccountForUpdate = `-- name: GetAccountForUpdate :one
SELECT id, owner, balance, currency, created_at FROM accounts
WHERE id = $1 LIMIT 1
FOR NO KEY UPDATE
`
func (q *Queries) GetAccountForUpdate(ctx context.Context, id int64) (Account, error) {
row := q.db.QueryRowContext(ctx, getAccountForUpdate, id)
var i Account
err := row.Scan(
&i.ID,
&i.Owner,
&i.Balance,
&i.Currency,
&i.CreatedAt,
)
return i, err
}
好的,代码已更新。让我们再次运行测试!
成功了!太棒了!我们的调试和修复工作完成了。
更新账户余额[更好的方法]
现在,在我们结束之前,我将向您展示一种更好的方法来实现此更新帐户余额操作。
目前,我们必须执行 2 个查询来获取帐户并更新其余额:
func (store *Store) TransferTx(ctx context.Context, arg TransferTxParams) (TransferTxResult, error) {
var result TransferTxResult
err := store.execTx(ctx, func(q *Queries) error {
...
// move money out of account1
account1, err := q.GetAccountForUpdate(ctx, arg.FromAccountID)
if err != nil {
return err
}
result.FromAccount, err = q.UpdateAccount(ctx, UpdateAccountParams{
ID: arg.FromAccountID,
Balance: account1.Balance - arg.Amount,
})
if err != nil {
return err
}
// move money into account2
...
})
return result, err
}
我们可以通过仅使用 1 个查询直接向账户余额中添加一定数量的钱来改进这一点。
为此,我将向文件添加一个名为“SQL 查询”的新AddAccountBalance
查询query/account.sql
。
-- name: AddAccountBalance :one
UPDATE accounts
SET balance = balance + $1
WHERE id = $2
RETURNING *;
它与 UpdateAccount 查询类似,不同之处在于,我们在这里设置了balance = balance + $2
。
运行make sqlc
生成代码,可以看到一个新函数成功添加到了Queries
结构体中:
const addAccountBalance = `-- name: AddAccountBalance :one
UPDATE accounts
SET balance = balance + $1
WHERE id = $2
RETURNING id, owner, balance, currency, created_at
`
type AddAccountBalanceParams struct {
Balance int64 `json:"balance"`
ID int64 `json:"id"`
}
func (q *Queries) AddAccountBalance(ctx context.Context, arg AddAccountBalanceParams) (Account, error) {
row := q.db.QueryRowContext(ctx, addAccountBalance, arg.Balance, arg.ID)
var i Account
err := row.Scan(
&i.ID,
&i.Owner,
&i.Balance,
&i.Currency,
&i.CreatedAt,
)
return i, err
}
然而,结构balance
体中的参数AddAccountBalanceParams
看起来有点令人困惑,因为我们只是向余额中添加一些金额,而不是将账户余额更改为这个值。
所以这个参数的名称应该是Amount
。我们能让 sqlc 帮我们做这件事吗?
是的,可以!在 SQL 查询中,$2
我们可以用 来代替sqlc.arg(amount)
,而$1
应该用 来代替 。sqlc.arg(id)
-- name: AddAccountBalance :one
UPDATE accounts
SET balance = balance + sqlc.arg(amount)
WHERE id = sqlc.arg(id)
RETURNING *;
这amount
将id
成为生成的参数的名称。让我们在终端中运行 make sqlc 来重新生成代码。
const addAccountBalance = `-- name: AddAccountBalance :one
UPDATE accounts
SET balance = balance + $1
WHERE id = $2
RETURNING id, owner, balance, currency, created_at
`
type AddAccountBalanceParams struct {
Amount int64 `json:"amount"`
ID int64 `json:"id"`
}
func (q *Queries) AddAccountBalance(ctx context.Context, arg AddAccountBalanceParams) (Account, error) {
row := q.db.QueryRowContext(ctx, addAccountBalance, arg.Amount, arg.ID)
var i Account
err := row.Scan(
&i.ID,
&i.Owner,
&i.Balance,
&i.Currency,
&i.CreatedAt,
)
return i, err
}
这次,我们可以看到参数的名称已经变成了我们想要的名称。太棒了!
现在回到store.go
文件,我将删除该GetAccountForUpdate
调用,然后更改UpdateAccount()
为AddAccountBalance()
:
func (store *Store) TransferTx(ctx context.Context, arg TransferTxParams) (TransferTxResult, error) {
var result TransferTxResult
err := store.execTx(ctx, func(q *Queries) error {
...
// move money out of account1
result.FromAccount, err = q.AddAccountBalance(ctx, AddAccountBalanceParams{
ID: arg.FromAccountID,
Amount: -arg.Amount,
})
if err != nil {
return err
}
// move money into account2
result.ToAccount, err = q.AddAccountBalance(ctx, AddAccountBalanceParams{
ID: arg.ToAccountID,
Amount: arg.Amount,
})
if err != nil {
return err
}
return nil
})
return result, err
}
请注意,Amount
添加到account1
应该是-amount
因为资金正在流出。
完成了!让我们重新运行测试。
耶!通过了!我们来运行整个包测试。
全部通过了!
今天关于数据库事务中的锁定以及如何调试死锁的讲座就到这里。希望你喜欢。
请继续关注下一讲,因为我会告诉你,死锁问题尚未完全解决。还有很多内容需要学习。
同时,祝您编码愉快,我们很快就会再见面!
如果您喜欢这篇文章,请订阅我们的 Youtube 频道并在 Twitter 上关注我们,以便将来获取更多教程。
如果你想加入我目前在 Voodoo 的优秀团队,请查看我们的职位空缺。你可以远程办公,也可以在巴黎/阿姆斯特丹/伦敦/柏林/巴塞罗那现场办公,但需获得签证担保。
文章来源:https://dev.to/techschoolguru/db-transaction-lock-how-to-handle-deadlock-22o8