使用会话在 Azure Functions 中进行有序队列处理

2025-06-08

使用会话在 Azure Functions 中进行有序队列处理

我们来聊聊排序。这是我最喜欢讨论的话题之一,之前我曾在博客中详细讨论过。之前,Azure Functions 中的排序处理只能通过 Azure Event Hubs 等事件流来实现,但今天我想展示如何同时保持 Service Bus 队列和主题的顺序。

表面上看,这似乎很简单:我希望能够按照收到消息的准确顺序处理队列中的消息。对于在一台机器上运行的简单服务来说,这很容易实现。但是,当我想要大规模处理时,如何保持队列消息的顺序呢?使用像 Azure Functions 这样的工具,我可能要跨数十个活动实例处理消息,如何保持顺序呢?

让我们举一个简单的例子,一个处理医院病人的消息系统。假设我为每个病人设置了几个事件:

  1. 病人到达
  2. 病人被分配了房间
  3. 患者接受治疗
  4. 患者出院

我想确保我永远不会无序地处理消息,并且有可能在我处理完患者的治疗之前就让患者出院!

让我们快速进行一些实验,看看会发生什么。为此,我将模拟 1000 名患者,每人发送这 4 条消息(按顺序),并处理它们(理想情况下也按顺序)。

服务巴士徽标

默认和无序

让我们用一个简单的 Azure Function 来尝试一下,它只会在队列上触发。我不会做任何特别的事情,只是在队列上触发,并将其正在处理的操作推送到 Redis 缓存上的列表中。



public async Task Run(
    [ServiceBusTrigger("queue", Connection = "ServiceBusConnectionString")]Message message, 
    ILogger log)
{
    log.LogInformation($"C# ServiceBus queue trigger function processed message: {Encoding.UTF8.GetString(message.Body)}");
    await _client.PushData((string)message.UserProperties["patientId"], Encoding.UTF8.GetString(message.Body));
}


Enter fullscreen mode Exit fullscreen mode

将 1000 位患者的数据(每人 4 条消息)发送到此队列后,Redis 缓存处理后是什么样子的?有些患者的数据看起来不错。当我查找 4 号患者时,我看到:



>lrange Patient-$4 0 -1
1) "Message-0"
2) "Message-1"
3) "Message-2"
4) "Message-3"


Enter fullscreen mode Exit fullscreen mode

太棒了!所有 4 个事件都已发送给患者 4,并按顺序处理。但如果我查看患者 2:



>lrange Patient-$2 0 -1
1) "Message-1"
2) "Message-2"
3) "Message-0"
4) "Message-3"


Enter fullscreen mode Exit fullscreen mode

在这种情况下,直到处理完另外两条消息后,“病人到达”消息才处理完毕。这是怎么回事?Azure 服务总线确实保证了消息的顺序,为什么我的消息顺序乱了?

默认情况下,队列触发器会执行一些操作。首先,对于每个启动的实例,它会并发处理一组消息。默认情况下,一个实例会并发处理 32 条消息。这意味着它可能同时处理同一患者的所有 4 条消息,并且这些消息的完成顺序与发送顺序不同。这个问题似乎很容易解决,我们只需将并发数限制为 1 即可。

反模式:限制扩展和并发

这或许是我见过的针对上述问题最常见的解决方案。让我们将并发数限制为每次仅处理 1 条消息,而不是 32 条。为此,我修改了host.json文件,并将 设置maxConcurrentCalls为 1。现在每个实例每次只能处理 1 条消息。我再次运行相同的测试。

首先,它超级慢。我花了很长时间来处理 4000 条队列消息,因为每个实例一次只能处理 1 条。更糟糕的是?当我之后检查结果时,有些病人的消息仍然乱序!这是怎么回事?即使我将实例并发限制为 1,Azure Functions 也已将我扩展到多个实例。因此,如果我有 20 个已扩展的函数应用程序实例,那么我将同时处理 20 条消息(每个实例 1 条)。这意味着我仍然可以同时处理来自同一个病人的消息 - 只是在不同的实例上。我仍然无法保证有序处理。

解决办法是什么?很多人想限制 Azure Functions 的扩展。虽然技术上可行,但这会进一步损害我的吞吐量。现在全局一次只能处理一条消息,这意味着在高流量期间,我会积压大量患者事件,我的函数可能无法跟上处理速度。

