AWS 无服务器速成课程 - 使用 SNS 消息触发 Lambda 使用 SNS 触发 AWS Lambda 并进行 DLQ 错误处理

2025-06-08

AWS 无服务器速成课程 - 使用 SNS 消息触发 Lambda

使用带有 DLQ 错误处理的 SNS 触发 AWS Lambda

如果您和我一样,对事件驱动编程情有独钟,那么您一定会想继续阅读。今天,我们将学习如何从AWS SNS消息触发AWS Lambda函数。我已经讨论过一些关于无服务器架构和 AWS 的有趣主题,但还没有像这样的。深入研究,做好准备。我们开始吧。

TL;DR

注意:如果您想立即查看最终结果,本教程的所有代码都已在 GitHub 上。

我们正在建造什么?

我们将专注于创建应用所需的基础组件的步骤。代码本身将模拟随机复杂计算的行为。我选择了一个递归函数,用于计算传递给它的数字的阶乘。这里有一个很棒的图表,因为图表当然很棒!

init函数是唯一公开的函数,它连接到API 网关。它接受一个number参数并进行验证,如果验证成功,它将发布一个 SNS 主题并发送该number值。

SNS 主题将触发第二个名为 的函数calculate。该函数将执行计算并将结果输出到控制台。这模拟了繁重的计算后台任务,例如数据处理、图像处理或机器学习计算。

如果该calculate功能失败,Dead Letter Queue SNS 主题将收到一条消息并触发该error功能。

每个异步调用的函数在失败时都会重试两次。使用死信队列作为错误日志的存储池是一个明智的做法。

现在您想知道,为什么 SNS 如此复杂,而不是仅仅使用Lambda 的调用 API从第一个 lambda 函数调用第二个 lambda 函数?

首先,对于异步工作流(我们的情况就是这样)来说,这是一个严重的反模式。否则,如果您需要立即从第二个 Lambda 函数获取响应,那就没问题了。另一个问题是 Lambda 的并发限制很快就会达到极限。这可能会导致调用丢失和数据丢失。通过 SNS 之类的发布/订阅服务或SQS之类的队列发送数据可以确保数据完整性。

现在明白了吗?太好了,我们再聊聊SNS吧。

什么是 AWS SNS?

在开始编码之前,我们需要了解一些基础知识。我们知道 AWS Lambda 是什么,但 SNS 是什么呢?AWS 文档非常直观。

Amazon Simple Notification Service (SNS) 是一种灵活、完全托管的 发布/订阅消息 和移动通知服务,用于协调向订阅终端和客户端传递消息。

-AWS 文档

简而言之,这意味着它是一种基于发布者/订阅者模式在服务之间发送通知的方式。一个服务发布一些关于某个主题的数据,并将其沿途发送。然后,SNS 会将这些数据分发给该特定主题的所有订阅者。这里的重点是主题,稍后您将了解原因。

使用无服务器框架构建 API

与往常一样,首先要做的是设置项目并安装依赖项。

1.安装无服务器框架

我常用的无服务器应用开发和部署工具是Serverless Framework。我们先安装一下吧。

$ npm i -g serverless
Enter fullscreen mode Exit fullscreen mode

注意: 如果您使用的是 Linux,则可能需要以 sudo 身份运行该命令。

一旦在您的计算机上全局安装,您就可以从终端的任何位置使用这些命令。但是,为了与您的 AWS 账户通信,您需要配置一个 IAM 用户。点击此处查看说明,然后返回并使用提供的密钥运行以下命令。

$ serverless config credentials \ 
    --provider aws \ 
    --key xxxxxxxxxxxxxx \ 
    --secret xxxxxxxxxxxxxx
Enter fullscreen mode Exit fullscreen mode

现在,你的 Serverless 安装已经知道在运行任何终端命令时要连接到哪个账户了。让我们开始实际操作吧。

2.创建服务

创建一个新目录来存放您的无服务器应用服务。在那里启动一个终端。现在,您可以创建新服务了。

什么是服务?它就像一个项目。您可以在其中定义 AWS Lambda 函数、触发这些函数的事件以及它们所需的任何 AWS 基础设施资源(包括我们今天要添加的 SNS),所有这些都包含在一个名为serverless.yml的文件中。

返回您的终端类型:

$ serverless create --template aws-nodejs --path lambda-sns-dlq-error-handling
Enter fullscreen mode Exit fullscreen mode

create 命令会创建一个新的service。惊喜!我们还为该函数指定了一个运行时,也就是template 。传入templateaws-nodejs会将运行时设置为 Node.js。这正是我们想要的。path会为该服务创建一个文件夹

3. 使用代码编辑器探索服务目录

