.NET Core 3.0 中的 IAsyncEnumerable<T> 有什么大不了的?
.NET Core 3.0 和 C# 8.0 最令人兴奋的功能之一就是新增了IAsyncEnumerable<T>
异步流 (async streams)。但它到底有什么特别之处呢?我们现在能做哪些以前做不到的事情呢?
在本文中,我们将研究它IAsyncEnumerable<T>
旨在解决哪些挑战,如何在我们自己的应用程序中实现它,以及为什么在许多情况下IAsyncEnumerable<T>
会取代它Task<IEnumerable<T>>
。
之前的生活IAsyncEnumerable<T>
也许说明它为何有用的最好方法IAsyncEnumerable<T>
是看看如果没有它会面临什么挑战。
假设我们正在构建一个数据访问库,并且需要一个方法来查询数据存储或 API 中的数据。该方法通常会返回Task<IEnumerable<T>>
,如下所示:
public async Task<IEnumerable<Product>> GetAllProducts()
为了实现该方法,我们通常会异步执行一些数据访问,然后在完成后返回所有数据。当我们需要进行多次异步调用来获取数据时,这个问题就变得更加明显。例如,我们的数据库或 API 可能会以页面形式返回数据,例如以下使用 Azure Cosmos DB 的实现:
public async Task<IEnumerable<Product>> GetAllProducts()
{
Container container = cosmosClient.GetContainer(DatabaseId, ContainerId);
var iterator = container.GetItemQueryIterator<Product>("SELECT * FROM c");
var products = new List<Product>();
while (iterator.HasMoreResults)
{
foreach (var product in await iterator.ReadNextAsync())
{
products.Add(product);
}
}
return products;
}
注意,我们在 while 循环中对所有结果进行分页,实例化所有产品对象,将它们放入 中List<Product>
,最后返回所有结果。这效率很低,尤其是对于较大的数据集。
也许我们可以通过改变方法一次返回一页结果来创建更有效的实现:
public IEnumerable<Task<IEnumerable<Product>>> GetAllProducts()
{
Container container = cosmosClient.GetContainer(DatabaseId, ContainerId);
var iterator = container.GetItemQueryIterator<Product>("SELECT * FROM c");
while (iterator.HasMoreResults)
{
yield return iterator.ReadNextAsync().ContinueWith(t =>
{
return (IEnumerable<Product>)t.Result;
});
}
}
调用者将像这样使用该方法:
foreach (var productsTask in productsRepository.GetAllProducts())
{
foreach (var product in await productsTask)
{
Console.WriteLine(product.Name);
}
}
此实现效率更高,但该方法现在返回IEnumerable<Task<IEnumerable<Product>>>
。正如我们在调用代码中看到的,理解如何调用该方法并处理数据并不直观。更重要的是,分页是数据访问方法的实现细节,调用者对此一无所知。
IAsyncEnumerable<T>
救援
我们真正想要做的是从数据库异步检索数据,并在结果可用时将其流式传输回调用者。
在同步代码中,返回的方法IEnumerable<T>
可以使用yield return
语句将每条数据从数据库返回时返回给调用者。
public IEnumerable<Product> GetAllProducts()
{
Container container = cosmosClient.GetContainer(DatabaseId, ContainerId);
var iterator = container.GetItemQueryIterator<Product>("SELECT * FROM c");
while (iterator.HasMoreResults)
{
foreach (var product in iterator.ReadNextAsync().Result)
{
yield return product;
}
}
}
但是,千万不要这么做!上面的代码会把异步数据库调用变成阻塞调用,而且无法扩展。
要是能用异步yield return
方法就好了!这在以前是不可能实现的……直到现在。
IAsyncEnumerable<T>
在 .NET Core 3(.NET Standard 2.1)中引入。它公开了一个枚举器,其中包含一个MoveNextAsync()
可等待的方法。这意味着生产者可以在生成结果之间进行异步调用。
Task<IEnumerable<T>>
我们的方法现在可以返回IAsyncEnumerable<T>
并用于yield return
发出数据,而不是返回。
public async IAsyncEnumerable<Product> GetAllProducts()
{
Container container = cosmosClient.GetContainer(DatabaseId, ContainerId);
var iterator = container.GetItemQueryIterator<Product>("SELECT * FROM c");
while (iterator.HasMoreResults)
{
foreach (var product in await iterator.ReadNextAsync())
{
yield return product;
}
}
}
要使用结果,我们需要使用await foreach()
C# 8 中提供的新语法:
await foreach (var product in productsRepository.GetAllProducts())
{
Console.WriteLine(product);
}
这样就好多了。该方法会在数据可用时生成数据。调用代码则按照自己的节奏使用数据。
IAsyncEnumerable<T>
和 ASP.NET Core
从.NET Core 3 Preview 7开始,ASP.NET 能够IAsyncEnumerable<T>
从 API 控制器操作返回结果。这意味着我们可以直接返回方法的结果——有效地将数据从数据库流式传输到 HTTP 响应。
[HttpGet]
public IAsyncEnumerable<Product> Get()
=> productsRepository.GetAllProducts();
替换Task<IEnumerable<T>>
为IAsyncEnumerable<T>
随着时间的推移和 .NET Core 3 和 .NET Standard 2.1 的采用率不断增长,我们有望IAsyncEnumerable<T>
在通常使用的地方看到它们的应用Task<IEnumerable<T>>
。
我期待看到库支持IAsyncEnumerable<T>
。在本文中,我们看到了使用 Azure Cosmos DB 3.0 SDK 查询数据的代码如下:
var iterator = container.GetItemQueryIterator<Product>("SELECT * FROM c");
while (iterator.HasMoreResults)
{
foreach (var product in await iterator.ReadNextAsync())
{
Console.WriteLine(product.Name);
}
}
与我们之前的示例一样,Cosmos DB 自己的 SDK 也泄露了其分页实现细节,这使得处理查询结果变得很尴尬。
GetItemQueryIterator<Product>()
为了查看返回后的样子IAsyncEnumerable<T>
,我们可以创建一个扩展方法FeedIterator
:
public static class FeedIteratorExtensions
{
public static async IAsyncEnumerable<T> ToAsyncEnumerable<T>(this FeedIterator<T> iterator)
{
while (iterator.HasMoreResults)
{
foreach(var item in await iterator.ReadNextAsync())
{
yield return item;
}
}
}
}
现在我们可以用更干净的方式处理查询结果:
var products = container
.GetItemQueryIterator<Product>("SELECT * FROM c")
.ToAsyncEnumerable();
await foreach (var product in products)
{
Console.WriteLine(product.Name);
}
概括
IAsyncEnumerable<T>
是 .NET 的一个受欢迎的新增功能,在许多情况下可以使代码更加简洁、高效。您可以通过以下资源了解更多信息: