为异步作业构建可扩展、可靠且经济高效的事件调度程序

2025-06-11

为异步作业构建可扩展、可靠且经济高效的事件调度程序

介绍

欢迎回到我的博客!😁 在这里,我会和自己——也希望和你——聊聊我在工作中解决的工程问题。我这样做主要是因为寻找解决方案让我感到兴奋。我不断探索效率低下、瓶颈和挑战,这段经历让我最终解决了软件工程中一个常见却又至关重要的问题。

这个问题在于需要异步执行操作——通常需要精确的时间,有时甚至需要重复执行。遵循我解决问题的核心方法(跨越空间和时间),我决定构建一个解决方案,它不仅针对单个操作,而且可扩展到各种用例。无论是发送通知、处理交易还是触发系统工作流,许多任务都需要按计划执行。如果没有强大的调度机制,高效地处理这些任务很快就会变得复杂、不可靠且成本高昂。

为了解决这个问题,我着手构建一个可扩展、可靠且经济高效的事件调度程序——可以无缝管理延迟、即时和重复的操作。

在本文中,我将引导您了解:

  • 导致需要事件调度程序的问题
  • 理想解决方案的功能性和非功能性需求
  • 实施背后的系统设计和架构决策

最后,您将清晰地了解如何构建一个无服务器调度操作系统,以确保准确性、持久性和可扩展性,同时控制成本。让我们开始吧!

动图

问题:管理日历周期内的订阅变更

订阅管理面临着独特的挑战,尤其是在处理取消或降级时😭。用户可以在其计费周期内的任何时间请求这些更改,但由于订阅的预付费性质,此类修改只能在周期结束时生效。这种延迟带来了对异步执行的需求——一个可以立即记录这些请求,但将其执行推迟到适当时间的系统。

解决方案:适当的调度机制

如果没有合适的调度机制,高效管理这些延迟操作将变得非常复杂。系统必须确保每个请求都在正确的时间执行,同时避免操作遗漏或重复。此外,频繁执行的操作(例如批量处理多个计划变更)必须能够处理,而不会造成系统过载。为了解决这个问题,我们需要一个可靠、可扩展且经济高效的调度程序,能够无缝处理延迟和重复执行

图片描述


功能需求:定义核心能力

强大且可扩展的计划操作系统必须能够有效地安排、执行、更新、监控和重试操作,同时确保可靠性和灵活性。

1. 安排和创建动作

系统必须允许用户通过以下方式安排操作:

  • 操作类型、执行时间、执行数据和元数据是必填字段。
  • 可选字段,如重复、频率和执行剩余。
  • 早期验证以确保行动符合其履行要求。

2. 更新或删除操作

用户可以在操作锁定前(执行前 2 分钟)更新或删除操作。锁定后,不允许任何外部更改。

3. 动作状态管理

每个操作都必须具有内部管理的状态,以反映其执行进度。状态转换和结果必须记录在元数据中以便跟踪。

4. 行动实现图

每个操作都必须映射到负责其执行的特定履行服务。没有匹配履行服务的操作必须进行标记,以防止出现执行错误。

5. 重试失败的操作

失败的操作必须使用指数退避算法重试,以处理临时故障。超过最大重试限制的操作必须进行标记,以便进行手动干预。

6. 处理立即行动、延迟行动和重复行动

系统必须区分立即操作和延迟操作,以确保及时执行:

  • 即时操作(2分钟内执行)必须实时处理,且无调度延迟。
  • 延迟操作(2 分钟后执行)必须在正确的时间安排和处理。
  • 重复动作必须按照规定的次数和频率进行

非功能性需求(NFR):确保系统可靠且可扩展

计划行动系统必须满足关键的NFR,以保证可靠性、可扩展性、安全性和可维护性。

1.可靠性和耐用性

  • 操作必须正确且按时执行(±2 分钟)。
  • 重复操作必须按照正确的频率严格按照计划执行。
  • 失败的操作必须以指数退避算法重试,非重复操作只能执行一次

2.可扩展性

  • 系统必须动态扩展以处理高请求负载。
  • 服务器架构确保成本效益和灵活性。
  • 基于队列的方法例如 AWS SQS)必须调节执行频率以防止下游服务过载。