使用您常用的代码编辑器打开lambda-sns-dlq-error-handling文件夹。里面应该有三个文件,但目前我们只关注serverless.yml文件。它包含此服务的所有配置设置。在这里,您可以指定通用配置设置和每个函数的配置设置。您的serverless.yml 文件将包含大量样板代码和注释。您可以随意删除它们,然后粘贴此文件。

service: lambda-sns-dlq-error-handling

plugins:
  - serverless-pseudo-parameters

provider:
  name: aws
  runtime: nodejs8.10
  stage: dev
  region: eu-central-1
  memorySize: 128
  environment:
    accountId: '#{AWS::AccountId}'
    region: '#{AWS::Region}'
  iamRoleStatements:
    - Effect: "Allow"
      Resource: "*"
      Action:
        - "sns:*"

functions:
  init:
    handler: init.handler
    events:
      - http:
          path: init
          method: post
          cors: true
  calculate:
    handler: calculate.handler
    events:
      - sns: calculate-topic # created immediately
    onError: arn:aws:sns:#{AWS::Region}:#{AWS::AccountId}:dlq-topic
  error:
    handler: error.handler
    events:
      - sns: dlq-topic # created immediately
Enter fullscreen mode Exit fullscreen mode

让我们分解一下这里发生了什么。查看functions相关部分。这里有三个函数。从上到下分别是initcalculateerror。该init函数将由一个简单的 HTTP 请求触发,我们通过 API 网关调用该请求。这对我们来说很熟悉。

然而,calculateerror函数是由 SNS 主题触发的。这意味着我们将在init函数中设置逻辑,将消息发布到名为 的主题,calculate-topic而该calculate函数订阅了相同的主题。

接下来,该error函数订阅了 ,dlq-topic并且如果失败,该calculate函数会向该主题发布消息,正如您在 属性中看到的那样onError。现在一切都说得通了,对吧?

请记住,一旦您将 SNS 主题添加为功能的事件,部署服务后就会自动创建资源。

还有什么?看看iamRoleStatements,它们指定我们的函数有权触发并被 SNS 主题调用。同时,该serverless-pseudo-parameters插件允许我们使用 CloudFormation 语法引用我们的AccountIdRegion,从而更轻松地在所有资源中保持一致的 SNS ARN。

4.安装依赖项

幸运的是,这部分很短。只需安装一个包。首先,初始化 npm,然后就可以安装serverless-pseudo-parameters

$ npm init -y && npm i serverless-pseudo-parameters
Enter fullscreen mode Exit fullscreen mode

那就行了。

5.编写业务逻辑

总而言之,配置过程相当简单。我们现在要写的代码也同样简单。没什么特别之处,很抱歉让你失望了。

为了简单起见,我们将这三个函数放在单独的文件中。首先创建一个init.js文件,并将这段代码粘贴进去。

// init.js
const aws = require('aws-sdk')
const sns = new aws.SNS({ region: 'eu-central-1' })

function generateResponse (code, payload) {
  console.log(payload)
  return {
    statusCode: code,
    body: JSON.stringify(payload)
  }
}
function generateError (code, err) {
  console.error(err)
  return generateResponse(code, {
    message: err.message
  })
}
async function publishSnsTopic (data) {
  const params = {
    Message: JSON.stringify(data),
    TopicArn: `arn:aws:sns:${process.env.region}:${process.env.accountId}:calculate-topic`
  }
  return sns.publish(params).promise()
}

module.exports.handler = async (event) => {
  const data = JSON.parse(event.body)
  if (typeof data.number !== 'number') {
    return generateError(400, new Error('Invalid number.'))
  }

  try {
    const metadata = await publishSnsTopic(data)
    return generateResponse(200, {
      message: 'Successfully added the calculation.',
      data: metadata
    })
  } catch (err) {
    return generateError(500, new Error('Couldn\'t add the calculation due to an internal error.'))
  }
}
Enter fullscreen mode Exit fullscreen mode

我们在底部有一些辅助函数和导出的 lambda 函数。这里发生了什么?lambda 函数验证输入并将一些数据发布到calculate-topicSNS 主题。这就是这个函数所做的一切。SNScalculate-topic主题将触发calculatelambda 函数。我们现在来添加它。

创建一个文件并将其命名为calculate.js。粘贴此代码片段。

// calculate.js
module.exports.handler = async (event) => {
  const { number } = JSON.parse(event.Records[0].Sns.Message)
  const factorial = (x) => x === 0 ? 1 : x * factorial(x - 1)
  const result = factorial(number)

  console.log(`The factorial of ${number} is ${result}.`)
  return result
}
Enter fullscreen mode Exit fullscreen mode

