开始使用 AWS 无服务器 - SQS
TL;DR
在本系列中,我将尝试讲解 AWS 上无服务器的基础知识,以便您能够构建自己的无服务器应用程序。在上一篇文章中,我们探索了如何使用 EventBridge 构建事件驱动的应用程序。今天,我们将通过研究 SQS 及其与 Lambda 函数的集成,更深入地探讨事件管理。
⬇️ 我会定期发布无服务器内容,如果你想了解更多 ⬇️
快速公告:我还在开发一个名为🛡 sls-mentor 🛡的库。它汇集了 30 条无服务器最佳实践,这些实践会在您的 AWS 无服务器项目中自动检查(无论使用哪种框架)。它是免费开源的,欢迎随时查看!
介绍
SQS 是亚马逊的简单队列服务 (SQS)。顾名思义,它是一种完全托管的队列服务,允许您在等待消息处理的同时存储它们。它是一种非常有用的服务,可以解耦您的应用程序,并构建事件驱动的应用程序。它也是处理异步任务和管理应用程序负载的绝佳方式。
在本文中,我们将使用 SQS 来寻找一个问题的解决方案:假设您有一个外部 API,它一次只允许一个连接(例如,为了避免垃圾邮件)。如何防止它被用户压垮,同时又能确保每个用户请求最终都能得到处理?这时,SQS 就派上用场了!
SQS 的用例之一是存储消息并限制应用程序的吞吐量。如果某个 Lambda 函数正在处理您的消息,您可以限制该函数的并发执行数量(此处我们将其设置为 1),该 Lambda 函数将逐一处理队列中存储的所有消息。
在一个小的模式中恢复它看起来是这样的:
我们要建造什么?
基于此用例,让我们构建一个模拟的订购应用,其限制是每次只能处理一个订单。我们将使用 SQS 存储订单,并使用 lambda 函数处理订单。使用这种方法,用户可能需要等待几分钟才能处理订单:为了解决这个问题,我们将在订单处理完成后使用 EventBridge 发布一个事件,当订单准备就绪时,用户将通过电子邮件(使用 SES)收到通知。
完成后,应用程序应如下所示:
创建 SQS 队列及其目标 Lambda 函数
与往常一样,您将使用 AWS CDK 结合 TypeScript 来配置此应用程序。如果您需要复习一下,可以查看本系列的第一篇文章,我将在其中更深入地介绍项目的设置。
让我们从应用程序的核心开始:SQS 队列和处理订单的 lambda 函数。
import * as cdk from 'aws-cdk-lib';
import { Construct } from 'constructs';
import path from 'path';
export class LearnServerlessStack extends cdk.Stack {
constructor(scope: Construct, id: string, props?: cdk.StackProps) {
super(scope, id, props);
// create a FIFO SQS queue
const ordersQueue = new cdk.aws_sqs.Queue(this, 'ordersQueue', {
visibilityTimeout: cdk.Duration.seconds(180),
fifo: true,
});
// defined an event source for the queue, with a batch size of 1
const eventSource = new cdk.aws_lambda_event_sources.SqsEventSource(ordersQueue, {
batchSize: 1,
});
// create a Lambda function that will process the orders, bind it to the event source
const executeOrder = new cdk.aws_lambda_nodejs.NodejsFunction(this, 'executeOrder', {
entry: path.join(__dirname, 'executeOrder', 'handler.ts'),
handler: 'handler',
reservedConcurrentExecutions: 1,
timeout: cdk.Duration.seconds(30),
});
executeOrder.addEventSource(eventSource);
}
}
使用此代码片段,您将配置一个 SQS 队列和一个 Lambda 函数。发送到队列的每条消息都会触发 Lambda 函数,由于并发性(一次执行一条消息)和批处理大小(每条消息由一个订单组成),Lambda 函数将逐条处理这些消息。
我为 lambda 函数设置了 30 秒的超时时间(出于演示目的,我希望模拟处理过程非常长),并将可见性超时设置为 150 秒:AWS 建议将可见性超时设置为 lambda 函数超时时间的 6 倍,这样如果 lambda 函数失败,消息就不会被重复处理。这是一个比较棘手的问题,点击此处了解更多信息。
提供其余基础设施
非 lambda 资源
如介绍架构所示,我们还需要配置事件总线、API 网关和 SES 身份。开始吧!
import { orderExecutedHtmlTemplate } from './orderExecutedHtmlTemplate';
// ...previous code
// Provision a rest API
const restApi = new cdk.aws_apigateway.RestApi(this, 'restApi', {});
// Provision an event bus and a rule to trigger the notification Lambda function
const ordersEventBus = new cdk.aws_events.EventBus(this, 'ordersEventBus');
const notifyOrderExecutedRule = new cdk.aws_events.Rule(this, 'notifyOrderExecutedRule', {
eventBus: ordersEventBus,
eventPattern: {
source: ['notifyOrderExecuted'],
detailType: ['orderExecuted'],
},
});
// Provision a SES template to send beautiful emails
const orderExecutedTemplate = new cdk.aws_ses.CfnTemplate(this, 'orderExecutedTemplate', {
template: {
htmlPart: orderExecutedHtmlTemplate,
subjectPart: 'Your order was passed to our provider!',
templateName: 'orderExecutedTemplate',
},
});
// This part is common to my SES article. No need to follow it if you already have a SES Identity
const DOMAIN_NAME = 'pchol.fr';
const hostedZone = new cdk.aws_route53.HostedZone(this, 'hostedZone', {
zoneName: DOMAIN_NAME,
});
const identity = new cdk.aws_ses.EmailIdentity(this, 'sesIdentity', {
identity: cdk.aws_ses.Identity.publicHostedZone(hostedZone),
});
在此代码片段中,我创建了所有必要的资源,这是基于以前的文章,如果您需要复习API Gateway、EventBridge或SES,您可以查看它们!
我使用一个简单的 HTML 模板来发送电子邮件,该模板从 .ts 文件导出,它包含变量{{itemName}}
、{{quantity}}
和{{username}}
,这些变量将被订单的值替换。
export const orderExecutedHtmlTemplate = `<html>
<head>
<style>
* {
font-family: sans-serif;
text-align: center;
padding: 0;
margin: 0;
}
.title {
color: #fff;
background: #17bb90;
padding: 1em;
}
.container {
border: 2px solid #17bb90;
border-radius: 1em;
margin: 1em auto;
max-width: 500px;
overflow: hidden;
}
.message {
padding: 1em;
line-height: 1.5em;
color: #033c49;
}
.footer {
font-size: .8em;
color: #888;
}
</style>
</head>
<body>
<div class="container">
<div class="title">
<h1>Hello {{username}}!</h1>
</div>
<div class="message">
<p>Your order of {{quantity}} {{itemName}} was passed to our provider!</p>
</div>
</div>
<p class="footer">This is an automated message, please do not try to answer</p>
</body>
</html>`;
Lambda 函数和交互
为了结束本文的配置部分,让我们创建两个缺失的 lambda 函数,以及主题和其他资源之间的接口。
// ... previous code
// Create the request order lambda function
const requestOrder = new cdk.aws_lambda_nodejs.NodejsFunction(this, 'requestOrder', {
entry: path.join(__dirname, 'requestOrder', 'handler.ts'),
handler: 'handler',
environment: {
QUEUE_URL: ordersQueue.queueUrl,
},
});
// Grant the lambda function the right to send messages to the SQS queue, add API Gateway as a trigger
ordersQueue.grantSendMessages(requestOrder);
restApi.root.addResource('request-order').addMethod('POST', new cdk.aws_apigateway.LambdaIntegration(requestOrder));
const executeOrder = new cdk.aws_lambda_nodejs.NodejsFunction(this, 'executeOrder', {
entry: path.join(__dirname, 'executeOrder', 'handler.ts'),
handler: 'handler',
environment: {
EVENT_BUS_NAME: ordersEventBus.eventBusName, // NEW: Add EVENT_BUS_NAME to the environment variables of the executeOrder lambda function
},
reservedConcurrentExecutions: 1,
timeout: cdk.Duration.seconds(30),
});
executeOrder.addEventSource(eventSource);
// NEW: grant the lambda function the right to put events to the event bus
executeOrder.addToRolePolicy(
new cdk.aws_iam.PolicyStatement({
actions: ['events:PutEvents'],
resources: [ordersEventBus.eventBusArn],
}),
);
// Create the notifyOrderExecuted lambda function
const notifyOrderExecuted = new cdk.aws_lambda_nodejs.NodejsFunction(this, 'notifyOrderExecuted', {
entry: path.join(__dirname, 'notifyOrderExecuted', 'handler.ts'),
handler: 'handler',
environment: {
SENDER_EMAIL: `contact@${identity.emailIdentityName}`,
TEMPLATE_NAME: orderExecutedTemplate.ref,
},
});
// Grant the lambda function the right to send emails, add the lambda as a target of the event rule
notifyOrderExecuted.addToRolePolicy(
new cdk.aws_iam.PolicyStatement({
actions: ['ses:SendTemplatedEmail'],
resources: ['*'],
}),
);
notifyOrderExecutedRule.addTarget(new cdk.aws_events_targets.LambdaFunction(notifyOrderExecuted));
我们已经完成了配置部分!让我们继续讨论最有趣的部分:在 lambda 函数内部部署的代码。
Lambda 函数部署代码
让我们从 requestOrder lambda 函数开始。该函数由 POST HTTP 请求触发,并将消息发送到 SQS 队列。如果成功,它还会向客户端返回 200 HTTP 状态码。
import { SQSClient, SendMessageCommand } from '@aws-sdk/client-sqs';
import { v4 as uuidv4 } from 'uuid';
const client = new SQSClient({});
export const handler = async ({ body }: { body: string }): Promise<{ statusCode: number; body: string }> => {
const queueUrl = process.env.QUEUE_URL;
if (queueUrl === undefined) {
throw new Error('Missing environment variables');
}
const { itemName, quantity, username, userEmail } = JSON.parse(body) as {
itemName?: string;
quantity?: number;
username?: string;
userEmail?: string;
};
if (itemName === undefined || quantity === undefined || username === undefined || userEmail === undefined) {
return Promise.resolve({
statusCode: 400,
body: JSON.stringify({ message: 'Missing required parameters' }),
});
}
await client.send(
new SendMessageCommand({
QueueUrl: queueUrl,
MessageBody: JSON.stringify({ itemName, quantity, username, userEmail }),
MessageGroupId: 'ORDER_REQUESTED',
MessageDeduplicationId: uuidv4(),
}),
);
return Promise.resolve({
statusCode: 200,
body: JSON.stringify({ message: 'Order requested' }),
});
};
此代码片段执行以下操作:
- 通常:解析 POST 请求的主体以获取我们需要的 4 个值
- 向 SQS 队列发送一条消息,使用唯一的 ID 以避免重复,并使用恒定的组 ID 来确保该组内消息的顺序
- 向客户端返回 200 HTTP 状态代码
下一个 lambda 函数:executeOrder。此函数由 SQS 队列触发,因此它将具有特殊的输入类型。它将与外部 API 建立 20 秒的伪连接,然后在事件总线上发送事件。
import { EventBridgeClient, PutEventsCommand } from '@aws-sdk/client-eventbridge';
const client = new EventBridgeClient({});
export const handler = async (event: {
Records: {
body: string;
}[];
}): Promise<void> => {
const eventBusName = process.env.EVENT_BUS_NAME;
if (eventBusName === undefined) {
throw new Error('Missing environment variables');
}
const { body } = event.Records[0];
console.log('Communication with external API started...');
await new Promise(resolve => setTimeout(resolve, 20000));
console.log('Communication with external API finished!');
await client.send(
new PutEventsCommand({
Entries: [
{
EventBusName: eventBusName,
Source: 'notifyOrderExecuted',
DetailType: 'orderExecuted',
Detail: body,
},
],
}),
);
};
此代码片段执行以下操作:
- 新增:解析 SQS 输入。类型为记录数组。由于我们将批处理大小设置为 1,因此可以假设数组的长度始终为 1
- 等待 20 秒,与外部 API 建立虚假连接
- 在事件总线上发送事件,并以 SQS 消息的正文作为详细信息。请注意,我为此调用设置了与事件规则目标相同的源和详细信息类型,否则该目标将不会被触发。
最终的 lambda 函数:notifyOrderExecuted。此函数由事件总线触发,因此它将接收另一个输入(此处复习)。它将使用存储在 SES 中的模板向用户发送一封电子邮件。
import { SESv2Client, SendEmailCommand } from '@aws-sdk/client-sesv2';
const client = new SESv2Client({});
export const handler = async (event: {
detail: {
itemName: string;
quantity: number;
username: string;
userEmail: string;
};
}): Promise<void> => {
const senderEmail = process.env.SENDER_EMAIL;
const templateName = process.env.TEMPLATE_NAME;
if (senderEmail === undefined || templateName === undefined) {
throw new Error('Missing environment variables');
}
const { itemName, quantity, username, userEmail } = event.detail;
await client.send(
new SendEmailCommand({
FromEmailAddress: senderEmail,
Content: {
Template: {
TemplateName: templateName,
TemplateData: JSON.stringify({ itemName, quantity, username }),
},
},
Destination: {
ToAddresses: [userEmail],
},
}),
);
};
此代码片段执行以下操作:
- 解析 EventBridge 输入。它会自动从字符串解析为对象,我们只需选择所需的属性即可。
- 使用 SES 发送模板邮件。请记住,TemplateData 必须包含与您在 SES 中创建的模板完全相同的键,否则发送将失败。
代码写完了!让我们通过测试我们的应用来结束这篇文章!
测试我们的应用程序
为了进行此测试,我将对 /request-order 端点进行两次连续的 API 调用。如果一切正常,我应该会在大约 20 秒后收到一封电子邮件,并在大约 40 秒后收到第二封电子邮件(因为 executeOrder Lambda 每次只处理一条消息,并且会休眠 20 秒)。
以下是我提出的两个请求:
我点了4根香蕉和43块饼干!(好饿啊……)
现在让我们检查一下我的电子邮件:
我收到了两封邮件,数量都对了!相信我,第一封邮件我大概20秒就收到了,第二封大概40秒就收到了😇。
家庭作业🤓
我们只构建了一个极简的应用程序,还有很多可以改进的地方。如果您关注过本系列,以下是一些您绝对可以尝试的想法:
- 添加数据库来存储订单,并添加 GET 端点来检索订单
- 仅允许经过身份验证的用户请求订单
- 与真实 API 交互以列出商品及其价格
您还可以构建一个与该后端交互的小型前端,但我将在以后的文章中介绍这一点。
结论
本教程只是一个小型的实际示例,展示了如何在 AWS 上使用事件和 SQS。SQS 可以适应更多用例,我建议您查看文档以了解更多信息!
我计划每两个月更新一次这一系列文章。我已经介绍了如何创建简单的 Lambda 函数和 REST API,以及如何与 DynamoDB 数据库和 S3 存储桶进行交互。您可以在我的代码库中关注这些进展!我将涵盖新的主题,例如前端部署、类型安全、更高级的模式等等……如果您有任何建议,请随时联系我!
如果您能回复并分享这篇文章给您的朋友和同事,我将不胜感激。这将极大地帮助我扩大读者群。另外,别忘了订阅,以便及时收到下一篇文章的更新!
如果您想与我保持联系,请访问我的Twitter 账号。我经常发布或转发有关 AWS 和无服务器的有趣内容,欢迎关注我!
鏂囩珷鏉ユ簮锛�https://dev.to/slsbytheodo/learn-serverless-on-aws-step-by-step-sqs-26c8