新功能:无服务器框架中的 DynamoDB 流过滤

2025-06-08

新功能:无服务器框架中的 DynamoDB 流过滤

通过本文,您将了解如何利用 DynamoDB 和 Lambda 最新发布的 Streams Filtering 功能。

我们将深入讲解 DynamoDB 事件操作过滤的基本示例。您将学习如何将其与您的业务逻辑相结合。我将使用 DynamoDB单表设计设置来实现这一点。

什么是新的?

如果您还没有听说过,就在#reInvent2021 之前,AWS 发布了这个重大更新

有什么变化?

更新前

在 DynamoDB 表(INSERTMODIFYREMOVE)中执行的每个操作都会触发一个事件,该事件通过 DynamoDB 流发送到 Lambda 函数。无论操作类型如何,始终都会调用 Lambda 函数。这会产生两个影响:

  • if在执行业务逻辑之前,您必须在 Lambda 代码(条件)中实现过滤逻辑(即,每当表中添加INSERT新内容时,过滤操作都会发送欢迎电子邮件)。User
  • 您需要为每次 Lambda 运行付费,尽管在大多数情况下您只对某些事件感兴趣。

这种情况在单表设计中成倍增加,在单表中存储多种类型,因此实际上您有许多INSERT具有子类型的(即新用户、新地址、新订单等)

更新后

现在,您可以过滤掉与业务逻辑无关的事件。通过定义过滤条件,您可以控制哪些事件可以调用 Lambda 函数。过滤会根据消息中的值来评估事件。

这解决了上述问题:

  • if逻辑评估已推送至 AWS( Lambda 代码中不再有s)
  • 不再需要执行不必要的 Lambda。

这一切都归功于定义过滤条件的小 JSON 片段。

重构流过滤

既然您正在阅读这篇文章,那么可以肯定地说,您和我一样,已经在使用 DynamoDB Streams 来调用您的 Lambda 函数。

因此,让我带你了解一下重构过程。这是我在生产环境中使用的代码的简化版本。

在我的 DynamoDB 表中,我存储了两种类型的实体:OrderInvoice。我的业务逻辑要求我仅在被修改时执行某些操作。 如您所见,它只是六种情况中的一种。想象一下,如果您的表中有更多类型,并且您的业务逻辑也要求您执行其他操作,会发生什么情况。Invoice
业务逻辑条件

旧事件过滤

让我们从更新之前那些丑陋的语句开始,if因为我必须手动过滤事件。

我的 Lambda 处理程序从执行parseEvent方法开始:

const parseEvent = (event) => {
  const e = event.Records[0] // batch size = 1
  const isInsert = e.eventName === 'INSERT'
  const isModify = e.eventName === 'MODIFY'

  const isOrder = e.dynamodb.NewImage?.Type?.S === 'Order'
  const isInvoice = e.dynamodb.NewImage?.Type?.S === 'Invoice'

  const newItemData = e.dynamodb.NewImage
  const oldItemData = e.dynamodb.OldImage

  return {
    isInsert, isModify, isOrder, isInvoice, newItemData, oldItemData
  }
}
Enter fullscreen mode Exit fullscreen mode

下一步,我必须评估处理程序中的条件:

const {
    isInsert, isModify, isOrder, isInvoice, newItemData, oldItemData
  } = parseEvent(event)


if (isModify && isInvoice) {
  // perform business logic
  // uses newItemData & oldItemData values
}
Enter fullscreen mode Exit fullscreen mode

新事件过滤

通过在 AWS 上推动条件评估,新功能使我们能够显著简化该代码。

简单回顾一下,我的业务逻辑要求我仅允许在实体MODIFY上执行的事件Invoice。幸运的是,我Type在 DynamoDB 表中保存了实体的值(感谢 Alex 🤝)。

DynamoDB 事件结构定义明确,因此基本上我需要做的是确保:

  • eventName等于MODIFY,并且
  • dynamodb.NewImage.Type.S等于Invoice

所有这些都在 Lambda 配置部分中定义filterPatterns。以下是 Serverless Framework 配置文件中的一段代码serverless.yml。对 的支持filterPatterns是在 2.68.0 版本中引入的——请确保您使用的是该版本或更高版本。

    functionName:
      handler: src/functionName/function.handler
      # other properties
      events:
      - stream:
          type: dynamodb
          arn: !GetAtt DynamoDbTable.StreamArn
          maximumRetryAttempts: 1
          batchSize: 1
          filterPatterns:
            - eventName: [MODIFY]
              dynamodb:
                 NewImage:
                   Type:
                     S: [Invoice]