Sessions 来帮忙

如果这篇博文就此结束,岂不是太令人难过了?其实还有更好的办法!之前我说过,最好的选择可能是使用事件中心,因为它支持分区和批次,可以保证消息的有序性。然而,这里的挑战在于,有时考虑到队列的事务特性(例如重试和死信),它才是合适的消息代理。现在,您可以使用队列通过服务总线会话实现消息的有序性🎉。

那么什么是会话?会话允许您为一组消息设置标识符。为了处理来自会话的消息,首先必须“锁定”该会话。然后,您可以开始单独处理来自该会话的每条消息(使用与常规队列相同的 lock / complete 语义)。会话的好处是,即使在跨多个实例进行大规模处理时,它也能让您保持顺序。想想以前我们有 20 个 Azure Function App 实例都在竞争同一个队列。现在,所有 20 个实例都将“锁定”各自的可用会话,并且仅处理来自该会话的事件,而不是不扩展到 20 个。会话还能确保来自会话的消息按顺序处理。

会话可以随时动态创建。Azure Functions 实例启动后会首先询问“是否有任何消息的会话 ID 尚未锁定?”。如果是,它会锁定会话并开始按顺序处理。当会话中不再有任何可用消息时,Azure Functions 将释放锁定并转到下一个可用会话。如果不先锁定消息所属的会话,则不会处理任何消息。

对于上面的示例,我将发送相同的 4000 条消息(1000 位患者对应 4 个患者事件)。在本例中,我将患者 ID 设置为会话 ID。每个 Azure Functions 实例都会获取一个会话(患者)的锁,处理所有可用的消息,然后转到另一个有可用消息的患者。

在 Azure Functions 中使用会话

目前,会话功能在 3.1.0 及以上版本的扩展中可用Microsoft.Azure.WebJobs.Extensions.ServiceBus,撰写本文时,该功能处于预览阶段。因此,我首先会将扩展引入。



Install-Package Microsoft.Azure.WebJobs.Extensions.ServiceBus -Pre


Enter fullscreen mode Exit fullscreen mode

然后对我的功能代码进行最小的代码更改以启用会话(isSessionsEnabled = true):



public async Task Run(
    [ServiceBusTrigger("queue", Connection = "ServiceBusConnectionString", IsSessionsEnabled = true)]Message message, 
    ILogger log)
{
    log.LogInformation($"C# ServiceBus queue trigger function processed message: {Encoding.UTF8.GetString(message.Body)}");
    await _client.PushData(message.SessionId, Encoding.UTF8.GetString(message.Body));
}


Enter fullscreen mode Exit fullscreen mode

我还需要确保我正在使用支持会话的队列或主题。

会话启用队列

当我将消息推送到队列时,我会sessionId为发送的每条患者消息设置权限。

发布函数后,我推送了 4000 条消息。由于我能够跨扩展实例并发处理多个会话,队列很快就被清空了。运行测试后,我检查了 Redis 缓存。正如预期的那样,我看到所有消息都已处理,并且我看到的每个患者的消息都按顺序进行了处理:



>lrange Patient-$10 0 -1
1) "Message-0"
2) "Message-1"
3) "Message-2"
4) "Message-3"

>lrange Patient-$872 0 -1
1) "Message-0"
2) "Message-1"
3) "Message-2"
4) "Message-3"


Enter fullscreen mode Exit fullscreen mode

因此,借助 Azure Functions 新增的会话支持,我可以按顺序处理来自服务总线队列或主题的消息,而无需牺牲整体吞吐量。我可以动态地将消息添加到新的或现有的会话中,并且确信会话中的消息将按照服务总线接收的顺序进行处理。

您可以在我的 GitHub 仓库中看到我用于测试和加载消息的完整示例master分支将全部按顺序排列,out-of-order分支是默认的、无序的实验。

鏂囩珷鏉ユ簮锛�https://dev.to/azure/ordered-queue-processing-in-azure-functions-4h6c
PREV
Reconhecimento Facial com Face API 和 Node.js Reconhecimento Facial com Face API 和 Node.js
NEXT
为被宠坏的人打造的 Microsoft Azure