如你所见,这只是一个用递归函数实现的简单阶乘计算。它会计算我们通过函数发布到 SNS 主题的数字的阶乘init

这里需要注意的是,如果该calculate函数总共失败三次,它会将消息发布到我们onErrorserverless.yml文件中使用属性指定的死信队列 SNS 主题。死信队列随后会触发该error函数。现在让我们创建它,以便它可以将错误记录到 CloudWatch。创建一个error.js文件,并将以下代码粘贴到其中。

// error.js
module.exports.handler = async (event) => {
  console.error(event)
}
Enter fullscreen mode Exit fullscreen mode

目前来说,这样就够了。不过,理想情况下,你应该有结构化的日志记录,其中包含所有正在发生的情况的详细信息。那是另一篇文章的主题。

将 API 部署到 AWS Lambda

接下来是简单的部分。部署 API 就像运行一个命令一样简单。

$ serverless deploy
Enter fullscreen mode Exit fullscreen mode

您可以看到端点已记录到控制台。这就是您将发送请求的地方。

使用 Dashbird 测试 API

测试 API 最简单的方法是使用 CURL。让我们创建一个简单的 CURL 命令,并将 JSON 负载发送到我们的端点。

$ curl -H "Content-Type: application/json" \
  -d '{"number":1000}' \
  https://<id>.execute-api.eu-central-1.amazonaws.com/dev/init
Enter fullscreen mode Exit fullscreen mode

如果一切正常,计算结果将记录到 CloudWatch。如果没有,那你就倒霉了。在这种情况下,我默认使用Dashbird来调试正在发生的事情。它是免费的,不需要信用卡即可设置。

用几个不同的值多次触发端点后,结果如下。init函数按预期工作。

但是,我们真正感兴趣的是这个calculate函数。成功执行后,它看起来是这样的。

当它失败时,它会指定崩溃并显示错误日志。

重试两次后,它会向死信队列发送一条消息并触发该error功能。

太棒了!我们已经测试了所有不同的场景。希望这能让你更清楚地了解情况。

总结

就这些了,各位。我们已经介绍了如何为 Lambda 创建 SNS 触发器,以及如何实现死信队列来捕获调用失败的错误。使用无服务器进行各种间歇性计算是一个有效的用例,并且在未来会越来越流行。

您无需担心服务器问题,只需按运行时间付费。只需部署代码,即可安心运行。如果出现问题,Dashbird会通过 Slack 或电子邮件提醒您。您一定会爱上 Slack 集成!

再次强调,如果您想查看代码,这里是GitHub 仓库。它可以作为您自己需要 SNS 消息触发 lambda 函数的用例的入门指南。如果您喜欢它并希望更多人在 GitHub 上看到它,请给它一个 star。

GitHub 徽标 adnanrahic / lambda-sns-dlq-错误处理

示例项目展示了如何发布 SNS 主题并从该主题触发函数。代码结构化地创建了超时/崩溃事件,以便死信队列中的 SNS 主题能够发布,进而触发错误处理函数。

使用带有 DLQ 错误处理的 SNS 触发 AWS Lambda

示例项目展示了如何发布 SNS 主题并从该主题触发函数。代码结构化地创建了超时/崩溃事件,以便死信队列中的 SNS 主题能够发布,进而触发错误处理函数。

解释

  • init函数是唯一公开的函数,它连接到 API 网关。它接受一个number参数并进行验证,验证成功后,它会发布一个 SNS 主题并发送该number值。
  • SNS 主题将触发第二个名为 的函数calculate。该函数将执行计算并将结果输出到控制台。这模拟了繁重的计算后台任务,例如数据处理、图像处理或机器学习计算。
  • 如果这两个功能中的任何一个失败,死信队列 SNS 主题将收到一条消息并触发……

如果您想阅读我之前的一些无服务器思考,请转到我的个人资料加入我的时事通讯!

或者,立即查看我的几篇文章:

希望大家喜欢阅读这篇文章,就像我喜欢写这篇文章一样。如果你们喜欢,请点个小爱心,这样 dev.to 上会有更多人看到这篇教程。下次再见,保持好奇心,享受乐趣吧。


本月的赞助商是 Zeet。

免责声明:Zeet将赞助下个月的这篇博文。前几天我试用了一下。它类似于无服务器架构,但可以运行整个后端。你可以自动托管和扩展应用程序。相当不错。


链接:https://dev.to/adnanrahic/a-crash-course-on-serverless-with-aws---triggering-lambda-with-sns-messaging-30nf
PREV
AWS Lambda 和 Node.js 入门
NEXT
AWS 无服务器速成课程 - 在 Lambda 上运行 Node.js 11