开始使用 AWS 无服务器 - SQS

2025-06-10

开始使用 AWS 无服务器 - SQS

TL;DR

在本系列中,我将尝试讲解 AWS 上无服务器的基础知识,以便您能够构建自己的无服务器应用程序。在上一篇文章中,我们探索了如何使用 EventBridge 构建事件驱动的应用程序。今天,我们将通过研究 SQS 及其与 Lambda 函数的集成,更深入地探讨事件管理。

⬇️ 我会定期发布无服务器内容,如果你想了解更多 ⬇️

在 Twitter 上关注我🚀

快速公告:我还在开发一个名为🛡 sls-mentor 🛡的库。它汇集了 30 条无服务器最佳实践,这些实践会在您的 AWS 无服务器项目中自动检查(无论使用哪种框架)。它是免费开源的,欢迎随时查看!

在 Github 上查找 sls-mentor⭐️

介绍

SQS 是亚马逊的简单队列服务 (SQS)。顾名思义,它是一种完全托管的队列服务,允许您在等待消息处理的同时存储它们。它是一种非常有用的服务,可以解耦您的应用程序,并构建事件驱动的应用程序。它也是处理异步任务和管理应用程序负载的绝佳方式。

在本文中,我们将使用 SQS 来寻找一个问题的解决方案:假设您有一个外部 API,它一次只允许一个连接(例如,为了避免垃圾邮件)。如何防止它被用户压垮,同时又能确保每个用户请求最终都能得到处理?这时,SQS 就派上用场了!

SQS 的用例之一是存储消息并限制应用程序的吞吐量。如果某个 Lambda 函数正在处理您的消息,您可以限制该函数的并发执行数量(此处我们将其设置为 1),该 Lambda 函数将逐一处理队列中存储的所有消息。

在一个小的模式中恢复它看起来是这样的:

sqs 解释

我们要建造什么?

基于此用例,让我们构建一个模拟的订购应用,其限制是每次只能处理一个订单。我们将使用 SQS 存储订单,并使用 lambda 函数处理订单。使用这种方法,用户可能需要等待几分钟才能处理订单:为了解决这个问题,我们将在订单处理完成后使用 EventBridge 发布一个事件,当订单准备就绪时,用户将通过电子邮件(使用 SES)收到通知。

完成后,应用程序应如下所示:

应用程序架构

创建 SQS 队列及其目标 Lambda 函数

与往常一样,您将使用 AWS CDK 结合 TypeScript 来配置此应用程序。如果您需要复习一下,可以查看本系列的第一篇文章,我将在其中更深入地介绍项目的设置。

让我们从应用程序的核心开始:SQS 队列和处理订单的 lambda 函数。

import * as cdk from 'aws-cdk-lib';
import { Construct } from 'constructs';
import path from 'path';

export class LearnServerlessStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // create a FIFO SQS queue
    const ordersQueue = new cdk.aws_sqs.Queue(this, 'ordersQueue', {
      visibilityTimeout: cdk.Duration.seconds(180),
      fifo: true,
    });

    // defined an event source for the queue, with a batch size of 1
    const eventSource = new cdk.aws_lambda_event_sources.SqsEventSource(ordersQueue, {
      batchSize: 1,
    });

    // create a Lambda function that will process the orders, bind it to the event source
    const executeOrder = new cdk.aws_lambda_nodejs.NodejsFunction(this, 'executeOrder', {
      entry: path.join(__dirname, 'executeOrder', 'handler.ts'),
      handler: 'handler',
      reservedConcurrentExecutions: 1,
      timeout: cdk.Duration.seconds(30),
    });
    executeOrder.addEventSource(eventSource);
  }
}
Enter fullscreen mode Exit fullscreen mode

使用此代码片段,您将配置一个 SQS 队列和一个 Lambda 函数。发送到队列的每条消息都会触发 Lambda 函数,由于并发性(一次执行一条消息)和批处理大小(每条消息由一个订单组成),Lambda 函数将逐条处理这些消息。

我为 lambda 函数设置了 30 秒的超时时间(出于演示目的,我希望模拟处理过程非常长),并将可见性超时设置为 150 秒:AWS 建议将可见性超时设置为 lambda 函数超时时间的 6 倍,这样如果 lambda 函数失败,消息就不会被重复处理。这是一个比较棘手的问题,点击此处了解更多信息。

