使用 SNS、SQS 和 Lambda 在 AWS 中处理事件简介架构实施结论 AWS GenAI LIVE!

2025-06-05

使用 SNS、SQS 和 Lambda 在 AWS 中处理事件

介绍

建筑学

执行

结论

AWS GenAI 直播!

这篇博文是我的 AWS 系列文章的一部分:

介绍

在响应式消息驱动的应用程序中,解耦消息的生产者和消费者至关重要。结合发布/订阅 (pub/sub) 和队列组件,我们能够构建具有弹性、可扩展和容错能力的应用程序架构。AWS 提供了各种实现发布/订阅或队列的组件。

在本文中,我们想了解 AWS 上用于事件和消息处理的两个简单但功能强大的组件:简单通知服务(SNS) 和简单队列服务(SQS)。

我们的目标是开发一个事件管道,每当有人将图片上传到 S3 存储桶时,都会向 Slack 频道发送消息。为了演示,我们还将事件存储在队列中以便异步处理。该架构包含 S3 事件通知、SNS 主题、SQS 队列以及一个向 Slack 频道发送消息的 Lambda 函数。以下是最终结果的动画。

通知

本文的剩余部分结构如下。首先,我们将概述架构。然后,像往常一样,我们将逐步详细介绍如何使用 Terraform 进行设置。最后,我们将讨论主要发现。

建筑学

让我们看一下高层架构。当客户端将图片上传到已配置的 S3 存储桶时,S3 事件通知将向 SNS 发送,并将该事件发布到相应的主题中。该主题将有两个订阅者:一个 SQS 队列和一个 Lambda 函数。

SQS 队列存储事件以供异步处理,例如生成缩略图或图像分类。Lambda 函数解析事件并向 Slack 频道发送通知消息。在本篇博文中,我们不讨论异步处理部分。由于 SNS 的发布和订阅功能已解耦,我们可以稍后为事件添加更多消费者。

架构概述

让我们详细了解一下各个组件。S3 将对象组织到存储桶中。在存储桶内,您可以按键引用单个对象。您可以通过AWS 控制台AWS CLI或直接通过S3 API向 S3 上传文件。

在本文中,我们将使用 CLI 进行上传。控制台和 CLI 运行起来都非常顺畅,因为它们会帮您处理与 S3 的所有底层通信。如果您使用的是 S3 API,并且您的存储桶不可公开写入,则必须使用AWS 签名版本 4手动验证您的请求。

S3 支持配置事件通知。您可以根据对象的创建或删除创建事件,也可以在对象丢失时为冗余度较低的对象发送通知。您可以选择将事件发送到 SNS 主题、SQS 队列或 Lambda 函数。

在我们的案例中,我们将事件发送到 SNS,然后允许感兴趣的应用程序订阅。这被称为消息扇出模式。我们不是直接将事件发送给所有参与方,而是通过使用 SNS 作为中间代理,将发布和订阅解耦。

SNS 是一种简单的发布/订阅服务,它围绕主题进行组织。主题将同一类型的消息分组,这些消息可能引起一组订阅者的兴趣。当新消息发布到主题时,SNS 会通知所有订阅者。您可以配置传送策略,包括配置最大接收速率和重试延迟。

我们的目标是在 S3 存储桶内创建对象时发送一条 Slack 消息。我们通过将 Lambda 函数订阅到 SNS 主题来实现这一点。调用时,Lambda 函数将解析并检查事件通知,提取相关信息,并将其转发到预先配置的 Slack Webhook。

我们还将为该主题订阅一个 SQS 队列,用于存储事件,以便其他 Lambda 函数或长时间运行的轮询服务进行异步处理。下一节将介绍如何实现该架构。

执行

开发工具堆栈

为了开发解决方案,我们使用以下工具:

  • Terraform v0.11.7
  • SBT 1.0.4
  • Scala 2.12.6
  • IntelliJ + Scala 插件 + Terraform 插件

代码可以在 GitHub 上找到。现在我们来看一下每个组件的实现细节。

