新功能:无服务器框架中的 DynamoDB 流过滤
通过本文,您将了解如何利用 DynamoDB 和 Lambda 最新发布的 Streams Filtering 功能。
我们将深入讲解 DynamoDB 事件操作过滤的基本示例。您将学习如何将其与您的业务逻辑相结合。我将使用 DynamoDB单表设计设置来实现这一点。
什么是新的?
如果您还没有听说过,就在#reInvent2021 之前,AWS 发布了这个重大更新。
有什么变化?
更新前
在 DynamoDB 表(INSERT
、MODIFY
、REMOVE
)中执行的每个操作都会触发一个事件,该事件通过 DynamoDB 流发送到 Lambda 函数。无论操作类型如何,始终都会调用 Lambda 函数。这会产生两个影响:
if
在执行业务逻辑之前,您必须在 Lambda 代码(条件)中实现过滤逻辑(即,每当表中添加INSERT
新内容时,过滤操作都会发送欢迎电子邮件)。User
- 您需要为每次 Lambda 运行付费,尽管在大多数情况下您只对某些事件感兴趣。
这种情况在单表设计中成倍增加,在单表中存储多种类型,因此实际上您有许多INSERT
具有子类型的(即新用户、新地址、新订单等)
更新后
现在,您可以过滤掉与业务逻辑无关的事件。通过定义过滤条件,您可以控制哪些事件可以调用 Lambda 函数。过滤会根据消息中的值来评估事件。
这解决了上述问题:
if
逻辑评估已推送至 AWS( Lambda 代码中不再有s)- 不再需要执行不必要的 Lambda。
这一切都归功于定义过滤条件的小 JSON 片段。
重构流过滤
既然您正在阅读这篇文章,那么可以肯定地说,您和我一样,已经在使用 DynamoDB Streams 来调用您的 Lambda 函数。
因此,让我带你了解一下重构过程。这是我在生产环境中使用的代码的简化版本。
在我的 DynamoDB 表中,我存储了两种类型的实体:Order
和Invoice
。我的业务逻辑要求我仅在被修改时执行某些操作。 如您所见,它只是六种情况中的一种。想象一下,如果您的表中有更多类型,并且您的业务逻辑也要求您执行其他操作,会发生什么情况。Invoice
旧事件过滤
让我们从更新之前那些丑陋的语句开始,if
因为我必须手动过滤事件。
我的 Lambda 处理程序从执行parseEvent
方法开始:
const parseEvent = (event) => {
const e = event.Records[0] // batch size = 1
const isInsert = e.eventName === 'INSERT'
const isModify = e.eventName === 'MODIFY'
const isOrder = e.dynamodb.NewImage?.Type?.S === 'Order'
const isInvoice = e.dynamodb.NewImage?.Type?.S === 'Invoice'
const newItemData = e.dynamodb.NewImage
const oldItemData = e.dynamodb.OldImage
return {
isInsert, isModify, isOrder, isInvoice, newItemData, oldItemData
}
}
下一步,我必须评估处理程序中的条件:
const {
isInsert, isModify, isOrder, isInvoice, newItemData, oldItemData
} = parseEvent(event)
if (isModify && isInvoice) {
// perform business logic
// uses newItemData & oldItemData values
}
新事件过滤
通过在 AWS 上推动条件评估,新功能使我们能够显著简化该代码。
简单回顾一下,我的业务逻辑要求我仅允许在实体MODIFY
上执行的事件Invoice
。幸运的是,我Type
在 DynamoDB 表中保存了实体的值(感谢 Alex 🤝)。
DynamoDB 事件结构定义明确,因此基本上我需要做的是确保:
eventName
等于MODIFY
,并且dynamodb.NewImage.Type.S
等于Invoice
。
所有这些都在 Lambda 配置部分中定义filterPatterns
。以下是 Serverless Framework 配置文件中的一段代码serverless.yml
。对 的支持filterPatterns
是在 2.68.0 版本中引入的——请确保您使用的是该版本或更高版本。
functionName:
handler: src/functionName/function.handler
# other properties
events:
- stream:
type: dynamodb
arn: !GetAtt DynamoDbTable.StreamArn
maximumRetryAttempts: 1
batchSize: 1
filterPatterns:
- eventName: [MODIFY]
dynamodb:
NewImage:
Type:
S: [Invoice]
这就是过滤 DynamoDB 流所需要做的全部工作。
很神奇,不是吗?
陷阱
请记住,单个源上可以有多个过滤器。在这种情况下,每个过滤器都独立工作。简而言之,它们之间OR
没有AND
逻辑。
我错误地创建了两个过滤器,从而学到了这一点:
filterPatterns:
- eventName: [MODIFY]
- dynamodb:
NewImage:
Type:
S: [Invoice]
-
在 前面添加dynamodb:
。这导致了错误的过滤器:
{
"filters": [
{
"pattern": "{\"eventName\":[\"MODIFY\"]}"
},
{
"pattern": "{\"dynamodb\":{\"NewImage\":{\"Type\":{\"S\":[\"Invoice\"]}}}}"
}
]
}
那个可以捕获所有动作或对象中包含MODIFY
的任何内容,因此 DynamoDB动作也是如此!Invoice
Type
NewImage
INSERT
正确的过滤器:
{
"filters": [
{
"pattern": "{\"eventName\":[\"MODIFY\"],\"dynamodb\":{\"NewImage\":{\"Type\":{\"S\":[\"Invoice\"]}}}}"
}
]
}
您可以在 Lambda 控制台的配置->触发器部分下查看过滤器。
全局表
正如kolektiv在下面的评论中提到的,此功能不适用于全局表。
还有一个问题,您无法对全局表使用过滤功能,否则您的过滤器将不会被执行,您的函数也不会被调用。已与 AWS 支持确认。
感谢您指出这一点。
它要多少钱?
没有什么。
目前尚无任何关于额外定价的信息。此外,Jeremy Daly 在 2021 年 re:Invent 大会上也证实了这一点。
实际上,此功能可以节省您的维护费用,因为编写和调试 Lambda 代码更加容易,并且操作也更加简单,因为函数仅在响应业务相关事件时执行。
低耦合
在更新之前,人们在单个 Lambda 函数中实现了事件过滤逻辑。因此,他们面临着高耦合的问题(除非他们使用了某种调度程序模式)。
现在,我们可以将多个独立的 Lambda 函数(每个函数都有各自的筛选条件)附加到同一个 DynamoDB 流。这样可以降低处理不同事件类型的代码之间的耦合度。所有单表设计实践者都会非常感激这一点。
更新
我忘了说了,除了在过滤器中评估字符串相等条件之外,你还能做更多的事情。通过几个比较运算符,你还可以实现更多的可能性。
这是一张表格
被盗摘自AWS Docs(如果不能包含在这里,请告诉我。):
比较运算符 | 例子 | 规则语法 |
---|---|---|
无效的 | UserID 为空 | "用户 ID": [ 空 ] |
空的 | 姓氏为空 | “姓”: [””] |
等于 | 名字是“爱丽丝” | "姓名": [ "爱丽丝" ] |
和 | 地点为“纽约”,日期为“星期一” | "地点": [ "纽约" ], "日期": ["星期一"] |
或者 | PaymentType 为“信用卡”或“借记卡” | "PaymentType": [ "信用卡", "借记卡"] |
不是 | 天气绝不是“下雨” | "天气": [ { "anything-but": [ "下雨" ] } ] |
数字(等于) | 价格是100 | "价格": [ { "数字": [ "=", 100 ] } ] |
数字(范围) | Price 大于 10,小于或等于 20 | "价格": [ { "数字": [ ">", 10, "<=", 20 ] } ] |
存在 | ProductName 存在 | “产品名称”:[{“存在”:true}] |
不存在 | 产品名称不存在 | “产品名称”:[{“存在”:false}] |
开头为 | 地区位于美国 | "地区": [ {"prefix": "us-" } ] |
概括
我希望这篇短文能说服您重构由 DynamoDB Streams 调用的 Lambda 函数。重构过程非常简单,并且在代码清晰度和成本方面带来显著提升。
鏂囩珷鏉ユ簮锛�https://dev.to/aws-builders/new-dynamodb-streams-filtering-in-serverless-framework-3lc5