提供其余基础设施

非 lambda 资源

如介绍架构所示,我们还需要配置事件总线、API 网关和 SES 身份。开始吧!

import { orderExecutedHtmlTemplate } from './orderExecutedHtmlTemplate';
// ...previous code

// Provision a rest API
const restApi = new cdk.aws_apigateway.RestApi(this, 'restApi', {});

// Provision an event bus and a rule to trigger the notification Lambda function
const ordersEventBus = new cdk.aws_events.EventBus(this, 'ordersEventBus');
const notifyOrderExecutedRule = new cdk.aws_events.Rule(this, 'notifyOrderExecutedRule', {
  eventBus: ordersEventBus,
  eventPattern: {
    source: ['notifyOrderExecuted'],
    detailType: ['orderExecuted'],
  },
});

// Provision a SES template to send beautiful emails
const orderExecutedTemplate = new cdk.aws_ses.CfnTemplate(this, 'orderExecutedTemplate', {
  template: {
    htmlPart: orderExecutedHtmlTemplate,
    subjectPart: 'Your order was passed to our provider!',
    templateName: 'orderExecutedTemplate',
  },
});

// This part is common to my SES article. No need to follow it if you already have a SES Identity
const DOMAIN_NAME = 'pchol.fr';

const hostedZone = new cdk.aws_route53.HostedZone(this, 'hostedZone', {
  zoneName: DOMAIN_NAME,
});

const identity = new cdk.aws_ses.EmailIdentity(this, 'sesIdentity', {
  identity: cdk.aws_ses.Identity.publicHostedZone(hostedZone),
});
Enter fullscreen mode Exit fullscreen mode

在此代码片段中,我创建了所有必要的资源,这是基于以前的文章,如果您需要复习API GatewayEventBridgeSES,您可以查看它们!

我使用一个简单的 HTML 模板来发送电子邮件,该模板从 .ts 文件导出,它包含变量{{itemName}}{{quantity}}{{username}},这些变量将被订单的值替换。

export const orderExecutedHtmlTemplate = `<html>
  <head>
    <style>
      * {
        font-family: sans-serif;
        text-align: center;
        padding: 0;
        margin: 0;
      }
      .title {
        color: #fff;
        background: #17bb90;
        padding: 1em;
      }
      .container {
        border: 2px solid #17bb90;
        border-radius: 1em;
        margin: 1em auto;
        max-width: 500px;
        overflow: hidden;
      }
      .message {
        padding: 1em;
        line-height: 1.5em;
        color: #033c49;
      }
      .footer {
        font-size: .8em;
        color: #888;
      }
    </style>
  </head>
  <body>
    <div class="container">
      <div class="title">
        <h1>Hello {{username}}!</h1>
      </div>
      <div class="message">
        <p>Your order of {{quantity}} {{itemName}} was passed to our provider!</p>
      </div>
    </div>
    <p class="footer">This is an automated message, please do not try to answer</p>
  </body>
</html>`;
Enter fullscreen mode Exit fullscreen mode

Lambda 函数和交互

为了结束本文的配置部分,让我们创建两个缺失的 lambda 函数,以及主题和其他资源之间的接口。

// ... previous code

// Create the request order lambda function
const requestOrder = new cdk.aws_lambda_nodejs.NodejsFunction(this, 'requestOrder', {
  entry: path.join(__dirname, 'requestOrder', 'handler.ts'),
  handler: 'handler',
  environment: {
    QUEUE_URL: ordersQueue.queueUrl,
  },
});

// Grant the lambda function the right to send messages to the SQS queue, add API Gateway as a trigger
ordersQueue.grantSendMessages(requestOrder);
restApi.root.addResource('request-order').addMethod('POST', new cdk.aws_apigateway.LambdaIntegration(requestOrder));

const executeOrder = new cdk.aws_lambda_nodejs.NodejsFunction(this, 'executeOrder', {
  entry: path.join(__dirname, 'executeOrder', 'handler.ts'),
  handler: 'handler',
  environment: {
    EVENT_BUS_NAME: ordersEventBus.eventBusName, // NEW: Add EVENT_BUS_NAME to the environment variables of the executeOrder lambda function
  },
  reservedConcurrentExecutions: 1,
  timeout: cdk.Duration.seconds(30),
});

executeOrder.addEventSource(eventSource);
// NEW: grant the lambda function the right to put events to the event bus
executeOrder.addToRolePolicy(
  new cdk.aws_iam.PolicyStatement({
    actions: ['events:PutEvents'],
    resources: [ordersEventBus.eventBusArn],
  }),
);

// Create the notifyOrderExecuted lambda function
const notifyOrderExecuted = new cdk.aws_lambda_nodejs.NodejsFunction(this, 'notifyOrderExecuted', {
  entry: path.join(__dirname, 'notifyOrderExecuted', 'handler.ts'),
  handler: 'handler',
  environment: {
    SENDER_EMAIL: `contact@${identity.emailIdentityName}`,
    TEMPLATE_NAME: orderExecutedTemplate.ref,
  },
});

// Grant the lambda function the right to send emails, add the lambda as a target of the event rule
notifyOrderExecuted.addToRolePolicy(
  new cdk.aws_iam.PolicyStatement({
    actions: ['ses:SendTemplatedEmail'],
    resources: ['*'],
  }),
);
notifyOrderExecutedRule.addTarget(new cdk.aws_events_targets.LambdaFunction(notifyOrderExecuted));
Enter fullscreen mode Exit fullscreen mode

我们已经完成了配置部分!让我们继续讨论最有趣的部分:在 lambda 函数内部部署的代码。

Lambda 函数部署代码

让我们从 requestOrder lambda 函数开始。该函数由 POST HTTP 请求触发,并将消息发送到 SQS 队列。如果成功,它还会向客户端返回 200 HTTP 状态码。

import { SQSClient, SendMessageCommand } from '@aws-sdk/client-sqs';
import { v4 as uuidv4 } from 'uuid';

const client = new SQSClient({});

export const handler = async ({ body }: { body: string }): Promise<{ statusCode: number; body: string }> => {
  const queueUrl = process.env.QUEUE_URL;

  if (queueUrl === undefined) {
    throw new Error('Missing environment variables');
  }

  const { itemName, quantity, username, userEmail } = JSON.parse(body) as {
    itemName?: string;
    quantity?: number;
    username?: string;
    userEmail?: string;
  };

  if (itemName === undefined || quantity === undefined || username === undefined || userEmail === undefined) {
    return Promise.resolve({
      statusCode: 400,
      body: JSON.stringify({ message: 'Missing required parameters' }),
    });
  }

  await client.send(
    new SendMessageCommand({
      QueueUrl: queueUrl,
      MessageBody: JSON.stringify({ itemName, quantity, username, userEmail }),
      MessageGroupId: 'ORDER_REQUESTED',
      MessageDeduplicationId: uuidv4(),
    }),
  );

  return Promise.resolve({
    statusCode: 200,
    body: JSON.stringify({ message: 'Order requested' }),
  });
};
Enter fullscreen mode Exit fullscreen mode

此代码片段执行以下操作:

  • 通常:解析 POST 请求的主体以获取我们需要的 4 个值
  • 向 SQS 队列发送一条消息,使用唯一的 ID 以避免重复,并使用恒定的组 ID 来确保该组内消息的顺序
  • 向客户端返回 200 HTTP 状态代码

下一个 lambda 函数:executeOrder。此函数由 SQS 队列触发,因此它将具有特殊的输入类型。它将与外部 API 建立 20 秒的伪连接,然后在事件总线上发送事件。

import { EventBridgeClient, PutEventsCommand } from '@aws-sdk/client-eventbridge';

const client = new EventBridgeClient({});

export const handler = async (event: {
  Records: {
    body: string;
  }[];
}): Promise<void> => {
  const eventBusName = process.env.EVENT_BUS_NAME;

  if (eventBusName === undefined) {
    throw new Error('Missing environment variables');
  }

  const { body } = event.Records[0];

  console.log('Communication with external API started...');
  await new Promise(resolve => setTimeout(resolve, 20000));
  console.log('Communication with external API finished!');

  await client.send(
    new PutEventsCommand({
      Entries: [
        {
          EventBusName: eventBusName,
          Source: 'notifyOrderExecuted',
          DetailType: 'orderExecuted',
          Detail: body,
        },
      ],
    }),
  );
};
Enter fullscreen mode Exit fullscreen mode

此代码片段执行以下操作:

  • 新增:解析 SQS 输入。类型为记录数组。由于我们将批处理大小设置为 1,因此可以假设数组的长度始终为 1
  • 等待 20 秒,与外部 API 建立虚假连接
  • 在事件总线上发送事件,并以 SQS 消息的正文作为详细信息。请注意,我为此调用设置了与事件规则目标相同的源和详细信息类型,否则该目标将不会被触发。

最终的 lambda 函数:notifyOrderExecuted。此函数由事件总线触发,因此它将接收另一个输入(此处复习)。它将使用存储在 SES 中的模板向用户发送一封电子邮件。

import { SESv2Client, SendEmailCommand } from '@aws-sdk/client-sesv2';

const client = new SESv2Client({});

export const handler = async (event: {
  detail: {
    itemName: string;
    quantity: number;
    username: string;
    userEmail: string;
  };
}): Promise<void> => {
  const senderEmail = process.env.SENDER_EMAIL;
  const templateName = process.env.TEMPLATE_NAME;

  if (senderEmail === undefined || templateName === undefined) {
    throw new Error('Missing environment variables');
  }

  const { itemName, quantity, username, userEmail } = event.detail;

  await client.send(
    new SendEmailCommand({
      FromEmailAddress: senderEmail,
      Content: {
        Template: {
          TemplateName: templateName,
          TemplateData: JSON.stringify({ itemName, quantity, username }),
        },
      },
      Destination: {
        ToAddresses: [userEmail],
      },
    }),
  );
};
Enter fullscreen mode Exit fullscreen mode

此代码片段执行以下操作:

  • 解析 EventBridge 输入。它会自动从字符串解析为对象,我们只需选择所需的属性即可。
  • 使用 SES 发送模板邮件。请记住,TemplateData 必须包含与您在 SES 中创建的模板完全相同的键,否则发送将失败。

代码写完了!让我们通过测试我们的应用来结束这篇文章!

测试我们的应用程序

为了进行此测试,我将对 /request-order 端点进行两次连续的 API 调用。如果一切正常,我应该会在大约 20 秒后收到一封电子邮件,并在大约 40 秒后收到第二封电子邮件(因为 executeOrder Lambda 每次只处理一条消息,并且会休眠 20 秒)。

以下是我提出的两个请求:

请求-1

请求-2

我点了4根香蕉和43块饼干!(好饿啊……)

现在让我们检查一下我的电子邮件:

电子邮件-1

电子邮件-2

我收到了两封邮件,数量都对了!相信我,第一封邮件我大概20秒就收到了,第二封大概40秒就收到了😇。

家庭作业🤓

我们只构建了一个极简的应用程序,还有很多可以改进的地方。如果您关注过本系列,以下是一些您绝对可以尝试的想法:

  • 添加数据库来存储订单,并添加 GET 端点来检索订单
  • 仅允许经过身份验证的用户请求订单
  • 与真实 API 交互以列出商品及其价格

您还可以构建一个与该后端交互的小型前端,但我将在以后的文章中介绍这一点。

结论

本教程只是一个小型的实际示例,展示了如何在 AWS 上使用事件和 SQS。SQS 可以适应更多用例,我建议您查看文档以了解更多信息!

我计划每两个月更新一次这一系列文章。我已经介绍了如何创建简单的 Lambda 函数和 REST API,以及如何与 DynamoDB 数据库和 S3 存储桶进行交互。您可以在我的代码库中关注这些进展!我将涵盖新的主题,例如前端部署、类型安全、更高级的模式等等……如果您有任何建议,请随时联系我!

如果您能回复并分享这篇文章给您的朋友和同事,我将不胜感激。这将极大地帮助我扩大读者群。另外,别忘了订阅,以便及时收到下一篇文章的更新!

如果您想与我保持联系,请访问我的Twitter 账号。我经常发布或转发有关 AWS 和无服务器的有趣内容,欢迎关注我!

在 Twitter 上关注我🚀

鏂囩珷鏉ユ簮锛�https://dev.to/slsbytheodo/learn-serverless-on-aws-step-by-step-sqs-26c8
PREV
🥇 Lambdalith 优势:使用 CDK 在 AWS Lambda 上部署 NestJS 的完整指南
NEXT
AWS 无服务器入门 - 电子邮件