S3 存储桶

S3存储桶架构

首先,我们将创建一个用于上传图片的 S3 存储桶。我们需要提供一个存储桶名称和一个 ACL。public-read这次我们将使用 ACL,因为我们希望用户能够公开访问他们的图片,但上传时需要身份验证。该force-destroy选项允许 Terraform 销毁存储桶,即使它不为空。

variable "aws_s3_bucket_upload_name" {
  default = "sns-sqs-upload-bucket"
}

resource "aws_s3_bucket" "upload" {
  bucket = "${var.aws_s3_bucket_upload_name}"
  acl    = "public-read"
  force_destroy = true
}
Enter fullscreen mode Exit fullscreen mode

SNS话题

SNS主题架构

接下来,我们来创建 SNS 主题。创建 SNS 主题时,我们只需提供一个名称。

resource "aws_sns_topic" "upload" {
  name = "sns-sqs-upload-topic"
}
Enter fullscreen mode Exit fullscreen mode

如果我们不允许任何人发布消息,那么主题本身就毫无用处。为了实现这一点,我们为主题附加了一项策略,允许我们的存储桶资源SNS:Publish针对该主题执行操作。

resource "aws_sns_topic_policy" "upload" {
  arn = "${aws_sns_topic.upload.arn}"

  policy = "${data.aws_iam_policy_document.sns_upload.json}"
}

data "aws_iam_policy_document" "sns_upload" {
  policy_id = "snssqssns"
  statement {
    actions = [
      "SNS:Publish",
    ]
    condition {
      test = "ArnLike"
      variable = "aws:SourceArn"

      values = [
        "arn:aws:s3:::${var.aws_s3_bucket_upload_name}",
      ]
    }
    effect = "Allow"
    principals {
      type = "AWS"
      identifiers = [
        "*"]
    }
    resources = [
      "${aws_sns_topic.upload.arn}",
    ]
    sid = "snssqssnss3upload"
  }
}
Enter fullscreen mode Exit fullscreen mode

S3 事件通知

S3 事件通知架构

定义好 SNS 主题和 S3 存储桶资源后,我们可以通过创建 S3 存储桶通知来将它们组合起来,该通知将发布到该主题。我们可以控制想要接收通知的事件*.jpeg。在本例中,我们关注所有对象创建事件。我们还可以指定可选的过滤器,例如,在本例中,仅接收文件通知。

resource "aws_s3_bucket_notification" "upload" {
  bucket = "${aws_s3_bucket.upload.id}"

  topic {
    topic_arn     = "${aws_sns_topic.upload.arn}"
    events        = ["s3:ObjectCreated:*"]
    filter_suffix = ".jpeg"
  }
}
Enter fullscreen mode Exit fullscreen mode

SQS队列

SQS 队列架构

SQS 队列的创建方式类似。我们需要为队列提供一个名称,并设置允许 SNS 向该队列发送消息的策略。

resource "aws_sqs_queue" "upload" {
  name = "sns-sqs-upload"
}
Enter fullscreen mode Exit fullscreen mode
resource "aws_sqs_queue_policy" "test" {
  queue_url = "${aws_sqs_queue.upload.id}"
  policy = "${data.aws_iam_policy_document.sqs_upload.json}"
}

data "aws_iam_policy_document" "sqs_upload" {
  policy_id = "snssqssqs"
  statement {
    actions = [
      "sqs:SendMessage",
    ]
    condition {
      test = "ArnEquals"
      variable = "aws:SourceArn"

      values = [
        "${aws_sns_topic.upload.arn}",
      ]
    }
    effect = "Allow"
    principals {
      type = "AWS"
      identifiers = [
        "*"]
    }
    resources = [
      "${aws_sqs_queue.upload.arn}",
    ]
    sid = "snssqssqssns"
  }
}
Enter fullscreen mode Exit fullscreen mode

SQS 订阅

SQS 订阅架构