Enter fullscreen mode Exit fullscreen mode

这就是过滤 DynamoDB 流所需要做的全部工作。

很神奇,不是吗?

陷阱

请记住,单个源上可以有多个过滤器。在这种情况下,每个过滤器都独立工作。简而言之,它们之间OR没有AND逻辑。

我错误地创建了两个过滤器,从而学到了这一点:

          filterPatterns:
            - eventName: [MODIFY]
            - dynamodb:
                 NewImage:
                   Type:
                     S: [Invoice]
Enter fullscreen mode Exit fullscreen mode

-在 前面添加dynamodb:。这导致了错误的过滤器:

{
  "filters": [
    {
      "pattern": "{\"eventName\":[\"MODIFY\"]}"
    },
    {
      "pattern": "{\"dynamodb\":{\"NewImage\":{\"Type\":{\"S\":[\"Invoice\"]}}}}"
    }
  ]
}
Enter fullscreen mode Exit fullscreen mode

那个可以捕获所有动作或对象包含MODIFY的任何内容,因此 DynamoDB动作也是如此!InvoiceTypeNewImageINSERT

正确的过滤器:

{
  "filters": [
    {
      "pattern": "{\"eventName\":[\"MODIFY\"],\"dynamodb\":{\"NewImage\":{\"Type\":{\"S\":[\"Invoice\"]}}}}"
    }
  ]
}
Enter fullscreen mode Exit fullscreen mode

您可以在 Lambda 控制台的配置->触发器部分下查看过滤器

全局表

正如kolektiv在下面的评论中提到的,此功能不适用于全局表。

还有一个问题,您无法对全局表使用过滤功能,否则您的过滤器将不会被执行,您的函数也不会被调用。已与 AWS 支持确认。

感谢您指出这一点。

它要多少钱?

没有什么。

目前尚无任何关于额外定价的信息。此外,Jeremy Daly 在 2021 年 re:Invent 大会上也证实了这一点。

实际上,此功能可以节省您的维护费用,因为编写和调试 Lambda 代码更加容易,并且操作也更加简单,因为函数仅在响应业务相关事件时执行

低耦合

在更新之前,人们在单个 Lambda 函数中实现了事件过滤逻辑。因此,他们面临着高耦合的问题(除非他们使用了某种调度程序模式)。

现在,我们可以将多个独立的 Lambda 函数(每个函数都有各自的筛选条件)附加到同一个 DynamoDB 流。这样可以降低处理不同事件类型的代码之间的耦合度。所有单表设计实践者都会非常感激这一点

更新

我忘了说了,除了在过滤器中评估字符串相等条件之外,你还能做更多的事情。通过几个比较运算符,你还可以实现更多的可能性。

这是一张表格 被盗摘自AWS Docs(如果不能包含在这里,请告诉我。):

比较运算符 例子 规则语法
无效的 UserID 为空 "用户 ID": [ 空 ]
空的 姓氏为空 “姓”: [””]
等于 名字是“爱丽丝” "姓名": [ "爱丽丝" ]
地点为“纽约”,日期为“星期一” "地点": [ "纽约" ], "日期": ["星期一"]
或者 PaymentType 为“信用卡”或“借记卡” "PaymentType": [ "信用卡", "借记卡"]
不是 天气绝不是“下雨” "天气": [ { "anything-but": [ "下雨" ] } ]
数字(等于) 价格是100 "价格": [ { "数字": [ "=", 100 ] } ]
数字(范围) Price 大于 10,小于或等于 20 "价格": [ { "数字": [ ">", 10, "<=", 20 ] } ]
存在 ProductName 存在 “产品名称”:[{“存在”:true}]
不存在 产品名称不存在 “产品名称”:[{“存在”:false}]
开头为 地区位于美国 "地区": [ {"prefix": "us-" } ]

概括

我希望这篇短文能说服您重构由 DynamoDB Streams 调用的 Lambda 函数。重构过程非常简单,并且在代码清晰度和成本方面带来显著提升。

鏂囩珷鏉ユ簮锛�https://dev.to/aws-builders/new-dynamodb-streams-filtering-in-serverless-framework-3lc5
PREV
Redis:探索将 Redis 作为无服务器数据库来解决 API 中的幂等性问题 🛵 📬 Middy 的幂等中间件
NEXT
在 Amazon Linux 2023 上安装 MySQL