Evolution of An Async LINQ operator

 3 years ago
source link: http://blog.i3arnon.com/2021/07/12/async-linq-operator/

LINQ (Language-Integrated Query) has been around for quite a while in the world of .NET (since .NET Framework 3.5 and C# 3.0) but recently async streams (i.e. IAsyncEnumerable<T>) were added to .NET and with them came async LINQ. It allows using the richness of LINQ while reaping the benefits of async-await by asynchronously accessing databases, microservices and remote APIs.

System.Linq.Async provides the standard LINQ operators we’ve all come to expect like Where, Select, GroupBy, etc. as extensions for IAsyncEnumerable but it’s also common for developers to add their own custom ones. Doing that has some pitfalls in “old-school” (synchronous) LINQ but even more so when you throw async-await into the mix.

For example, if we take Where, which receives an enumerable and a predicate to filter the items with, a very naive implementation may be something like (note the new await foreach):

static async Task<List<T>> Where<T>(this IAsyncEnumerable<T> source, Func<T, bool> predicate)
{
    // Create a list to hold the filtered items
    var filteredItems = new List<T>();
    await foreach (var item in source)
    {
        if (predicate(item))
        {
            // Add the filtered item to the list
            filteredItems.Add(item);
        }
    }

    // Return the list containing the filtered items
    return filteredItems;
}

However, that implementation breaks one of the foundational properties of LINQ: Deferred Execution.

Deferred Execution

Deferring the execution means the operation isn’t executed at the point where the method is invoked; it only executes when the enumerable gets enumerated (e.g. with await foreach). The easiest way to fix that is to use C# 8.0’s async iterators:

static async IAsyncEnumerable<T> Where<T>(this IAsyncEnumerable<T> source, Func<T, bool> predicate)
{
    await foreach (var item in source)
    {
        if (predicate(item))
        {
            // Return the filtered item and pause until the next MoveNextAsync call
            yield return item;
        }
    }
}

An async iterator method is a method that:

  1. Returns an IAsyncEnumerable<T>.
  2. Is declared with the async modifier.
  3. Contains yield return (or yield break) statements.

Behind the scenes, the compiler turns methods of this kind into state machines. That lets them start returning items only when the consumer asks for them, which happens when the returned async enumerable is enumerated.

So now, when Where is called, no execution has started yet; it only starts when the await foreach is reached:

IAsyncEnumerable<Hamster> hamsters = // ..
IAsyncEnumerable<Hamster> filteredHamsters = hamsters.Where(hamster => hamster.Name == "Bar");
// No execution has happened yet
await foreach (Hamster hamster in filteredHamsters)
{
    Console.WriteLine(hamster.Age);
}
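
To see the deferral in action, here is a minimal, self-contained sketch. The Numbers source, the log list and the FilterDeferred name are all hypothetical (the operator is renamed only so it can’t clash with a Where extension that is already in scope); the body is the same iterator-based filter shown above.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class DeferredDemo
{
    // Hypothetical source that records the moment enumeration actually begins.
    static async IAsyncEnumerable<int> Numbers(List<string> log)
    {
        log.Add("source started");
        for (var i = 1; i <= 4; i++)
        {
            await Task.Yield();
            yield return i;
        }
    }

    // Same shape as the iterator-based Where above, under a hypothetical name.
    static async IAsyncEnumerable<T> FilterDeferred<T>(this IAsyncEnumerable<T> source, Func<T, bool> predicate)
    {
        await foreach (var item in source)
        {
            if (predicate(item))
            {
                yield return item;
            }
        }
    }

    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();
        var evens = Numbers(log).FilterDeferred(x => x % 2 == 0);
        log.Add("query built"); // neither the source nor the filter has run yet

        await foreach (var n in evens)
        {
            log.Add($"got {n}");
        }
        return log;
    }

    public static async Task Main()
    {
        // Prints: query built, source started, got 2, got 4 (one per line)
        foreach (var line in await RunAsync())
        {
            Console.WriteLine(line);
        }
    }
}
```

"query built" is logged before "source started" because calling an async iterator only creates the enumerable; the body runs on the first MoveNextAsync call.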

Using an async iterator also achieves another foundational property of LINQ: Streaming Execution.

Streaming Execution

When the execution is streaming, the items are returned one at a time as the consumer asks for them. The iterator doesn’t need to consume the entire source enumerable in order to return the filtered items; it can return them one by one (in fact, the source enumerable can even be infinite).
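
The infinite-source case can be sketched as follows. NaturalNumbers and FilterStreaming are hypothetical names used only for this illustration; the filter body is the iterator shown earlier.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class StreamingDemo
{
    // Hypothetical infinite source: it never completes on its own.
    static async IAsyncEnumerable<int> NaturalNumbers()
    {
        for (var i = 1; ; i++)
        {
            await Task.Yield();
            yield return i;
        }
    }

    // Iterator-based filter, as in the article, under a hypothetical name.
    static async IAsyncEnumerable<T> FilterStreaming<T>(this IAsyncEnumerable<T> source, Func<T, bool> predicate)
    {
        await foreach (var item in source)
        {
            if (predicate(item))
            {
                yield return item;
            }
        }
    }

    public static async Task<List<int>> FirstThreeMultiplesOfSevenAsync()
    {
        var result = new List<int>();
        await foreach (var n in NaturalNumbers().FilterStreaming(x => x % 7 == 0))
        {
            result.Add(n);
            if (result.Count == 3)
            {
                break; // leaving the loop disposes the enumerator, which stops the source
            }
        }
        return result;
    }

    public static async Task Main()
    {
        // Prints: 7, 14, 21
        Console.WriteLine(string.Join(", ", await FirstThreeMultiplesOfSevenAsync()));
    }
}
```

A list-building implementation like the first naive Where would never return here, since it would try to drain the source before handing anything back.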

Now, while we want the main execution of our method to be deferred there is a part we want to run eagerly right when the method is called: argument validation. If something is wrong with one of the arguments passed to our operator method we don’t want to wait until the result is enumerated in order to notify the consumer about it. We want to “fail fast” and throw an exception quickly. Since an async iterator only starts running when the result is enumerated we’ll have to break our method into two.

In order to do that we can utilize another relatively new C# feature: local methods. That way the local method can be an async iterator while the outer method can eagerly validate the arguments:

static IAsyncEnumerable<T> Where<T>(this IAsyncEnumerable<T> source, Func<T, bool> predicate)
{
    // Argument validation
    if (source is null) throw new ArgumentNullException(nameof(source));
    if (predicate is null) throw new ArgumentNullException(nameof(predicate));

    return Core();

    // Local async iterator method
    async IAsyncEnumerable<T> Core()
    {
        await foreach (var item in source)
        {
            if (predicate(item))
            {
                yield return item;
            }
        }
    }
}
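
The difference between the two placements of the validation can be observed directly. In this sketch FilterLateCheck (validation inside the iterator) and FilterEarlyCheck (validation in a wrapper, as above) are hypothetical names, and Source is a throwaway one-item source.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ValidationDemo
{
    // Validation inside the iterator body: deferred along with everything else.
    static async IAsyncEnumerable<T> FilterLateCheck<T>(this IAsyncEnumerable<T> source, Func<T, bool> predicate)
    {
        if (predicate is null) throw new ArgumentNullException(nameof(predicate));
        await foreach (var item in source)
        {
            if (predicate(item)) yield return item;
        }
    }

    // Validation in a non-iterator wrapper: runs when the method is called.
    static IAsyncEnumerable<T> FilterEarlyCheck<T>(this IAsyncEnumerable<T> source, Func<T, bool> predicate)
    {
        if (predicate is null) throw new ArgumentNullException(nameof(predicate));
        return Core();

        async IAsyncEnumerable<T> Core()
        {
            await foreach (var item in source)
            {
                if (predicate(item)) yield return item;
            }
        }
    }

    static async IAsyncEnumerable<int> Source()
    {
        await Task.Yield();
        yield return 1;
    }

    public static async Task<(bool EarlyAtCall, bool LateAtCall, bool LateAtEnumeration)> RunAsync()
    {
        var earlyAtCall = false;
        try { Source().FilterEarlyCheck<int>(null!); }
        catch (ArgumentNullException) { earlyAtCall = true; }

        var lateAtCall = false;
        var lateAtEnumeration = false;
        try
        {
            var query = Source().FilterLateCheck<int>(null!); // does not throw here
            try
            {
                await foreach (var n in query) { }
            }
            catch (ArgumentNullException) { lateAtEnumeration = true; }
        }
        catch (ArgumentNullException) { lateAtCall = true; }

        return (earlyAtCall, lateAtCall, lateAtEnumeration);
    }

    public static async Task Main()
    {
        // Prints: (True, False, True)
        Console.WriteLine(await RunAsync());
    }
}
```

With the wrapper, the bad argument surfaces at the call site; with the check inside the iterator, it only surfaces once something enumerates the result, possibly far from the code that made the mistake.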

Everything up to here is relevant for both async and synchronous LINQ, but implementing an async LINQ operator has some extra things you need to get right, starting with cancellation.

WithCancellation

Since the underlying operations being performed when enumerating async streams are potentially remote asynchronous operations that may take a while to complete, it’s beneficial to be able to cancel them when needed (imagine a long DB query for a web request that already timed out). In order to support cancellation, the IAsyncEnumerable<T>.GetAsyncEnumerator method accepts a CancellationToken. But since people don’t usually call GetAsyncEnumerator themselves (they call it implicitly via await foreach), there is a WithCancellation extension method for IAsyncEnumerable<T> that “bakes” the CancellationToken into the enumerable:

CancellationToken cancellationToken = //..
IAsyncEnumerable<Hamster> filteredHamsters = // ..
await foreach (var hamster in filteredHamsters.WithCancellation(cancellationToken))
{
    Console.WriteLine(hamster.Age);
}

Cancellation in .NET is cooperative, which means that the consumer may pass a CancellationToken but the code being executed needs to look at that token to observe the cancellation and act upon it. But how can our Where method, which was already invoked and returned an enumerable, access this token being passed afterwards while enumerating the same enumerable? With the C# compiler’s help.

Async iterator methods can accept a CancellationToken parameter annotated with the EnumeratorCancellation attribute. That tells the compiler to give the iterator a token that gets cancelled whether the consumer passed a token using WithCancellation in the await foreach, passed one directly when invoking the async iterator method, or both.

All we need to do is add that CancellationToken parameter to our async iterator method and use it when enumerating the source enumerable:

static IAsyncEnumerable<T> Where<T>(this IAsyncEnumerable<T> source, Func<T, bool> predicate)
{
    if (source is null) throw new ArgumentNullException(nameof(source));
    if (predicate is null) throw new ArgumentNullException(nameof(predicate));

    // Using the CancellationToken parameter's default value
    return Core();

    async IAsyncEnumerable<T> Core([EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        await foreach (var item in source.WithCancellation(cancellationToken))
        {
            if (predicate(item))
            {
                yield return item;
            }
        }
    }
}
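
A runnable sketch of the token travelling from the consumer, through the operator, into the source. SlowNumbers is a hypothetical stand-in for a slow remote query, and FilterCancellable is a hypothetical name for the operator above.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationDemo
{
    // Hypothetical slow, endless source that observes the token it is given.
    static async IAsyncEnumerable<int> SlowNumbers([EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        for (var i = 1; ; i++)
        {
            await Task.Delay(10, cancellationToken);
            yield return i;
        }
    }

    // The operator from the article, under a hypothetical name.
    static IAsyncEnumerable<T> FilterCancellable<T>(this IAsyncEnumerable<T> source, Func<T, bool> predicate)
    {
        if (source is null) throw new ArgumentNullException(nameof(source));
        if (predicate is null) throw new ArgumentNullException(nameof(predicate));

        return Core();

        async IAsyncEnumerable<T> Core([EnumeratorCancellation] CancellationToken cancellationToken = default)
        {
            await foreach (var item in source.WithCancellation(cancellationToken))
            {
                if (predicate(item)) yield return item;
            }
        }
    }

    public static async Task<(List<int> Items, bool Cancelled)> RunAsync()
    {
        using var cts = new CancellationTokenSource();
        var items = new List<int>();
        var cancelled = false;
        try
        {
            // The token is supplied once, at enumeration time.
            await foreach (var n in SlowNumbers().FilterCancellable(x => x % 2 == 1).WithCancellation(cts.Token))
            {
                items.Add(n);
                if (items.Count == 2) cts.Cancel();
            }
        }
        catch (OperationCanceledException)
        {
            cancelled = true; // thrown by Task.Delay inside the source
        }
        return (items, cancelled);
    }

    public static async Task Main()
    {
        var (items, cancelled) = await RunAsync();
        // Prints: 1, 3 cancelled=True
        Console.WriteLine($"{string.Join(", ", items)} cancelled={cancelled}");
    }
}
```

The consumer never hands the token to SlowNumbers or to the operator directly; it reaches the Task.Delay call only because each layer forwards it with WithCancellation.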

We can only add that token to the local Core method since the outer method isn’t an async iterator (no async keyword). However, we wouldn’t want to do that even if we could. Imagine calling multiple LINQ operators one after another and passing the same token again and again (e.g. Where(.., token).Select(.., token).OrderBy(.., token)). We want to allow the consumer to pass the token once when enumerating, and it’s on us to make sure the token “moves through the pipe” of all the operators and reaches the actual asynchronous operation we want to cancel (e.g. the DB query).

ConfigureAwait(false)

Another uniquely async-await concern that’s important to get right is when to call ConfigureAwait(false). When awaiting inside async methods, unless there’s a good reason not to, you usually want to ignore the SynchronizationContext if there is one. The SynchronizationContext allows the code to return to some context (usually the UI thread) after the asynchronous operation being awaited completes. Capturing it in places where it’s not needed slightly hurts performance and may lead to deadlocks (more on it here).
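
The effect on the SynchronizationContext can be made visible with a small sketch. CountingContext is a hypothetical context written for this illustration: it counts the continuations posted back to it and otherwise behaves like the base class, which queues work to the thread pool. It is not a UI context, so blocking on the task here doesn’t deadlock.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical context that counts how many continuations are posted back to it.
public sealed class CountingContext : SynchronizationContext
{
    private int _posts;
    public int Posts => Volatile.Read(ref _posts);

    public override void Post(SendOrPostCallback d, object? state)
    {
        Interlocked.Increment(ref _posts);
        base.Post(d, state); // the base implementation queues to the thread pool
    }
}

public static class ConfigureAwaitDemo
{
    // Default await: captures the current SynchronizationContext.
    public static async Task CaptureContextAsync() => await Task.Delay(20);

    // ConfigureAwait(false): the continuation ignores the context.
    public static async Task IgnoreContextAsync() => await Task.Delay(20).ConfigureAwait(false);

    // Starts the operation with a CountingContext installed, then waits for it
    // after restoring the previous context.
    public static int CountPosts(Func<Task> operation)
    {
        var context = new CountingContext();
        var previous = SynchronizationContext.Current;
        SynchronizationContext.SetSynchronizationContext(context);
        Task task;
        try
        {
            task = operation(); // runs synchronously up to the first await
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(previous);
        }
        task.GetAwaiter().GetResult();
        return context.Posts;
    }

    public static void Main()
    {
        Console.WriteLine(CountPosts(CaptureContextAsync)); // 1
        Console.WriteLine(CountPosts(IgnoreContextAsync));  // 0
    }
}
```

Each post is a trip back to the captured context; in a library operator that has no need for that context, every such trip is pure overhead.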

So you can use ConfigureAwait(false) when enumerating an IAsyncEnumerable just like you can when awaiting a task:

static IAsyncEnumerable<T> Where<T>(this IAsyncEnumerable<T> source, Func<T, bool> predicate)
{
    if (source is null) throw new ArgumentNullException(nameof(source));
    if (predicate is null) throw new ArgumentNullException(nameof(predicate));

    return Core();

    async IAsyncEnumerable<T> Core([EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
        {
            if (predicate(item))
            {
                yield return item;
            }
        }
    }
}

It’s interesting to note that await foreach is pattern-based just like foreach, await and other capabilities in C#. Both the WithCancellation and ConfigureAwait methods return the ConfiguredCancelableAsyncEnumerable<T> struct, which doesn’t implement the IAsyncEnumerable<T> interface. But it does have a GetAsyncEnumerator method that returns an enumerator that has MoveNextAsync, Current & DisposeAsync. It follows the required pattern and therefore can be used in await foreach (I’ve explained more about these patterns in C# in an earlier post).
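
The pattern is easy to try out with a type of your own. Countdown below is a hypothetical class that implements none of the async-enumerable interfaces, yet works with await foreach because it exposes the members the compiler looks for.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Implements neither IAsyncEnumerable<T> nor IAsyncEnumerator<T>.
public sealed class Countdown
{
    private readonly int _from;
    public Countdown(int from) => _from = from;

    // await foreach looks for an accessible GetAsyncEnumerator method...
    public Enumerator GetAsyncEnumerator() => new Enumerator(_from);

    public sealed class Enumerator
    {
        private int _current;
        public Enumerator(int from) => _current = from + 1;

        // ...whose return type has a Current property...
        public int Current => _current;

        // ...and a MoveNextAsync method that can be awaited to get a bool.
        public async ValueTask<bool> MoveNextAsync()
        {
            await Task.Yield();
            _current--;
            return _current > 0;
        }

        // Called when the loop finishes; nothing to clean up in this sketch.
        public ValueTask DisposeAsync() => default;
    }
}

public static class PatternDemo
{
    public static async Task<List<int>> RunAsync()
    {
        var seen = new List<int>();
        await foreach (var n in new Countdown(3))
        {
            seen.Add(n);
        }
        return seen;
    }

    public static async Task Main()
    {
        // Prints: 3, 2, 1
        Console.WriteLine(string.Join(", ", await RunAsync()));
    }
}
```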

Where, WhereAwait & WhereAwaitWithCancellation

Our Where implementation is now complete, but that method still has some limitations. It allows filtering an async enumerable using a predicate, but that predicate delegate is synchronous (it returns a bool, not a Task<bool>). What if, for example, you can only filter the items by calling another microservice? Async LINQ covers these cases as well.

The convention in async LINQ is to have three kinds of “overloads” for each operator:

  1. An implementation that accepts a synchronous delegate (e.g. our Where method).
  2. An implementation that accepts an async delegate (a delegate with an async return type).
    • That method should have an Await suffix to signify that the delegates are being awaited (WhereAwait in our case).
  3. An implementation that accepts a cancellable async delegate (an async delegate that accepts a CancellationToken on top of its other parameters).
    • That method should have an AwaitWithCancellation suffix (WhereAwaitWithCancellation).

Naming the versions differently helps the consumer to distinguish between the versions, but it also makes overload resolution easier for the compiler and helps it to target type the lambda expressions passed as arguments.

WhereAwait

The signature for WhereAwait is similar to Where, only instead of accepting Func<T, bool> as the predicate it’s now Func<T, ValueTask<bool>>. ValueTask<T> is an awaitable like Task<T> that avoids Task<T> allocations in cases where the operation completed synchronously (I’ve posted about it when it was introduced). Using ValueTask<T> instead of Task<T> generally helps the consumer avoid some unnecessary allocations, which reduces GC pressure.
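
To illustrate where the saving comes from, here is a sketch of a predicate that often completes synchronously. The cache, the IsAllowedAsync name and the simulated remote lookup are all assumptions made for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ValueTaskPredicateDemo
{
    // Hypothetical cache standing in for the results of earlier remote lookups.
    static readonly Dictionary<int, bool> Cache = new Dictionary<int, bool>();
    static int _remoteCalls;

    // Matches Func<int, ValueTask<bool>>: synchronous on a cache hit,
    // asynchronous only on a miss.
    public static ValueTask<bool> IsAllowedAsync(int id)
    {
        if (Cache.TryGetValue(id, out var cached))
        {
            return new ValueTask<bool>(cached); // wraps the value, no Task<bool> allocated
        }
        return new ValueTask<bool>(LookupAsync(id));
    }

    static async Task<bool> LookupAsync(int id)
    {
        _remoteCalls++;
        await Task.Delay(5); // simulated remote call
        var allowed = id % 2 == 0;
        Cache[id] = allowed;
        return allowed;
    }

    public static async Task<(bool First, bool Second, bool SecondWasSynchronous, int RemoteCalls)> RunAsync()
    {
        Cache.Clear();
        _remoteCalls = 0;

        var first = await IsAllowedAsync(4);            // miss: goes through LookupAsync

        var pending = IsAllowedAsync(4);                // hit: already completed
        var secondWasSynchronous = pending.IsCompletedSuccessfully;
        var second = await pending;

        return (first, second, secondWasSynchronous, _remoteCalls);
    }

    public static async Task Main()
    {
        // Prints: (True, True, True, 1)
        Console.WriteLine(await RunAsync());
    }
}
```

A method like IsAllowedAsync has the right shape to be passed as the predicate of the WhereAwait operator described in this section.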

The implementation is also similar to Where. We just need to await the ValueTask<bool> returned from invoking the predicate, and to ignore the SynchronizationContext with ConfigureAwait(false):

static IAsyncEnumerable<T> WhereAwait<T>(this IAsyncEnumerable<T> source, Func<T, ValueTask<bool>> predicate)
{
    if (source is null) throw new ArgumentNullException(nameof(source));
    if (predicate is null) throw new ArgumentNullException(nameof(predicate));

    return Core();

    async IAsyncEnumerable<T> Core([EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
        {
            // Await the ValueTask<bool> returned from the predicate
            if (await predicate(item).ConfigureAwait(false))
            {
                yield return item;
            }
        }
    }
}

A consumer can then filter with an async lambda:

IAsyncEnumerable<Hamster> hamsters = // ..
IAsyncEnumerable<Hamster> filteredHamsters = hamsters.WhereAwait(async hamster => await FindHamsterTypeAsync(hamster) == HamsterType.Dwarf);

WhereAwaitWithCancellation

WhereAwaitWithCancellation allows the consumer to not only perform an asynchronous operation inside the predicate, but also to cancel that operation when the enumeration is cancelled. All we need for that to work is to accept the correct delegate and pass our CancellationToken when invoking it:

static IAsyncEnumerable<T> WhereAwaitWithCancellation<T>(this IAsyncEnumerable<T> source, Func<T, CancellationToken, ValueTask<bool>> predicate)
{
    if (source is null) throw new ArgumentNullException(nameof(source));
    if (predicate is null) throw new ArgumentNullException(nameof(predicate));

    return Core();

    async IAsyncEnumerable<T> Core([EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
        {
            // Pass the CancellationToken to the predicate itself, on top of the enumerable
            if (await predicate(item, cancellationToken).ConfigureAwait(false))
            {
                yield return item;
            }
        }
    }
}

A consumer can then pass the token on to the asynchronous operation inside the predicate:

IAsyncEnumerable<Hamster> hamsters = // ..
IAsyncEnumerable<Hamster> filteredHamsters = hamsters.WhereAwaitWithCancellation(async (hamster, token) => await FindHamsterTypeAsync(hamster, token) == HamsterType.Dwarf);
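
A self-contained sketch of the predicate itself observing cancellation. Ids is a hypothetical source that deliberately ignores the token, and FilterAwaitWithCancellation is a hypothetical name for the operator above, so the only place cancellation can be noticed is the Task.Delay inside the predicate.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class PredicateCancellationDemo
{
    // Hypothetical source that does not observe any token.
    static async IAsyncEnumerable<int> Ids()
    {
        for (var i = 1; i <= 5; i++)
        {
            await Task.Yield();
            yield return i;
        }
    }

    // The operator from the article, under a hypothetical name.
    static IAsyncEnumerable<T> FilterAwaitWithCancellation<T>(this IAsyncEnumerable<T> source, Func<T, CancellationToken, ValueTask<bool>> predicate)
    {
        if (source is null) throw new ArgumentNullException(nameof(source));
        if (predicate is null) throw new ArgumentNullException(nameof(predicate));

        return Core();

        async IAsyncEnumerable<T> Core([EnumeratorCancellation] CancellationToken cancellationToken = default)
        {
            await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
            {
                if (await predicate(item, cancellationToken).ConfigureAwait(false))
                {
                    yield return item;
                }
            }
        }
    }

    public static async Task<(List<int> Items, int PredicateCalls, bool Cancelled)> RunAsync()
    {
        using var cts = new CancellationTokenSource();
        var items = new List<int>();
        var predicateCalls = 0;
        var cancelled = false;

        var query = Ids().FilterAwaitWithCancellation(async (id, token) =>
        {
            predicateCalls++;
            await Task.Delay(10, token); // simulated remote check that honors the token
            return true;
        });

        try
        {
            await foreach (var id in query.WithCancellation(cts.Token))
            {
                items.Add(id);
                cts.Cancel(); // cancel after the first item
            }
        }
        catch (OperationCanceledException)
        {
            cancelled = true; // thrown from inside the second predicate call
        }
        return (items, predicateCalls, cancelled);
    }

    public static async Task Main()
    {
        var (items, calls, cancelled) = await RunAsync();
        // Prints: items=1 predicateCalls=2 cancelled=True
        Console.WriteLine($"items={string.Join(", ", items)} predicateCalls={calls} cancelled={cancelled}");
    }
}
```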

Summary

It has been common, in my experience, for many projects to add their own custom extensions to LINQ, and I expect the same will gradually happen with async LINQ as its adoption grows. With async LINQ (just as with async-await as a whole) it’s a bit easier to shoot yourself in the foot. To avoid that you need to make sure the execution is deferred and streaming, cancellation is observed, ConfigureAwait(false) is used appropriately and the right “overloads” are implemented.


Note: This isn’t how most operators are actually implemented in async LINQ. In order to utilize some optimizations the async iterators are custom implemented, instead of relying on the compiler generated ones. You can see the Where versions’ implementations here.