接下来,我们需要为队列订阅主题。SNS 主题订阅支持多种协议httphttpsemailemail-jsonsms、 。在本例中sqs我们将使用协议并提供主题和队列端点。applicationlambdasqs

resource "aws_sns_topic_subscription" "sqs" {
  topic_arn = "${aws_sns_topic.upload.arn}"
  protocol  = "sqs"
  endpoint  = "${aws_sqs_queue.upload.arn}"
}
Enter fullscreen mode Exit fullscreen mode

Slack Webhook

Slack Webhook 架构

在编写 Lambda 函数并将其订阅到 SNS 主题之前,我们需要先创建 Slack webhook。在 Slack 中使用传入 webhook 分为四个步骤

  1. 创建一个 Slack 应用。Slack 应用就像您工作区中的技术用户一样。
  2. 在您的应用中启用传入的 webhook。
  3. 创建一个新的传入 webhook。您将收到 webhook URL。
  4. 使用 webhook URL 通过 HTTP POST 发送消息。

完成这些步骤后,您将在Slack 应用程序概览页面中看到您的应用程序和已配置的 webhook 。它可能看起来像这样。

Slack 应用程序

松弛 webhook

Lambda 函数

Lambda 架构

消息格式

我们可以使用 Webhook URL 来创建 Lambda 函数。该函数将接收封装在 SNS 通知中的 S3 通知。两者均以 JSON 格式发送,但 S3 通知.Records.Sns.Message以 JSON 字符串的形式存储在字段中,因此也需要进行解析。这是一个 SNS 通知封装消息的示例。

{
    "Records": [
        {
            "EventSource": "aws:sns",
            "EventVersion": "1.0",
            "EventSubscriptionArn": "arn:aws:sns:eu-central-1:195499643157:sns-sqs-upload-topic:c7173bbb-8dda-47f6-9f54-a6aa81f65aac",
            "Sns": {
                "Type": "Notification",
                "MessageId": "10a7c00e-af4b-5d93-9459-93a0604d93f5",
                "TopicArn": "arn:aws:sns:eu-central-1:195499643157:sns-sqs-upload-topic",
                "Subject": "Amazon S3 Notification",
                "Message": "<inner_message>",
                "Timestamp": "2018-06-28T11:55:50.578Z",
                "SignatureVersion": "1",
                "Signature": "sTuBzzioojbez0zGFzdk1DLiCmeby0VuSdBvg0yS6xU+dKOk3U8iFUzbS1ZaNI6oZp+LHhehDziaMkTHQ7qcLBebu9uTI++mGcEhlgz+Ns0Dx3mKXyMTZwEcNtwfHEblJPjHXRsuCQ36RuZjByfI0pc0rsISxdJDr9WElen4U0ltmbzUJVpB22x3ELqciEDRipcpVjZo+V2J8GjdCvKu4uFV6RW3cKDOb91jcPc1vUnv/L6Q1gARIUFTbeUYvLbbIAmOe5PiAT2ZYaAmzHKvGOep/RT+OZOA4F6Ro7pjY0ysFpvvaAp8QKp4Ikj40N9lVKtk24pW+/7OsQMUBGOGoQ==",
                "SigningCertUrl": "https://sns.eu-central-1.amazonaws.com/SimpleNotificationService-ac565b8b1a6c5d002d285f9598aa1d9b.pem",
                "UnsubscribeUrl": "https://sns.eu-central-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:eu-central-1:195499643157:sns-sqs-upload-topic:c7173bbb-8dda-47f6-9f54-a6aa81f65aac",
                "MessageAttributes": {}
            }
        }
    ]
}
Enter fullscreen mode Exit fullscreen mode

在该部分中,<inner_message>您将找到实际的 S3 通知,它可能看起来像这样:

{
    "Records": [
        {
            "eventVersion": "2.0",
            "eventSource": "aws:s3",
            "awsRegion": "eu-central-1",
            "eventTime": "2018-06-28T11:55:50.528Z",
            "eventName": "ObjectCreated:Put",
            "userIdentity": {
                "principalId": "AWS:AIDAI3EXAMPLEEXAMP"
            },
            "requestParameters": {
                "sourceIPAddress": "xxx.yyy.zzz.qqq"
            },
            "responseElements": {
                "x-amz-request-id": "0A8A0DA78EF73966",
                "x-amz-id-2": "/SD3sDpP1mcDc6pC61573e4DAFSCnYoesZxeETb4MV3PpVgT4ud8sw0dMrnWI9whB3RYhwGo+8A="
            },
            "s3": {
                "s3SchemaVersion": "1.0",
                "configurationId": "tf-s3-topic-20180628113348955100000002",
                "bucket": {
                    "name": "sns-sqs-upload-bucket",
                    "ownerIdentity": {
                        "principalId": "A2OMJ1OL5PYOLU"
                    },
                    "arn": "arn:aws:s3:::sns-sqs-upload-bucket"
                },
                "object": {
                    "key": "3427394.jpeg",
                    "size": 25044,
                    "eTag": "a3cf1dabef657a65a63a270e27312ddc",
                    "sequencer": "005B34CCC64D9E046E"
                }
            }
        }
    ]
}
Enter fullscreen mode Exit fullscreen mode

最有趣的部分是包含s3有关 S3 存储桶和已上传对象的信息的对象。我确信AWS Java SDK 中有一些类可以代表这些信息,但为了撰写这篇博文,我决定使用circe手动解码我感兴趣的部分

源代码

Lambda 函数将使用RequestStreamHandler我们之前使用的相同类。该类是 的一部分aws-lambda-java-libs,仅提供原始输入和输出流,序列化和反序列化则留给我们。以下是源代码:

class Handler extends RequestStreamHandler {
  override def handleRequest(input: InputStream,
                             output: OutputStream,
                             context: Context): Unit = {
    val hookUrl = System.getenv("hook_url")
    val inputJsonString = Source.fromInputStream(input).mkString
    val processingResult = for {
      notification <- decodeNotification(inputJsonString)
      message <- decodeMessage(notification)
    } yield {
      implicit val backend = HttpURLConnectionBackend()
      sttp
        .post(Uri(java.net.URI.create(hookUrl)))
        .contentType("application/json")
        .body(SlackMessage(messageText(notification, message)).asJson.noSpaces)
        .send()
    }

    val out = new PrintStream(output)
    processingResult match {
      case Right(response) => out.print(s"Response from hook: ${response.code}")
      case Left(error)     => out.print(s"Failed: $error")
    }
    out.close()
  }
}
Enter fullscreen mode Exit fullscreen mode

首先,我们从环境变量中提取钩子 URL $hook_url。为了方便阅读,目前省略了错误处理和日志记录。然后,我们从输入流中读取通知 JSON 字符串,并分两步进行解析,因为我懒得提供自定义的反序列化格式。

如果解析成功,我们将向钩子 URL 发送 HTTP POST 请求。Slack 期望请求体为 JSON 格式,且至少包含一个text字段。SlackMessage是一个捕获此字段的案例类。在本例中,我们将根据 S3 存储桶和对象键构建消息文本。要将消息发送到频道,我们必须使用以下正文:

s"""
{
  "text": "Someone uploaded ${s3.`object`.key} to ${s3.bucket.name}."
}
"""
Enter fullscreen mode Exit fullscreen mode

Lambda 函数实际上不必返回任何内容,但我们将返回一个可读的字符串消息,指示钩子是否响应或 SNS 消息解析是否失败。

现在我们只需再次jar使用sbt-assembly插件将 Lambda 处理程序打包成一个 fat 文件即可。运行后,sbt assembly即可使用 Terraform 将工件上传到 AWS Lambda。

地形

在创建 Lambda 函数之前,我们必须创建一个用于执行的 IAM 角色。然后,我们可以创建 Lambda 函数本身,并设置 SNS 通知的权限,以便能够调用我们的 Lambda 函数。首先是 IAM 角色:

resource "aws_iam_role" "lambda_exec" {
  name = "sns-sqs-slack-lambda"

  assume_role_policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": "sts:AssumeRole",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Effect": "Allow",
      "Sid": ""
    }
  ]
}
EOF
}
Enter fullscreen mode Exit fullscreen mode

为了提高 Terraform 代码的复用性,我们将引入两个变量:工件版本和 Slack webhook URL。我将通过本地terraform.tfvars文件传递 webhook URL。

variable "slack_lambda_version" {
  type = "string"
  default = "0.1-SNAPSHOT"
}

locals {
  slack_lambda_artifact = "../slack/target/scala-2.12/sns-sqs-chat-assembly-${var.slack_lambda_version}.jar"
}

variable "slack_hook_url" {
  type = "string"
}
Enter fullscreen mode Exit fullscreen mode

现在我们可以定义 Lambda 函数资源了。这次我们不会选择 S3 工件,而是直接上传组装好的存档。此外,指定文件名也很有用,source_code_hash这样当文件发生更改但名称保持不变时,系统会触发更新。不过,您也可以考虑在文件名中包含提交 SHA 以及存储库是否干净,以提高透明度。

我们还将分配 1 GB 的 RAM,因为 AWS Lambda 会根据内存分配 CPU,而 JVM 类加载在初始请求中会占用大量 CPU 资源,所以我们需要这种计算能力🔥。此时,我们还将 Slack URL 作为环境变量传递。

resource "aws_lambda_function" "slack" {
  function_name = "sns-sqs-upload-slack"
  filename = "${local.slack_lambda_artifact}"
  source_code_hash = "${base64sha256(file(local.slack_lambda_artifact))}"
  handler = "de.frosner.aws.slack.Handler"
  runtime = "java8"
  role = "${aws_iam_role.lambda_exec.arn}"
  memory_size = 1024
  timeout = 5

  environment {
    variables {
      hook_url = "${var.slack_hook_url}"
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

最后,我们必须创建一个允许 SNS 消息触发 Lambda 函数的权限。

resource "aws_lambda_permission" "sns" {
  statement_id  = "AllowExecutionFromSNS"
  action        = "lambda:InvokeFunction"
  function_name = "${aws_lambda_function.slack.function_name}"
  principal     = "sns.amazonaws.com"
  source_arn = "${aws_sns_topic.upload.arn}"
}
Enter fullscreen mode Exit fullscreen mode

Lambda 订阅

Lambda SNS 订阅架构

完成我们的管道唯一缺少的环节是 Lambda 函数对 SNS 主题的订阅。这基本上与 SQS 订阅相同,但lambda这次使用了协议。

resource "aws_sns_topic_subscription" "lambda" {
  topic_arn = "${aws_sns_topic.upload.arn}"
  protocol  = "lambda"
  endpoint  = "${aws_lambda_function.slack.arn}"
}
Enter fullscreen mode Exit fullscreen mode

部署

现在我们可以运行了terraform apply。请确保sbt assembly之前已执行,以便 Terraform 可以上传该工件。

Terraform 部署

结论

终于完成了!这可真是费了一番功夫。我们本来可以通过直接向 Lambda 函数发送 S3 通知来简化架构。但我想演示一下扇出模式,这也是我们引入 SQS 队列的原因,虽然目前还没有使用。

我们已经了解了如何使用 SNS、SQS 和 Lambda 等完全托管的构建块来实现可能包含多个消费者和生产者的事件处理管道。SNS 提供发布/订阅功能来解耦生产者和消费者,而 SQS 则使我们能够异步处理事件。

您使用过 SNS 或 SQS 吗?您使用过 Amazon MQ 或 Kinesis 的经验如何?您认为 SQS 在哪些情况下比较合适?请在评论区留言,分享您的想法。


如果您喜欢这篇文章,您可以在 ko-fi 上支持我

文章来源:https://dev.to/frosnerd/event-handling-in-aws-using-sns-sqs-and-lambda-2ng
PREV
Frank - 你是怎么这么高效的?总结
NEXT
React 面试问题(中高级)