为异步作业构建可扩展、可靠且经济高效的事件调度程序
介绍
欢迎回到我的博客!😁 在这里,我会和自己——也希望和你——聊聊我在工作中解决的工程问题。我这样做主要是因为寻找解决方案让我感到兴奋。我不断探索效率低下、瓶颈和挑战,这段经历让我最终解决了软件工程中一个常见却又至关重要的问题。
这个问题在于需要异步执行操作——通常需要精确的时间,有时甚至需要重复执行。遵循我解决问题的核心方法(跨越空间和时间),我决定构建一个解决方案,它不仅针对单个操作,而且可扩展到各种用例。无论是发送通知、处理交易还是触发系统工作流,许多任务都需要按计划执行。如果没有强大的调度机制,高效地处理这些任务很快就会变得复杂、不可靠且成本高昂。
为了解决这个问题,我着手构建一个可扩展、可靠且经济高效的事件调度程序——可以无缝管理延迟、即时和重复的操作。
在本文中,我将引导您了解:
- 导致需要事件调度程序的问题
- 理想解决方案的功能性和非功能性需求
- 实施背后的系统设计和架构决策
最后,您将清晰地了解如何构建一个无服务器调度操作系统,以确保准确性、持久性和可扩展性,同时控制成本。让我们开始吧!
问题:管理日历周期内的订阅变更
订阅管理面临着独特的挑战,尤其是在处理取消或降级时😭。用户可以在其计费周期内的任何时间请求这些更改,但由于订阅的预付费性质,此类修改只能在周期结束时生效。这种延迟带来了对异步执行的需求——一个可以立即记录这些请求,但将其执行推迟到适当时间的系统。
解决方案:适当的调度机制
如果没有合适的调度机制,高效管理这些延迟操作将变得非常复杂。系统必须确保每个请求都在正确的时间执行,同时避免操作遗漏或重复。此外,频繁执行的操作(例如批量处理多个计划变更)必须能够处理,而不会造成系统过载。为了解决这个问题,我们需要一个可靠、可扩展且经济高效的调度程序,能够无缝处理延迟和重复执行。
功能需求:定义核心能力
强大且可扩展的计划操作系统必须能够有效地安排、执行、更新、监控和重试操作,同时确保可靠性和灵活性。
1. 安排和创建动作
系统必须允许用户通过以下方式安排操作:
- 操作类型、执行时间、执行数据和元数据是必填字段。
- 可选字段,如重复、频率和执行剩余。
- 早期验证以确保行动符合其履行要求。
2. 更新或删除操作
用户可以在操作锁定前(执行前 2 分钟)更新或删除操作。锁定后,不允许任何外部更改。
3. 动作状态管理
每个操作都必须具有内部管理的状态,以反映其执行进度。状态转换和结果必须记录在元数据中以便跟踪。
4. 行动实现图
每个操作都必须映射到负责其执行的特定履行服务。没有匹配履行服务的操作必须进行标记,以防止出现执行错误。
5. 重试失败的操作
失败的操作必须使用指数退避算法重试,以处理临时故障。超过最大重试限制的操作必须进行标记,以便进行手动干预。
6. 处理立即行动、延迟行动和重复行动
系统必须区分立即操作和延迟操作,以确保及时执行:
- 即时操作(2分钟内执行)必须实时处理,且无调度延迟。
- 延迟操作(2 分钟后执行)必须在正确的时间安排和处理。
- 重复动作必须按照规定的次数和频率进行
非功能性需求(NFR):确保系统可靠且可扩展
计划行动系统必须满足关键的NFR,以保证可靠性、可扩展性、安全性和可维护性。
1.可靠性和耐用性
- 操作必须正确且按时执行(±2 分钟)。
- 重复操作必须按照正确的频率严格按照计划执行。
- 失败的操作必须以指数退避算法重试,非重复操作只能执行一次。
2.可扩展性
- 系统必须动态扩展以处理高请求负载。
- 无服务器架构确保成本效益和灵活性。
- 基于队列的方法(例如 AWS SQS)必须调节执行频率以防止下游服务过载。
3. 可用性
- 系统必须始终可用,不能冷启动,确保在需要时立即执行。无服务器架构以合理的成本支持这一点。
4. 安全
- 基于签名的验证必须确保请求的安全并防止未经授权的执行。
5.可维护性
- 该系统必须是模块化的、封装的、并在单个存储库内组织。
- 必须编纂基础设施和数据库索引规则。
- 必须使用类型化语言来获得更好的可靠性。
- 必须使用加密的环境变量来启用本地测试。
- 启动脚本可以自动化包安装和环境设置。
- 全面的测试必须确保安全的变更和集成。
6.可观察性
- API 端点必须公开:
- 所有预定的行动。
- 按状态过滤的操作。
- 失败操作的重试功能。
- 集中式日志系统必须持续跟踪执行问题。
工具:为计划操作系统提供支持
AWS Lambda:用于执行的无服务器计算
- 无需管理服务器即可实现事件驱动的执行。
- 处理动作调度和验证。
- 使用实时事件流处理即时操作。
- 在预定的时间执行延迟操作。
- 根据操作类型管理履行任务。
Amazon EventBridge:管理计划执行
- 充当延迟操作的调度程序。
- 每 5 分钟轮询一次到期待处理操作并将其排入队列进行处理。
- 确保执行在预定时间的±2分钟内进行。
Amazon SQS:排队操作以实现可扩展性
- 通过异步处理计划操作来解耦执行工作负载。
- 控制履行请求频率以防止系统过载。
- 使用 FIFO(先进先出)处理来维护执行顺序并防止重复执行。
Amazon DynamoDB:存储计划操作
- 用作存储计划操作的主要数据库。
- 提供快速读/写操作以处理高工作负载。
- 存储用于跟踪执行状态、重试和结果的元数据。
- 使用 DynamoDB Streams 触发立即执行。
Amazon API Gateway:公开管理端点
- 提供用于创建、更新和删除计划操作的 HTTP 端点。
- 公开监控端点以按状态检索操作并重试失败的操作。
- 通过身份验证和授权机制确保安全访问。
系统设计:计划操作的数据库模式
场地 | 描述 |
---|---|
id |
每个计划操作的唯一标识符。 |
data |
存储执行特定的详细信息。 |
action |
定义要执行的操作类型。 |
executionTime |
指定操作应运行的时间。 |
repeat |
指示是否应重复该操作。 |
frequency |
定义重复操作的间隔。 |
executionRemainder |
跟踪剩余的执行次数。 |
status |
执行状态(“PENDING”、“IN_PROGRESS”、“COMPLETED”、“FAILED”)。 |
createdAt |
创建操作的时间戳。 |
updatedAt |
最后修改时间戳。 |
retryCount |
计算执行重试失败的次数。 |
metadata |
存储日志和其他执行细节。 |
示例:计划通知操作
{
"data": {
"mobile": "60123456789",
"subject": "Test",
"name": "Joojo",
"templateType": "USER_LATE_PAYMENT_NOTIFICATION",
"notificationType": "SMS"
},
"repeat": true,
"frequency": "DAILY",
"executionRemainder": 5,
"action": "SEND_NOTIFICATION",
"executionTime": 1736930117120
}
项目结构
我采用了分层模块化方法,以实现可维护性、可扩展性和易于更改。很多时候,不同的团队可能希望扩展服务中的变更,而不会引入意外的副作用。我尝试通过将组件组织到不同的模块中来实现这一点。让我们深入探讨一下
1. 模块化设计的单一应用程序
整个系统构建为一个单一的应用程序,但采用模块化结构,将关注点分离。每个模块负责系统的特定方面,使代码库更易于导航和修改。
./src
├── app.ts
├── clients
├── config
├── controllers
├── handlers
├── helpers
├── middleware
├── models
├── routes
├── service
├── types
└── utils
2. 用于分布式执行的无服务器处理程序
该项目围绕 AWS Lambda 进行设计,导出并构建了不同的处理程序,以便无缝执行计划的操作。这些处理程序确保各种任务独立处理,从而提高容错能力和可扩展性。
- 动作处理程序:管理创建、调度、检索、更新、删除和处理已调度的动作。这使所有与动作相关的逻辑保持集中,易于修改,而不会影响系统的其他部分。
- 延迟操作处理程序:专门处理需要稍后启动的操作。这种分离确保延迟操作能够被有效地调度和处理,而不会干扰实时执行。
- 即时操作处理程序:触发必须在 2 分钟内启动的操作,使用DynamoDB Streams检测更改并立即启动执行。这确保了紧急任务的及时处理。
- 履行处理程序:通过与相应的履行服务交互,确保计划的操作能够正确执行。此设计允许履行逻辑独立于操作调度进行演进。
├── handlers
│ ├── fulfillment.ts
│ ├── initiate-scheduled-actions.ts
│ ├── initiate-stream-actions.ts
│ └── process.ts
├── http-apis.ts
3.通过关注点分离实现可维护性
项目中的每个模块都是独立的,这意味着对一个组件的更改不会直接影响其他组件。这降低了破坏现有功能的风险并简化了调试。
- 控制器处理请求路由和执行逻辑。
- 服务管理业务逻辑和数据交互。
- 客户端与数据库、队列和 API 等外部服务进行交互。
- 模型定义了整个系统使用的数据结构。
- 中间件确保请求通过验证和身份验证层。
- 实用程序为日志记录、错误处理和重试提供了可重复使用的辅助函数。
./src
├── clients
├── controllers
├── middleware
├── models
├── service
└── utils
4.易于扩展
采用模块化设计,无需修改核心组件即可添加新功能。例如:
- 可以通过在履行服务中添加新操作来引入新类型的计划操作,而无需修改现有的调度或排队逻辑。
- 通过扩展客户端模块可以实现新的外部服务集成,确保与第三方系统的无缝通信。
延迟执行:确保及时执行
系统通过定期执行有效地处理预定的操作,确保所有待处理的操作都在正确的时间执行而不会延迟。
定期执行计划操作
- Lambda 函数定期扫描数据库,查找状态为 PENDING且执行时间已到的操作。
- Amazon EventBridge充当调度程序,每 5 分钟触发一次此 Lambda 函数,以确保按时执行操作。
- 该函数将这些待处理的操作排入 Amazon SQS 队列,以确保可靠且可扩展的执行管道。
为什么这种方法有效
- 高效的批处理确保可以同时执行多个操作。
- 通过将执行与SQS解耦,可保持可扩展性,防止系统过载。队列对于处理下游系统的负载至关重要。
- 状态管理:操作遵循生命周期(
PENDING → IN_PROGRESS → COMPLETED/FAILED/NO_ACTION
),每个状态都保存在数据库中以便跟踪和恢复。 - 执行处理:成功的执行标记为COMPLETED,失败的执行标记为FAILED,重复操作在重置为PENDING之前更新其执行剩余部分。
- 自动重试:失败的操作将使用指数退避算法进行重试。如果重试次数超过限制,操作将保持失败状态,直到手动重置为止。
- 幂等性和数据完整性:执行余数防止重复执行,无效操作(例如负余数)被阻止。
- 可观察性:元数据存储日志、执行时间戳、API 响应和失败原因,以便于调试。
立即执行:使用 DynamoDB 流处理时间敏感操作
某些计划操作如果执行时间在创建后2 分钟内,则需要立即执行。为了高效处理这些问题,系统利用DynamoDB Streams和 AWS Lambda 进行实时处理。
1. 立即执行的工作原理
- 当插入或修改新操作时, DynamoDB Streams会检测数据库中的变化。
- Lambda函数监听这些变化,处理新操作,并确定它们是否需要立即执行。
- 如果某项操作计划在 2 分钟内执行,Lambda 函数会将其排入 Amazon SQS 队列以供执行。
2. 处理逻辑分解
监听 DynamoDB 流事件
每当在 DynamoDB 中插入或修改initiateProcessFromDynamoStream
新记录时,就会触发该函数。
export const initiateProcessFromDynamoStream = async (
event: DynamoDBStreamEvent,
): Promise<void> => {
try {
const { Records } = event;
if (!Records || Records.length === 0) {
console.log("No records to process in DynamoDB stream event.");
return;
}
console.log(`${Records.length} records received.`);
- 该函数检查事件中是否有新记录。
- 如果不存在记录,该函数将提前退出。
处理每条记录
该函数循环遍历每条记录,提取其详细信息,并确定是否需要处理。
const processingPromises = Records.map(async (record: DynamoDBRecord) => {
const { eventName, dynamodb } = record;
if (!dynamodb?.NewImage) {
console.log("Skipping record: Missing NewImage.");
return Promise.resolve();
}
const cleanedImage = unmarshall(dynamodb.NewImage as Record<string, any>);
console.log(
"Cleaned NewImage object:",
JSON.stringify(cleanedImage, null, 2),
);
if (!["INSERT", "MODIFY"].includes(eventName || "")) {
console.log(`Skipping record with eventName ${eventName}.`);
return Promise.resolve();
}
console.log(`Processing record with eventName: ${eventName}`);
- 该函数从 DynamoDB 流中提取新插入或修改的数据。
- 它过滤掉不相关的记录(即没有
NewImage
或不是新插入/修改的记录)。
检查是否立即执行
然后,该函数通过计算时间差来检查该操作是否需要立即执行。
const { status, retryCount, id, executionTime } = cleanedImage;
// Check buffer time logic
const currentTime = Date.now();
const timeUntilExecution = executionTime - currentTime;
if (timeUntilExecution > TWO_MINUTES_IN_MS) {
console.log(
`Skipping record with id ${id}: Execution time is outside the 2-minute buffer window.`,
);
return Promise.resolve();
}
- 将当前时间与操作的执行时间进行比较。
- 如果距离该操作超过 2 分钟,则会跳过该操作(稍后将通过定期执行来获取该操作)。
- 如果该操作需要立即执行,它将继续处理。
确保有效状态和重试限制
if (!status || ![STATUSES.PENDING].includes(status)) {
console.log("Skipping record: Missing or invalid status.");
return Promise.resolve();
}
if (retryCount !== undefined && retryCount > CONSTANTS.MAX_RETRY) {
console.log(
`Skipping record with retryCount exceeding limit: ${retryCount}`,
);
return Promise.resolve();
}
if (!id) {
console.log("Skipping record: Missing id.");
return Promise.resolve();
}
- 确保操作在处理之前具有有效
PENDING
状态。 - 检查是否已超出重试限制,以防止无限重试。
- 在将操作发送到队列之前,确保该操作具有有效的 ID 。
将操作发送到 SQS 执行
try {
await sendMessage(cleanedImage);
} catch (error: any) {
await fail(id, `Failed to add action to the queue: ${error.message}`);
}
- 如果该操作符合立即执行的条件,则会将其发送到 SQS,在那里由履行服务进行处理。
- 如果SQS 失败,则该操作将被标记为失败并记录下来以供调试。
3. 为什么这种方法可靠
- 实时执行: 2 分钟内安排的操作立即执行,而无需等待定期轮询。
- 自动过滤:计划稍后执行的操作将被跳过并由 EventBridge 在适当的时间处理。
- 错误处理:如果某个操作无法在 SQS 中排队,则会将其标记为FAILED,而不是丢失。
- 可扩展性: Lambda 函数可以同时处理多个事件,确保不会延迟任何操作。
重复执行:管理重复操作
某些计划操作需要以固定间隔执行多次。系统使用三个关键字段来处理重复执行:
- 重复– 指示该操作是否应运行多次。
- executionRemainder – 跟踪该操作还应执行多少次。
- 频率——定义执行之间的时间间隔。
1.处理重复执行
该函数complete(id, notes)
负责管理操作的完成情况。如果某个操作设置为 repeat,它会更新执行时间并跟踪剩余的执行次数。
扣除执行余额
const newExecutionRemainder = repeat && executionRemainder > 0 ? executionRemainder - 1 : 0;
if (newExecutionRemainder < 0) {
throw new Error("Execution remainder cannot be negative.");
}
- 如果动作重复,则执行余数减少 1。
- 如果余数小于 0,则会引发错误以防止出现意外行为。
2. 完成最终执行
if (repeat && newExecutionRemainder === 0) {
await update(id, {
status: STATUSES.COMPLETED,
ttl: calculateTTL(),
executionRemainder: 0,
metadata: {
...metadata,
executionResponses: [...(metadata?.executionResponses || []), notes],
},
});
console.log("Final execution completed successfully:", id);
return;
}
- 如果没有剩余的执行,则该操作将被标记为COMPLETED。
- TTL (生存时间)设置为两周后删除记录。
- 执行元数据已更新,以便于跟踪和观察。
3. 安排下一次执行
如果该操作仍有剩余的执行,则该函数安排下一次执行。
if (repeat && newExecutionRemainder > 0 && frequency) {
const frequencyInMs = getFrequencyInMilliseconds(frequency);
if (!frequencyInMs) {
throw new Error(`Invalid frequency: ${frequency}`);
}
await update(id, {
status: STATUSES.PENDING,
executionTime: executionTime + frequencyInMs,
executionRemainder: newExecutionRemainder,
metadata: {
...metadata,
executionResponses: [...(metadata?.executionResponses || []), notes],
},
});
console.log("Recurring action updated successfully:", id);
return;
}
- 通过添加频率字段中的间隔来更新执行时间。
- 该操作状态设置为“PENDING”,以便可以再次执行。
- 元数据已更新以记录执行历史记录。
4. 频率转换
系统将预定义的频率转换为毫秒来更新执行时间。
const frequencyDurations: Record<string, number> = {
TEN_MINS: 10 * 60 * 1000,
HOURLY: 60 * 60 * 1000,
DAILY: 24 * 60 * 60 * 1000,
WEEKLY: 7 * 24 * 60 * 60 * 1000,
MONTHLY: 30 * 24 * 60 * 60 * 1000, //Not accurate and for demonstration purposes
};
这允许根据预定义的间隔进行灵活调度。
5.确保幂等性和数据完整性
对于非重复动作,系统确保只执行一次。
await update(id, {
status: STATUSES.COMPLETED,
ttl: calculateTTL(),
metadata: { ...metadata, notes },
});
console.log("Action completed successfully with TTL:", id);
- 不重复的操作将立即标记为已完成。
- TTL 确保数据在删除之前保留一段有限的时间。
为什么这种方法有效
- 自动重新安排——系统自动设置下一次执行时间。
- 防止过度执行——当余数达到零时执行停止。
- 高效跟踪——每次执行都会更新元数据,以便进行调试和观察。
- 数据完整性——确保频率值有效并且执行余数正确减少。
动作处理:通过重复数据删除确保可靠执行
系统使用Amazon SQS FIFO 队列或基于 Redis 的重复数据删除处理计划操作,以确保每个操作只执行一次,防止重复处理。
1. 使用 SQS FIFO 处理动作
- 消息被发送到Amazon SQS FIFO 队列,确保按照先进先出的顺序处理操作。
- FIFO 队列保证重复数据删除,防止同一条消息被多次处理。
- 这种方法非常适合严格排序和一次性处理。
2. 使用 Redis 进行重复数据删除
如果不使用 FIFO 队列,系统会利用Redis来管理重复数据删除,然后再将消息发送到标准 SQS 队列。
Redis 重复数据删除的工作原理
- 每条消息都会根据以下情况分配一个重复数据删除 ID:
- 动作的唯一 ID
- 行动状态
- 重试次数(如果适用)
const deduplicationId = `${messageBody.id}-${messageBody.status}-${messageBody.retryCount || 0}`;
const redisKey = `sqs-deduplication:${deduplicationId}`;
- 在向 SQS 发送消息之前,Redis会检查重复数据删除 ID 是否存在。
const redisCheck = await RedisClient.get(redisKey);
if (redisCheck.success && redisCheck.data) {
console.log(
`Duplicate message detected. Skipping send for ID: ${deduplicationId}`,
);
return;
}
- 如果检测到重复,则不会发送消息,避免冗余处理。
3.向 SQS 发送消息
如果该操作不是重复的,则将其发送到SQS 队列进行处理。
const command = new SendMessageCommand({
QueueUrl: queueUrl,
MessageBody: JSON.stringify(messageBody),
});
await executeSQSCommand(command);
console.log(`Message sent successfully to ${queueUrl}`);
- 该系统确保消息传递时不会出现不必要的重复。
- 操作进入队列后进入执行阶段。
4. 将重复数据删除数据存储在 Redis 中
发送消息后,去重ID会保存在Redis中,TTL为5分钟,保证临时去重。
await RedisClient.set(redisKey, true, 300); // 300 seconds = 5 minutes
- 较短的到期时间可确保重试的操作仍可在需要时得到处理。
- Redis 有助于管理临时重复数据删除,而不会影响长期操作的执行。
5. 优雅地处理错误
如果向 SQS 发送消息失败,则会根据故障类型处理错误:
if (error.name === "TimeoutError") {
throw new AppError({
...CommonErrors.REQUEST_TIMEOUT,
message: "Timeout occurred while sending the message to SQS.",
metadata: { queueUrl, messageBody, error: error.message },
});
}
throw new AppError({
...CommonErrors.INTERNAL_SERVER_ERROR,
message: "Failed to send message to SQS.",
metadata: { queueUrl, messageBody, error: error.message },
});
- 超时会触发特定的重试策略。
- 其他故障记录元数据以帮助诊断问题。
为什么这种方法有效
- FIFO 队列可确保对时间敏感的任务进行严格排序和重复数据删除。
- 当 FIFO 队列不可用时,Redis 重复数据删除可防止不必要的重复处理。
- 错误处理机制确保在必要时重试消息。
行动履行:处理来自 SQS 的计划行动
一旦计划操作到达执行时间,它们将由从Amazon SQS读取消息的Lambda 函数进行处理。该函数确保操作正确执行,相应地更新其状态,并在需要时处理错误或重试。
1.来自SQS的处理操作
该fulfill
函数监听SQS 事件,其中每条记录代表一个需要执行的预定操作。
export const fulfill = async (event: { Records: SQSRecord[] }): Promise<void> => {
const { Records } = event;
if (!Records || Records.length === 0) {
console.log("No records to process in SQS event.");
return;
}
- 该函数检查是否有新的记录需要处理。
- 如果不存在记录,它会提前退出。
2.确保操作正确执行
对于队列中的每个操作:
- 如果操作超出了最大重试次数,则将其标记为 FAILED。
if (retryCount >= CONSTANTS.MAX_RETRY) {
await handleFailure(
id,
receiptHandle,
metadata?.retryReason || "Exceeded maximum retry attempts",
);
continue;
}
- 如果该操作是第一次执行,则其状态设置为IN_PROGRESS。
if (retryCount === 0) {
await start(id);
}
3.处理不同类型的动作
尽管调度程序不允许在没有有效履行服务的情况下安排操作,但有一个防御机制(NO_ACTION)来处理数据库中手动更改或损坏操作的情况。
- 如果无法识别该操作类型,则将其标记为NO_ACTION并从队列中删除。
if (!Object.values(ACTIONS).includes(scheduledAction?.action as Actions)) {
await noAction(id);
await deleteMessage(receiptHandle);
continue;
}
- 如果操作有效,则根据其类型进行处理。
处理一般任务
case ACTIONS.EXECUTE_TASK:
result = await taskExecutionService.performTask(scheduledAction.data);
await complete(id, result);
break;
- 调用外部服务来执行通用任务(例如,处理用户请求)。
- 一旦完成,就将操作标记为“已完成” 。
处理通知
case ACTIONS.SEND_ALERT: {
const { recipient, messageType, ...messageData } = scheduledAction.data;
const processedMessageData = processMessage(messageData);
result = await notificationService.send(
recipient,
messageType,
processedMessageData,
);
await complete(id, result);
break;
}
- 使用通知服务发送警报或通知。
- 执行后将操作标记为已完成。
4.处理失败和重试
如果某个操作失败,系统会应用指数退避算法并重试执行,然后将其标记为永久失败。
将操作标记为失败
const handleFailure = async (id: string, receiptHandle: string, reason: string): Promise<void> => {
console.error(`Action with id: ${id} failed after maximum retries. Reason: ${reason}`);
await fail(id, reason);
await deleteMessage(receiptHandle);
};
- 如果操作超出重试限制,则会被标记为失败。
- 该消息从队列中删除以防止进一步处理。
使用退避机制重试操作
const handleProcessingError = async (
id: string,
receiptHandle: string,
retryCount: number,
error: any,
): Promise<void> => {
console.error(`Error processing message with id: ${id}. Error: ${error.message || error}`);
await applyExponentialBackoff(retryCount, id);
const actionMarkedForRetry = await retry(
id,
error instanceof AppError
? JSON.stringify(error?.metadata) || error?.message || error?.code || error?.name
: `Processing error : ${error}`,
);
await sendMessage(actionMarkedForRetry);
await deleteMessage(receiptHandle);
};
- 如果发生错误,系统将应用指数退避并增加重试计数。
- 失败的操作将重新排队到 SQS 中以进行重试。
5. 最终执行
- 一旦操作成功完成,它将从 SQS 中删除。
await deleteMessage(receiptHandle);
console.log(`Message with id: ${id} processed successfully.`);
- 如果处理了 SQS 事件的所有记录,则会打印摘要日志。
console.log(`${Records.length} records from the SQS event have been processed.`);
为什么这种方法有效
- 确保操作始终被执行——每个操作在永久失败之前都会经过退避重试。
- 处理不同的操作类型- 支持通知、任务、订阅更新和其他计划作业。
- 防止重复执行——使用SQS FIFO 或 Redis 重复数据删除来避免重复处理。
- 可靠的状态管理——使用IN_PROGRESS、COMPLETED、FAILED 或 NO_ACTION状态更新数据库。
- 防御性处理– NO_ACTION是一种保护措施,以防数据库中的操作被手动更改。
表示层:公开可观察性端点
表示层由一个充当 API 的Lambda 函数组成,并通过Amazon API Gateway公开 HTTP 终端节点。这些终端节点允许用户观察、管理和与计划操作进行交互,从而确保实时监控和控制。
1. 通过 API 网关公开 HTTP 端点
使用AWS Lambda和API Gateway构建无服务器 API,提供与计划操作相关的关键功能的访问。
-
按状态和计数获取操作
- 获取按当前状态分组的操作(例如,待定、已完成、失败)。
- 提供计数摘要来跟踪执行趋势。
-
发起失败的操作
- 允许用户手动重试失败的操作。
- 确保失败的作业可以重新处理,而无需等待自动重试周期。
-
删除操作
- 提供一个端点来删除旧的或不必要的操作。
- 通过管理过期记录来帮助维护干净的数据库。
2. 与监控仪表板集成
这些端点公开的数据可以在仪表板上可视化,以实现实时观察。
- 按状态显示操作计数以跟踪性能。
- 允许用户通过界面手动重试或删除操作。
- 提供有关系统健康和执行可靠性的见解。
构建计划行动系统的挑战
开发可靠且可扩展的计划行动系统面临着一些需要认真解决的挑战。
-
竞争条件
- 当多个进程尝试同时更新或执行相同的操作时,可能会出现不一致的情况。
- 适当的锁定机制、重复数据删除和 FIFO 队列有助于防止重复执行。
-
在周期的每个点进行负载测试
- 必须在高负载下测试系统,以确保调度、执行、重试和履行能够正确扩展。
- 测试包括数据库性能、SQS 消息处理、Lambda 执行限制和 API 响应时间。
-
作业被流和调度程序接收
- 执行后 2 分钟内安排的操作由DynamoDB Streams处理,而其他操作则依赖于EventBridge。
- 如果没有适当的协调,可能会发生重复执行。确保操作在待处理、进行中和已完成状态之间正确转换可以避免此问题。
-
了解工具的局限性
- 并发处理: AWS Lambda会自动扩展,但高并发性可能会导致处理受限和延迟。
- Lambda 运行时限制:由于Lambda 具有最大执行时间,因此必须将长时间运行的任务分解为更小的执行或卸载到工作服务。
未来的改进
-
用于实时通知的 Webhook
- 实施webhook将允许安排操作的外部服务在操作执行、失败或重试时接收实时更新。
- 这减少了轮询的需要并提高了系统响应能力。
-
错过行动的处理程序
- 专用处理程序用于检测和处理仍处于PENDING状态但具有过去执行时间的操作。
- 这可确保不会因系统故障、延迟或扩展问题而永久错过任何预定的操作。
这些改进将增强可靠性、可观察性和与外部系统的集成,使调度系统更加健壮。
结论:调度、扩展和健全性
构建一个可靠、可扩展且容错的定时操作系统,不仅仅是设置一个 cron 作业并期盼最佳结果,更在于将弹性融入到流程的每个步骤中。从调度和执行到重试和可观察性,每个组件都必须协同工作,以确保不会丢失任何操作、不会遗漏任何通知,也不会出现订阅无法管理的情况。
在此过程中,我们解决了以下问题:
✅使用DynamoDB Streams、EventBridge 和 SQS进行动态调度。✅ 结合使用即时操作和延迟操作 进行精确执行。✅通过重复数据删除、重试和指数退避实现可靠处理。✅利用无服务器架构处理高 负载实现可扩展性。✅使用 API 进行 可观察性监控、重试和删除计划操作。
当然,每个系统都有其怪癖和挑战- 竞争条件,工具限制和意外故障 - 但通过正确的设计模式,防御性编码和未来的改进(如webhook和错过的操作恢复),该系统可以发展成为更强大,更智能,更自主的调度程序。
归根结底,自动化是为了让生活更轻松——无论是管理用户订阅、发送通知,还是处理时效性交易。虽然计算机永不休眠,但我们人类却需要休眠,因此,设计一个能够在凌晨 3 点叫醒我们之前自行处理问题的系统总是值得的。
所以,让我们一起构建一个在我们无法运作时依然运作的系统吧! 🎉
鏂囩珷鏉yu簮锛�https://dev.to/joojodontoh/building-a-scalable-reliable-and-cost-effective-event-scheduler-for-asynchronous-jobs-2ac3