3. 可用性

  • 系统必须始终可用,不能冷启动,确保在需要时立即执行。无服务器架构以合理的成本支持这一点。

4. 安全

  • 基于签名的验证必须确保请求的安全并防止未经授权的执行。

5.可维护性

  • 该系统必须是模块化的、封装的、并单个存储库内组织。
  • 必须编纂基础设施和数据库索引规则。
  • 必须使用类型化语言来获得更好的可靠性。
  • 必须使用加密的环境变量来启用本地测试。
  • 启动脚本可以自动化包安装和环境设置。
  • 全面的测试必须确保安全的变更和集成。

6.可观察性

  • API 端点必须公开:
    • 所有预定的行动
    • 按状态过滤的操作
    • 失败操作的重试功能
  • 集中式日志系统必须持续跟踪执行问题。

工具:为计划操作系统提供支持

基础设施图

AWS Lambda:用于执行的无服务器计算

  • 无需管理服务器即可实现事件驱动的执行。
  • 处理动作调度和验证。
  • 使用实时事件流处理即时操作。
  • 在预定的时间执行延迟操作。
  • 根据操作类型管理履行任务。

Amazon EventBridge:管理计划执行

  • 充当延迟操作的调度程序。
  • 每 5 分钟轮询一次到期待处理操作并将其排入队列进行处理。
  • 确保执行在预定时间的±2分钟内进行。

Amazon SQS:排队操作以实现可扩展性

  • 通过异步处理计划操作来解耦执行工作负载。
  • 控制履行请求频率以防止系统过载。
  • 使用 FIFO(先进先出)处理来维护执行顺序并防止重复执行。

Amazon DynamoDB:存储计划操作

  • 用作存储计划操作的主要数据库。
  • 提供快速读/写操作以处理高工作负载。
  • 存储用于跟踪执行状态、重试和结果的元数据。
  • 使用 DynamoDB Streams 触发立即执行。

Amazon API Gateway:公开管理端点

  • 提供用于创建、更新和删除计划操作的 HTTP 端点。
  • 公开监控端点以按状态检索操作并重试失败的操作。
  • 通过身份验证和授权机制确保安全访问。

系统设计:计划操作的数据库模式

场地 描述
id 每个计划操作的唯一标识符。
data 存储执行特定的详细信息。
action 定义要执行的操作类型。
executionTime 指定操作应运行的时间。
repeat 指示是否应重复该操作。
frequency 定义重复操作的间隔。
executionRemainder 跟踪剩余的执行次数。
status 执行状态(“PENDING”、“IN_PROGRESS”、“COMPLETED”、“FAILED”)。
createdAt 创建操作的时间戳。
updatedAt 最后修改时间戳。
retryCount 计算执行重试失败的次数。
metadata 存储日志和其他执行细节。

示例:计划通知操作

{
    "data": {
        "mobile": "60123456789",
        "subject": "Test",
        "name": "Joojo",
        "templateType": "USER_LATE_PAYMENT_NOTIFICATION",
        "notificationType": "SMS"
    },
    "repeat": true,
    "frequency": "DAILY",
    "executionRemainder": 5,
    "action": "SEND_NOTIFICATION",
    "executionTime": 1736930117120
}
Enter fullscreen mode Exit fullscreen mode

项目结构

我采用了分层模块化方法,以实现可维护性、可扩展性和易于更改。很多时候,不同的团队可能希望扩展服务中的变更,而不会引入意外的副作用。我尝试通过将组件组织到不同的模块中来实现这一点。让我们深入探讨一下

1. 模块化设计的单一应用程序

整个系统构建为一个单一的应用程序,但采用模块化结构,将关注点分离。每个模块负责系统的特定方面,使代码库更易于导航和修改。

./src
├── app.ts
├── clients
├── config
├── controllers
├── handlers
├── helpers
├── middleware
├── models
├── routes
├── service
├── types
└── utils
Enter fullscreen mode Exit fullscreen mode
2. 用于分布式执行的无服务器处理程序

该项目围绕 AWS Lambda 进行设计,导出并构建了不同的处理程序,以便无缝执行计划的操作。这些处理程序确保各种任务独立处理,从而提高容错能力和可扩展性。

  • 动作处理程序:管理创建、调度、检索、更新、删除和处理已调度的动作。这使所有与动作相关的逻辑保持集中,易于修改,而不会影响系统的其他部分。
  • 延迟操作处理程序:专门处理需要稍后启动的操作。这种分离确保延迟操作能够被有效地调度和处理,而不会干扰实时执行。
  • 即时操作处理程序:触发必须在 2 分钟内启动的操作,使用DynamoDB Streams检测更改并立即启动执行。这确保了紧急任务的及时处理。
  • 履行处理程序:通过与相应的履行服务交互,确保计划的操作能够正确执行。此设计允许履行逻辑独立于操作调度进行演进。
├── handlers
│   ├── fulfillment.ts
│   ├── initiate-scheduled-actions.ts
│   ├── initiate-stream-actions.ts
│   └── process.ts
       ├── http-apis.ts
Enter fullscreen mode Exit fullscreen mode
3.通过关注点分离实现可维护性

项目中的每个模块都是独立的,这意味着对一个组件的更改不会直接影响其他组件。这降低了破坏现有功能的风险并简化了调试。

  • 控制器处理请求路由和执行逻辑。
  • 服务管理业务逻辑和数据交互。
  • 客户端与数据库、队列和 API 等外部服务进行交互。
  • 模型定义了整个系统使用的数据结构。
  • 中间件确保请求通过验证和身份验证层。
  • 实用程序为日志记录、错误处理和重试提供了可重复使用的辅助函数。
./src
├── clients
├── controllers
├── middleware
├── models
├── service
└── utils
Enter fullscreen mode Exit fullscreen mode
4.易于扩展

采用模块化设计,无需修改核心组件即可添加新功能。例如:

  • 可以通过在履行服务中添加新操作来引入新类型的计划操作,而无需修改现有的调度或排队逻辑。
  • 通过扩展客户端模块可以实现新的外部服务集成,确保与第三方系统的无缝通信。

图片描述


延迟执行:确保及时执行

系统通过定期执行有效地处理预定的操作,确保所有待处理的操作都在正确的时间执行而不会延迟。

定期执行计划操作

  • Lambda 函数定期扫描数据库,查找状态为 PENDING执行时间已到的操作。
  • Amazon EventBridge充当调度程序,每 5 分钟触发一次此 Lambda 函数,以确保按时执行操作。
  • 该函数将这些待处理的操作排入 Amazon SQS 队列,以确保可靠且可扩展的执行管道。

为什么这种方法有效

  • 高效的批处理确保可以同时执行多个操作。
  • 通过将执行与SQS解耦,可保持可扩展性,防止系统过载。队列对于处理下游系统的负载至关重要。
  • 状态管理:操作遵循生命周期(PENDING → IN_PROGRESS → COMPLETED/FAILED/NO_ACTION),每个状态都保存在数据库中以便跟踪和恢复。
  • 执行处理:成功的执行标记为COMPLETED,失败的执行标记为FAILED,重复操作在重置为PENDING之前更新其执行剩余部分。
  • 自动重试:失败的操作将使用指数退避算法进行重试。如果重试次数超过限制,操作将保持失败状态,直到手动重置为止。
  • 幂等性和数据完整性:执行余数防止重复执行,无效操作(例如负余数)被阻止。
  • 可观察性:元数据存储日志、执行时间戳、API 响应和失败原因,以便于调试。

立即执行:使用 DynamoDB 流处理时间敏感操作

某些计划操作如果执行时间在创建后2 分钟内,则需要立即执行。为了高效处理这些问题,系统利用DynamoDB Streams和 AWS Lambda 进行实时处理。


1. 立即执行的工作原理

  • 当插入或修改新操作时, DynamoDB Streams会检测数据库中的变化。
  • Lambda函数监听这些变化,处理新操作,并确定它们是否需要立即执行。
  • 如果某项操作计划在 2 分钟内执行,Lambda 函数会将其排入 Amazon SQS 队列以供执行。

2. 处理逻辑分解

监听 DynamoDB 流事件

每当在 DynamoDB 中插入修改initiateProcessFromDynamoStream新记录时,就会触发该函数。

export const initiateProcessFromDynamoStream = async (
  event: DynamoDBStreamEvent,
): Promise<void> => {
  try {
    const { Records } = event;
    if (!Records || Records.length === 0) {
      console.log("No records to process in DynamoDB stream event.");
      return;
    }
    console.log(`${Records.length} records received.`);
Enter fullscreen mode Exit fullscreen mode
  • 该函数检查事件中是否有新记录。
  • 如果不存在记录,该函数将提前退出。

处理每条记录

该函数循环遍历每条记录,提取其详细信息,并确定是否需要处理。

const processingPromises = Records.map(async (record: DynamoDBRecord) => {
  const { eventName, dynamodb } = record;

  if (!dynamodb?.NewImage) {
    console.log("Skipping record: Missing NewImage.");
    return Promise.resolve();
  }

  const cleanedImage = unmarshall(dynamodb.NewImage as Record<string, any>);
  console.log(
    "Cleaned NewImage object:",
    JSON.stringify(cleanedImage, null, 2),
  );

  if (!["INSERT", "MODIFY"].includes(eventName || "")) {
    console.log(`Skipping record with eventName ${eventName}.`);
    return Promise.resolve();
  }
  console.log(`Processing record with eventName: ${eventName}`);
Enter fullscreen mode Exit fullscreen mode
  • 该函数从 DynamoDB 流中提取新插入或修改的数据。
  • 过滤掉不相关的记录(即没有NewImage或不是新插入/修改的记录)。

检查是否立即执行

然后,该函数通过计算时间差来检查该操作是否需要立即执行。

const { status, retryCount, id, executionTime } = cleanedImage;

// Check buffer time logic
const currentTime = Date.now();
const timeUntilExecution = executionTime - currentTime;

if (timeUntilExecution > TWO_MINUTES_IN_MS) {
  console.log(
    `Skipping record with id ${id}: Execution time is outside the 2-minute buffer window.`,
  );
  return Promise.resolve();
}
Enter fullscreen mode Exit fullscreen mode
  • 将当前时间操作的执行时间进行比较。
  • 如果距离该操作超过 2 分钟,则会跳过该操作(稍后将通过定期执行来获取该操作)。
  • 如果该操作需要立即执行,它将继续处理。

确保有效状态和重试限制

if (!status || ![STATUSES.PENDING].includes(status)) {
  console.log("Skipping record: Missing or invalid status.");
  return Promise.resolve();
}

if (retryCount !== undefined && retryCount > CONSTANTS.MAX_RETRY) {
  console.log(
    `Skipping record with retryCount exceeding limit: ${retryCount}`,
  );
  return Promise.resolve();
}

if (!id) {
  console.log("Skipping record: Missing id.");
  return Promise.resolve();
}
Enter fullscreen mode Exit fullscreen mode
  • 确保操作在处理之前具有有效PENDING状态。
  • 检查是否已超出重试限制,以防止无限重试。
  • 在将操作发送到队列之前,确保该操作具有有效的 ID 。

将操作发送到 SQS 执行

try {
  await sendMessage(cleanedImage);
} catch (error: any) {
  await fail(id, `Failed to add action to the queue: ${error.message}`);
}
Enter fullscreen mode Exit fullscreen mode
  • 如果该操作符合立即执行的条件,则会将其发送到 SQS,在那里由履行服务进行处理。
  • 如果SQS 失败,则该操作将被标记为失败并记录下来以供调试。

3. 为什么这种方法可靠

  • 实时执行: 2 分钟内安排的操作立即执行,而无需等待定期轮询。
  • 自动过滤:计划稍后执行的操作将被跳过并由 EventBridge 在适当的时间处理。
  • 错误处理:如果某个操作无法在 SQS 中排队,则会将其标记为FAILED,而不是丢失。
  • 可扩展性: Lambda 函数可以同时处理多个事件,确保不会延迟任何操作。

重复执行:管理重复操作

某些计划操作需要以固定间隔执行多次。系统使用三个关键字段来处理重复执行:

  • 重复– 指示该操作是否应运行多次。
  • executionRemainder – 跟踪该操作还应执行多少次。
  • 频率——定义执行之间的时间间隔。

1.处理重复执行

该函数complete(id, notes)负责管理操作的完成情况。如果某个操作设置为 repeat,它会更新执行时间并跟踪剩余的执行次数。

扣除执行余额

const newExecutionRemainder = repeat && executionRemainder > 0 ? executionRemainder - 1 : 0;
if (newExecutionRemainder < 0) {
  throw new Error("Execution remainder cannot be negative.");
}
Enter fullscreen mode Exit fullscreen mode
  • 如果动作重复,则执行余数减少 1
  • 如果余数小于 0,则会引发错误以防止出现意外行为。

2. 完成最终执行

if (repeat && newExecutionRemainder === 0) {
  await update(id, {
    status: STATUSES.COMPLETED,
    ttl: calculateTTL(),
    executionRemainder: 0,
    metadata: {
      ...metadata,
      executionResponses: [...(metadata?.executionResponses || []), notes],
    },
  });

  console.log("Final execution completed successfully:", id);
  return;
}
Enter fullscreen mode Exit fullscreen mode
  • 如果没有剩余的执行,则该操作将被标记为COMPLETED
  • TTL (生存时间)设置为两周后删除记录
  • 执行元数据已更新,以便于跟踪和观察。

3. 安排下一次执行

如果该操作仍有剩余的执行,则该函数安排下一次执行。

if (repeat && newExecutionRemainder > 0 && frequency) {
  const frequencyInMs = getFrequencyInMilliseconds(frequency);
  if (!frequencyInMs) {
    throw new Error(`Invalid frequency: ${frequency}`);
  }

  await update(id, {
    status: STATUSES.PENDING,
    executionTime: executionTime + frequencyInMs,
    executionRemainder: newExecutionRemainder,
    metadata: {
      ...metadata,
      executionResponses: [...(metadata?.executionResponses || []), notes],
    },
  });

  console.log("Recurring action updated successfully:", id);
  return;
}
Enter fullscreen mode Exit fullscreen mode
  • 通过添加频率字段中的间隔来更新执行时间
  • 该操作状态设置为“PENDING”,以便可以再次执行。
  • 元数据已更新以记录执行历史记录。

4. 频率转换

系统将预定义的频率转换为毫秒来更新执行时间。

const frequencyDurations: Record<string, number> = {
  TEN_MINS: 10 * 60 * 1000,
  HOURLY: 60 * 60 * 1000,
  DAILY: 24 * 60 * 60 * 1000,
  WEEKLY: 7 * 24 * 60 * 60 * 1000,
  MONTHLY: 30 * 24 * 60 * 60 * 1000, //Not accurate and for demonstration purposes
};
Enter fullscreen mode Exit fullscreen mode

这允许根据预定义的间隔进行灵活调度。


5.确保幂等性和数据完整性

对于非重复动作,系统确保只执行一次

await update(id, {
  status: STATUSES.COMPLETED,
  ttl: calculateTTL(),
  metadata: { ...metadata, notes },
});
console.log("Action completed successfully with TTL:", id);
Enter fullscreen mode Exit fullscreen mode
  • 不重复的操作将立即标记为已完成
  • TTL 确保数据在删除之前保留一段有限的时间

为什么这种方法有效

  • 自动重新安排——系统自动设置下一次执行时间。
  • 防止过度执行——当余数达到零时执行停止。
  • 高效跟踪——每次执行都会更新元数据,以便进行调试和观察。
  • 数据完整性——确保频率值有效并且执行余数正确减少。

过程

动作处理:通过重复数据删除确保可靠执行

系统使用Amazon SQS FIFO 队列基于 Redis 的重复数据删除处理计划操作,以确保每个操作只执行一次,防止重复处理。


1. 使用 SQS FIFO 处理动作

  • 消息被发送到Amazon SQS FIFO 队列,确保按照先进先出的顺序处理操作
  • FIFO 队列保证重复数据删除,防止同一条消息被多次处理。
  • 这种方法非常适合严格排序和一次性处理

2. 使用 Redis 进行重复数据删除

如果不使用 FIFO 队列,系统会利用Redis来管理重复数据删除,然后再将消息发送到标准 SQS 队列。

Redis 重复数据删除的工作原理
  • 每条消息都会根据以下情况分配一个重复数据删除 ID
    • 动作的唯一 ID
    • 行动状态
    • 重试次数如果适用)
const deduplicationId = `${messageBody.id}-${messageBody.status}-${messageBody.retryCount || 0}`;
const redisKey = `sqs-deduplication:${deduplicationId}`;
Enter fullscreen mode Exit fullscreen mode
  • 在向 SQS 发送消息之前,Redis会检查重复数据删除 ID 是否存在
const redisCheck = await RedisClient.get(redisKey);
if (redisCheck.success && redisCheck.data) {
  console.log(
    `Duplicate message detected. Skipping send for ID: ${deduplicationId}`,
  );
  return;
}
Enter fullscreen mode Exit fullscreen mode
  • 如果检测到重复,则不会发送消息,避免冗余处理。

3.向 SQS 发送消息

如果该操作不是重复的,则将其发送到SQS 队列进行处理。

const command = new SendMessageCommand({
  QueueUrl: queueUrl,
  MessageBody: JSON.stringify(messageBody),
});
await executeSQSCommand(command);
console.log(`Message sent successfully to ${queueUrl}`);
Enter fullscreen mode Exit fullscreen mode
  • 该系统确保消息传递时不会出现不必要的重复
  • 操作进入队列后进入执行阶段。

4. 将重复数据删除数据存储在 Redis 中

发送消息后,去重ID会保存在Redis中,TTL为5分钟,保证临时去重。

await RedisClient.set(redisKey, true, 300); // 300 seconds = 5 minutes
Enter fullscreen mode Exit fullscreen mode
  • 较短的到期时间可确保重试的操作仍可在需要时得到处理。
  • Redis 有助于管理临时重复数据删除,而不会影响长期操作的执行。

5. 优雅地处理错误

如果向 SQS 发送消息失败,则会根据故障类型处理错误:

if (error.name === "TimeoutError") {
  throw new AppError({
    ...CommonErrors.REQUEST_TIMEOUT,
    message: "Timeout occurred while sending the message to SQS.",
    metadata: { queueUrl, messageBody, error: error.message },
  });
}
throw new AppError({
  ...CommonErrors.INTERNAL_SERVER_ERROR,
  message: "Failed to send message to SQS.",
  metadata: { queueUrl, messageBody, error: error.message },
});
Enter fullscreen mode Exit fullscreen mode
  • 超时会触发特定的重试策略。
  • 其他故障记录元数据以帮助诊断问题。

为什么这种方法有效

  • FIFO 队列可确保对时间敏感的任务进行严格排序和重复数据删除。
  • 当 FIFO 队列不可用时,Redis 重复数据删除可防止不必要的重复处理。
  • 错误处理机制确保在必要时重试消息。

行动履行:处理来自 SQS 的计划行动

一旦计划操作到达执行时间,它们将由从Amazon SQS读取消息的Lambda 函数进行处理。该函数确保操作正确执行,相应地更新其状态,并在需要时处理错误或重试。

1.来自SQS的处理操作

fulfill函数监听SQS 事件,其中每条记录代表一个需要执行的预定操作。

export const fulfill = async (event: { Records: SQSRecord[] }): Promise<void> => {
  const { Records } = event;

  if (!Records || Records.length === 0) {
    console.log("No records to process in SQS event.");
    return;
  }
Enter fullscreen mode Exit fullscreen mode
  • 该函数检查是否有新的记录需要处理
  • 如果不存在记录,它会提前退出。

2.确保操作正确执行

对于队列中的每个操作:

  • 如果操作超出了最大重试次数,则将其标记为 FAILED
if (retryCount >= CONSTANTS.MAX_RETRY) {
  await handleFailure(
    id,
    receiptHandle,
    metadata?.retryReason || "Exceeded maximum retry attempts",
  );
  continue;
}
Enter fullscreen mode Exit fullscreen mode
  • 如果该操作是第一次执行,则其状态设置为IN_PROGRESS
if (retryCount === 0) {
  await start(id);
}
Enter fullscreen mode Exit fullscreen mode

3.处理不同类型的动作

尽管调度程序不允许在没有有效履行服务的情况下安排操作,但有一个防御机制(NO_ACTION)来处理数据库中手动更改或损坏操作的情况

  • 如果无法识别该操作类型,则将其标记为NO_ACTION并从队列中删除。
if (!Object.values(ACTIONS).includes(scheduledAction?.action as Actions)) {
  await noAction(id);
  await deleteMessage(receiptHandle);
  continue;
}
Enter fullscreen mode Exit fullscreen mode
  • 如果操作有效,则根据其类型进行处理。

处理一般任务

case ACTIONS.EXECUTE_TASK:
  result = await taskExecutionService.performTask(scheduledAction.data);
  await complete(id, result);
  break;
Enter fullscreen mode Exit fullscreen mode
  • 调用外部服务来执行通用任务(例如,处理用户请求)。
  • 一旦完成,就将操作标记为“已完成” 。

处理通知

case ACTIONS.SEND_ALERT: {
  const { recipient, messageType, ...messageData } = scheduledAction.data;

  const processedMessageData = processMessage(messageData);

  result = await notificationService.send(
    recipient,
    messageType,
    processedMessageData,
  );

  await complete(id, result);
  break;
}
Enter fullscreen mode Exit fullscreen mode
  • 使用通知服务发送警报或通知
  • 执行后将操作标记为已完成

4.处理失败和重试

如果某个操作失败,系统会应用指数退避算法并重试执行,然后将其标记为永久失败。

将操作标记为失败

const handleFailure = async (id: string, receiptHandle: string, reason: string): Promise<void> => {
  console.error(`Action with id: ${id} failed after maximum retries. Reason: ${reason}`);
  await fail(id, reason);
  await deleteMessage(receiptHandle);
};
Enter fullscreen mode Exit fullscreen mode
  • 如果操作超出重试限制,则会被标记为失败
  • 该消息从队列中删除以防止进一步处理。

使用退避机制重试操作

const handleProcessingError = async (
  id: string,
  receiptHandle: string,
  retryCount: number,
  error: any,
): Promise<void> => {
  console.error(`Error processing message with id: ${id}. Error: ${error.message || error}`);

  await applyExponentialBackoff(retryCount, id);
  const actionMarkedForRetry = await retry(
    id,
    error instanceof AppError
      ? JSON.stringify(error?.metadata) || error?.message || error?.code || error?.name
      : `Processing error : ${error}`,
  );
  await sendMessage(actionMarkedForRetry);
  await deleteMessage(receiptHandle);
};
Enter fullscreen mode Exit fullscreen mode
  • 如果发生错误,系统将应用指数退避增加重试计数
  • 失败的操作将重新排队到 SQS 中以进行重试。

5. 最终执行

  • 一旦操作成功完成,它将从 SQS 中删除。
await deleteMessage(receiptHandle);
console.log(`Message with id: ${id} processed successfully.`);
Enter fullscreen mode Exit fullscreen mode
  • 如果处理了 SQS 事件的所有记录,则会打印摘要日志。
console.log(`${Records.length} records from the SQS event have been processed.`);
Enter fullscreen mode Exit fullscreen mode

为什么这种方法有效

  • 确保操作始终被执行——每个操作在永久失败之前都会经过退避重试。
  • 处理不同的操作类型- 支持通知、任务、订阅更新和其他计划作业
  • 防止重复执行——使用SQS FIFO 或 Redis 重复数据删除来避免重复处理。
  • 可靠的状态管理——使用IN_PROGRESS、COMPLETED、FAILED 或 NO_ACTION状态更新数据库
  • 防御性处理NO_ACTION是一种保护措施,以防数据库中的操作被手动更改。

表示层:公开可观察性端点

表示由一个充当 API 的Lambda 函数组成,并通过Amazon API Gateway公开 HTTP 终端节点。这些终端节点允许用户观察、管理和与计划操作进行交互,从而确保实时监控和控制。

1. 通过 API 网关公开 HTTP 端点

使用AWS LambdaAPI Gateway构建无服务器 API提供与计划操作相关的关键功能的访问。

  • 按状态和计数获取操作

    • 获取按当前状态分组的操作(例如,待定、已完成、失败)。
    • 提供计数摘要来跟踪执行趋势。
  • 发起失败的操作

    • 允许用户手动重试失败的操作。
    • 确保失败的作业可以重新处理,而无需等待自动重试周期。
  • 删除操作

    • 提供一个端点来删除旧的或不必要的操作
    • 通过管理过期记录来帮助维护干净的数据库

2. 与监控仪表板集成

这些端点公开的数据可以在仪表板上可视化,以实现实时观察。

图片描述

  • 按状态显示操作计数以跟踪性能。
  • 允许用户通过界面手动重试或删除操作。
  • 提供有关系统健康和执行可靠性的见解

图片描述

构建计划行动系统的挑战

开发可靠且可扩展的计划行动系统面临着一些需要认真解决的挑战。

  • 竞争条件

    • 当多个进程尝试同时更新或执行相同的操作时,可能会出现不一致的情况。
    • 适当的锁定机制、重复数据删除和 FIFO 队列有助于防止重复执行。
  • 在周期的每个点进行负载测试

    • 必须在高负载下测试系统,以确保调度、执行、重试和履行能够正确扩展。
    • 测试包括数据库性能、SQS 消息处理、Lambda 执行限制和 API 响应时间
  • 作业被流和调度程序接收

    • 执行后 2 分钟内安排的操作由DynamoDB Streams处理,而其他操作则依赖于EventBridge
    • 如果没有适当的协调,可能会发生重复执行。确保操作在待处理、进行中和已完成状态之间正确转换可以避免此问题。
  • 了解工具的局限性

    • 并发处理: AWS Lambda会自动扩展,但高并发性可能会导致处理受限和延迟。
    • Lambda 运行时限制:由于Lambda 具有最大执行时间,因此必须将长时间运行的任务分解为更小的执行或卸载到工作服务

未来的改进

  • 用于实时通知的 Webhook

    • 实施webhook将允许安排操作的外部服务在操作执行、失败或重试时接收实时更新
    • 这减少了轮询的需要并提高了系统响应能力。
  • 错过行动的处理程序

    • 专用处理程序用于检测和处理仍处于PENDING状态但具有过去执行时间的操作。
    • 这可确保不会因系统故障、延迟或扩展问题而永久错过任何预定的操作。

这些改进将增强可靠性、可观察性和与外部系统的集成,使调度系统更加健壮。

结论:调度、扩展和健全

构建一个可靠、可扩展且容错的定时操作系统,不仅仅是设置一个 cron 作业并期盼最佳结果,更在于将弹性融入到流程的每个步骤中。从调度和执行到重试和可观察性,每个组件都必须协同工作,以确保不会丢失任何操作、不会遗漏任何通知,也不会出现订阅无法管理的情况。

在此过程中,我们解决了以下问题:

使用DynamoDB Streams、EventBridge 和 SQS进行动态调度。✅ 结合使用即时操作和延迟操作 进行精确执行。✅通过重复数据删除、重试和指数退避实现可靠处理。✅利用无服务器架构处理高 负载实现可扩展性。✅使用 API 进行观察性监控、重试和删除计划操作







当然,每个系统都有其怪癖和挑战- 竞争条件,工具限制和意外故障 - 但通过正确的设计模式,防御性编码和未来的改进(如webhook和错过的操作恢复),该系统可以发展成为更强大,更智能,更自主的调度程序。

归根结底,自动化是为了让生活更轻松——无论是管理用户订阅、发送通知,还是处理时效性交易。虽然计算机永不休眠,但我们人类却需要休眠,因此,设计一个能够在凌晨 3 点叫醒我们之前自行处理问题的系统总是值得的。

所以,让我们一起构建一个在我们无法运作时依然运作的系统吧! 🎉

鏂囩珷鏉yu簮锛�https://dev.to/joojodontoh/building-a-scalable-reliable-and-cost-effective-event-scheduler-for-asynchronous-jobs-2ac3
PREV
使用 Redux 构建 React 应用时不要做的 12 件事
NEXT
现实世界中前端 JavaScript 的数据结构(附 React 代码示例)