Asynchronous Streams in C#: IAsyncEnumerable<T> and await foreach

Async streams bring asynchronous programming to data sequences. IAsyncEnumerable<T> and await foreach enable processing data as it becomes available, rather than waiting for everything at once. This enables efficient handling of large datasets, real-time streams, and responsive UIs.

In this guide, we'll implement async iterators, use await foreach, and build streaming data pipelines. You'll learn to create async sequences that integrate seamlessly with LINQ and cancellation.

The Core Interfaces

Async streams mirror regular iterators but work with async operations. IAsyncEnumerable<T> produces async enumerators, and IAsyncEnumerator<T> handles the async iteration.

public interface IAsyncEnumerable<out T>
{
    IAsyncEnumerator<T> GetAsyncEnumerator(
        CancellationToken cancellationToken = default);
}

public interface IAsyncEnumerator<out T> : IAsyncDisposable
{
    T Current { get; }
    ValueTask<bool> MoveNextAsync();
}

The async enumerator uses ValueTask<bool> for efficient async advancement and implements IAsyncDisposable for proper cleanup.
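To make the roles of the two interfaces concrete, here is a minimal sketch that drives an enumerator by hand, which is roughly what await foreach does on your behalf. CountAsync is a hypothetical three-item source invented for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical three-item source used only for this illustration.
static async IAsyncEnumerable<int> CountAsync()
{
    for (int i = 0; i < 3; i++)
    {
        await Task.Yield(); // stand-in for real async work
        yield return i;
    }
}

var seen = new List<int>();

// Manual equivalent of: await foreach (var n in CountAsync()) seen.Add(n);
IAsyncEnumerator<int> enumerator = CountAsync().GetAsyncEnumerator();
try
{
    while (await enumerator.MoveNextAsync())
    {
        seen.Add(enumerator.Current);
    }
}
finally
{
    await enumerator.DisposeAsync(); // always release the enumerator
}

Console.WriteLine(string.Join(", ", seen)); // 0, 1, 2
```

In everyday code you would write the await foreach form; the manual loop is mainly useful for understanding where disposal and awaiting happen.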

Creating Async Iterators

To create an async iterator, declare a method that is async, returns IAsyncEnumerable<T>, and uses yield return. The compiler generates the state machine automatically.

public static async IAsyncEnumerable<int> GenerateNumbersAsync(
    int count, [EnumeratorCancellation] CancellationToken token = default)
{
    for (int i = 0; i < count; i++)
    {
        await Task.Delay(100, token); // Simulate async work
        yield return i;
    }
}

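The [EnumeratorCancellation] attribute tells the compiler to route the token that a consumer supplies through WithCancellation (or GetAsyncEnumerator) into the token parameter, so the iterator can be cancelled even when the caller did not pass a token to the method directly. Each iteration can perform async operations.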

Usage with await foreach:

await foreach (var number in GenerateNumbersAsync(5))
{
    Console.WriteLine(number);
}

await foreach handles the async iteration automatically, awaiting each MoveNextAsync() call.
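The following sketch shows the attribute at work: the consumer passes no token to the method and supplies one through WithCancellation instead. Demo and CountForeverAsync are hypothetical names used only here.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

using var cts = new CancellationTokenSource();
var received = new List<int>();
var cancelled = false;

try
{
    // No token is passed to the method itself; WithCancellation supplies it,
    // and [EnumeratorCancellation] routes it into the 'token' parameter.
    await foreach (var n in Demo.CountForeverAsync().WithCancellation(cts.Token))
    {
        received.Add(n);
        if (n == 2) cts.Cancel(); // request cancellation after three items
    }
}
catch (OperationCanceledException)
{
    cancelled = true;
}

Console.WriteLine($"Items: {received.Count}, cancelled: {cancelled}"); // Items: 3, cancelled: True

static class Demo
{
    // Hypothetical endless counter, used only for this illustration.
    public static async IAsyncEnumerable<int> CountForeverAsync(
        [EnumeratorCancellation] CancellationToken token = default)
    {
        for (int i = 0; ; i++)
        {
            await Task.Delay(10, token); // throws once 'token' is cancelled
            yield return i;
        }
    }
}
```

Without the attribute, the token given to WithCancellation would not reach the iterator body and the loop above would never end.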

LINQ with Async Streams

Async streams work with LINQ through async versions of the standard operators. The System.Linq.Async NuGet package provides operators such as Where, Select, and SelectAwait for IAsyncEnumerable<T>.

var evenSquares = GenerateNumbersAsync(10)
    .Where(n => n % 2 == 0)
    .SelectAwait(async n => {
        await Task.Delay(50);
        return n * n;
    })
    .SelectAwait(async square => {
        await Task.Delay(25);
        return await ProcessSquareAsync(square);
    });

One of the most powerful features of async streams is their integration with LINQ. You can build complex data processing pipelines that mix synchronous and asynchronous operations seamlessly.

The example starts with our GenerateNumbersAsync method, which produces numbers asynchronously. Then it uses Where to filter for even numbers - this happens synchronously since the condition is simple.

The first SelectAwait takes an async lambda and awaits its result for each element. Here it awaits a delay (simulating some async work like calling an API) before squaring the number. Passing the same async lambda to plain Select would produce a stream of unawaited Task<int> objects rather than ints, so SelectAwait is the operator to use whenever the transformation itself is asynchronous.

The second SelectAwait works the same way: it awaits ProcessSquareAsync for each square and passes the completed result downstream. Like regular LINQ, the query is lazy; nothing runs until evenSquares is enumerated with await foreach.

Notice how the pipeline processes each element as it becomes available. The first number gets filtered, transformed, and processed before the second number even starts generating. This creates an efficient streaming pipeline.

The beauty is that you can compose these operations just like regular LINQ, but with async/await syntax where needed. The compiler handles all the complex state management automatically.
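To see what such operators do underneath, here is a hand-written sketch of a synchronous filter and an awaiting projection that runs without any package. Filter, MapAwait, and NumbersAsync are hypothetical names chosen for this illustration; they are not the System.Linq.Async implementations, and cancellation is omitted for brevity.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical source: 0..4 with a small delay per item.
static async IAsyncEnumerable<int> NumbersAsync()
{
    for (int i = 0; i < 5; i++)
    {
        await Task.Delay(1);
        yield return i;
    }
}

// Synchronous filter over an async source (the role Where plays above).
static async IAsyncEnumerable<T> Filter<T>(
    IAsyncEnumerable<T> source, Func<T, bool> predicate)
{
    await foreach (var item in source)
    {
        if (predicate(item)) yield return item;
    }
}

// Asynchronous projection: awaits the selector for every item
// (the role SelectAwait plays above).
static async IAsyncEnumerable<TResult> MapAwait<T, TResult>(
    IAsyncEnumerable<T> source, Func<T, ValueTask<TResult>> selector)
{
    await foreach (var item in source)
    {
        yield return await selector(item);
    }
}

var pipeline = MapAwait<int, int>(
    Filter(NumbersAsync(), n => n % 2 == 0),
    async n =>
    {
        await Task.Delay(1); // simulated async transformation
        return n * n;
    });

var results = new List<int>();
await foreach (var square in pipeline)
{
    results.Add(square); // each item flows through the whole pipeline in turn
}

Console.WriteLine(string.Join(", ", results)); // 0, 4, 16
```

Each operator is itself an async iterator, which is why the pipeline pulls one element at a time instead of materializing intermediate collections.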

Cancellation and Resource Management

Async streams support cancellation through CancellationToken. Always pass tokens to async operations.

public static async IAsyncEnumerable<string> ReadLinesAsync(
    string filePath, [EnumeratorCancellation] CancellationToken token = default)
{
    using var reader = new StreamReader(filePath);

    string? line;
    // ReadLineAsync(CancellationToken) is available from .NET 7 onward.
    while ((line = await reader.ReadLineAsync(token)) != null)
    {
        token.ThrowIfCancellationRequested();
        yield return line;
    }
}

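The using declaration disposes the StreamReader when the iterator runs to completion, and also when the consumer stops early: await foreach calls DisposeAsync on the enumerator, which runs the iterator's pending cleanup. The loop also checks for cancellation on every line.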

using var cts = new CancellationTokenSource(5000); // 5 second timeout

try
{
    await foreach (var line in ReadLinesAsync("largefile.txt", cts.Token))
    {
        Console.WriteLine(line);
    }
}
catch (OperationCanceledException)
{
    Console.WriteLine("Reading was cancelled");
}
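The disposal behavior can be observed directly. In this minimal sketch, a finally block stands in for releasing a file or socket, and the consumer leaves the loop early; WithCleanupAsync is a hypothetical iterator written for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var log = new List<string>();

// Hypothetical iterator whose finally block stands in for releasing a resource.
async IAsyncEnumerable<int> WithCleanupAsync()
{
    try
    {
        for (int i = 0; ; i++)
        {
            await Task.Yield();
            yield return i;
        }
    }
    finally
    {
        log.Add("cleanup");
    }
}

await foreach (var n in WithCleanupAsync())
{
    log.Add($"item {n}");
    if (n == 1) break; // leave early; await foreach still disposes the enumerator
}

Console.WriteLine(string.Join(" | ", log)); // item 0 | item 1 | cleanup
```

The cleanup entry appears even though the iterator never reached the end of its loop, because disposing the enumerator runs the pending finally block.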

Infinite and Real-Time Streams

Async streams enable infinite sequences and real-time data processing.

public static async IAsyncEnumerable<DateTime> TimerAsync(
    TimeSpan interval, [EnumeratorCancellation] CancellationToken token = default)
{
    while (!token.IsCancellationRequested)
    {
        yield return DateTime.Now;
        await Task.Delay(interval, token);
    }
}

This creates an infinite stream of timestamps. Consumers control when to stop by cancelling or breaking the loop.

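var ticks = 0;
await foreach (var timestamp in TimerAsync(TimeSpan.FromSeconds(1)))
{
    Console.WriteLine($"Tick: {timestamp}");
    // Count ticks rather than testing timestamp.Second, which is the
    // second-of-minute and would stop at an arbitrary point.
    if (++ticks >= 10) break; // Stop after 10 ticks (about 10 seconds)
}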

Buffering and Batching

For performance, implement buffering to reduce async overhead on fast operations.

public static async IAsyncEnumerable<List<T>> BufferAsync<T>(
    IAsyncEnumerable<T> source, int batchSize,
    [EnumeratorCancellation] CancellationToken token = default)
{
    var batch = new List<T>();

    await foreach (var item in source.WithCancellation(token))
    {
        batch.Add(item);

        if (batch.Count >= batchSize)
        {
            yield return batch;
            batch = new List<T>();
        }
    }

    if (batch.Count > 0)
    {
        yield return batch;
    }
}

For high-performance scenarios, buffering can significantly reduce async overhead. When you're processing thousands of items quickly, the async machinery for each individual item becomes expensive.

The BufferAsync method collects items into batches of a specified size. It uses await foreach to process the source stream, adding each item to a batch.

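When the batch reaches the desired size, it yields the complete batch and starts a new one. The consumer then iterates once per batch instead of once per item, so a stream of thousands of items with a batch size of 100 reaches the consumer in a few dozen iterations. The source is still read item by item; the saving applies to the downstream work.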

The method also yields any remaining items at the end, ensuring no data is lost even if the total count isn't evenly divisible by the batch size.

Notice the use of WithCancellation(token) - this ensures proper cancellation propagation through the pipeline. If the consumer cancels, the buffering operation stops cleanly.

This pattern is especially useful for database operations, file I/O, or network calls where you want to minimize round trips by batching operations together.
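As a quick check of the batching behavior, the sketch below runs the same logic over seven items with a batch size of three. BatchAsync repeats the BufferAsync logic with cancellation omitted for brevity, and SourceAsync is a hypothetical source.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical source of seven items.
static async IAsyncEnumerable<int> SourceAsync()
{
    for (int i = 1; i <= 7; i++)
    {
        await Task.Yield();
        yield return i;
    }
}

// Same batching logic as BufferAsync, with cancellation omitted for brevity.
static async IAsyncEnumerable<List<T>> BatchAsync<T>(
    IAsyncEnumerable<T> source, int batchSize)
{
    var batch = new List<T>(batchSize);
    await foreach (var item in source)
    {
        batch.Add(item);
        if (batch.Count >= batchSize)
        {
            yield return batch;
            batch = new List<T>(batchSize);
        }
    }
    if (batch.Count > 0) yield return batch; // final partial batch
}

var sizes = new List<int>();
await foreach (var chunk in BatchAsync(SourceAsync(), 3))
{
    sizes.Add(chunk.Count); // one consumer iteration per batch, not per item
}

Console.WriteLine(string.Join(", ", sizes)); // 3, 3, 1
```

The last batch holds the single leftover item, which confirms that nothing is dropped when the count is not a multiple of the batch size.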

Error Handling

Handle errors in async streams using try-catch within the iterator or around await foreach.

public static async IAsyncEnumerable<int> GenerateWithErrorsAsync(
    [EnumeratorCancellation] CancellationToken token = default)
{
    for (int i = 0; i < 10; i++)
    {
        if (i == 5) throw new InvalidOperationException("Something went wrong");

        await Task.Delay(100, token);
        yield return i;
    }
}

// Handling in consumer
try
{
    await foreach (var number in GenerateWithErrorsAsync())
    {
        Console.WriteLine(number);
    }
}
catch (InvalidOperationException ex)
{
    Console.WriteLine($"Error: {ex.Message}");
}

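Errors propagate through the async stream: an exception thrown inside the iterator surfaces from the MoveNextAsync call that await foreach is awaiting, and the stream ends at that point. Items yielded before the failure have already been delivered (here, 0 through 4). Handle errors at the consumption point for centralized error handling.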
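Handling errors inside the iterator needs a little care, because C# does not allow yield return inside a try block that has a catch clause. One common workaround, sketched here, is to do the risky work inside the try and yield afterwards. ParseAllAsync is a hypothetical iterator that skips inputs it cannot parse.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical iterator that skips bad inputs instead of failing the stream.
static async IAsyncEnumerable<int> ParseAllAsync(IEnumerable<string> inputs)
{
    foreach (var input in inputs)
    {
        await Task.Yield(); // stand-in for real async work

        int value;
        try
        {
            value = int.Parse(input); // throws FormatException for "oops"
        }
        catch (FormatException)
        {
            continue; // skip the bad item; the stream keeps going
        }

        yield return value; // the yield sits outside the try/catch
    }
}

var parsed = new List<int>();
await foreach (var n in ParseAllAsync(new[] { "1", "oops", "3" }))
{
    parsed.Add(n);
}

Console.WriteLine(string.Join(", ", parsed)); // 1, 3
```

Whether to skip, log, or rethrow is a design choice; the point is only that recovery inside the iterator lets the stream continue, while an unhandled exception ends it.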

Converting Between Sync and Async

Convert between synchronous and asynchronous streams when needed.

// Sync to async
public static async IAsyncEnumerable<T> ToAsyncEnumerable<T>(
    IEnumerable<T> source, [EnumeratorCancellation] CancellationToken token = default)
{
    foreach (var item in source)
    {
        token.ThrowIfCancellationRequested();
        yield return item;
    }
}

// Async to sync (materializes all data)
public static async Task<List<T>> ToListAsync<T>(
    IAsyncEnumerable<T> source, CancellationToken token = default)
{
    var list = new List<T>();
    await foreach (var item in source.WithCancellation(token))
    {
        list.Add(item);
    }
    return list;
}

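Converting from async to sync requires materializing all data: ToListAsync holds every item in memory and completes only when the stream ends. Use it with caution on large streams, and never on infinite ones.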

Performance Considerations

Async streams have overhead. Use them when async operations are truly needed.

  • Use ValueTask for efficiency: Prefer ValueTask<bool> over Task<bool> in custom enumerators.
  • Avoid unnecessary async: Don't use async streams for CPU-bound work.
  • Buffer when appropriate: Batch operations to reduce async overhead.
  • ConfigureAwait: Use ConfigureAwait(false) in library code.
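For the last point, ConfigureAwait can be applied to the stream itself rather than to each await. The sketch below assumes a library-style helper named SumAsync and a hypothetical SourceAsync; both are invented for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical source of three items.
static async IAsyncEnumerable<int> SourceAsync()
{
    for (int i = 1; i <= 3; i++)
    {
        await Task.Yield();
        yield return i;
    }
}

// Library-style helper: ConfigureAwait(false) applies to the MoveNextAsync and
// DisposeAsync awaits that await foreach performs on the caller's behalf.
static async Task<int> SumAsync(
    IAsyncEnumerable<int> source, CancellationToken token = default)
{
    int sum = 0;
    await foreach (var item in source.WithCancellation(token).ConfigureAwait(false))
    {
        sum += item;
    }
    return sum;
}

var total = await SumAsync(SourceAsync());
Console.WriteLine(total); // 6
```

WithCancellation and ConfigureAwait can be chained in either order; both return a configured wrapper that await foreach understands.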

Real-World Example: HTTP Streaming

Let's build an HTTP client that streams response data.

public class StreamingHttpClient
{
    private readonly HttpClient _client = new();

    public async IAsyncEnumerable<string> StreamLinesAsync(
        string url, [EnumeratorCancellation] CancellationToken token = default)
    {
        // ResponseHeadersRead returns as soon as the headers arrive; the default
        // option buffers the entire body before GetAsync completes.
        using var response = await _client.GetAsync(
            url, HttpCompletionOption.ResponseHeadersRead, token);
        using var stream = await response.Content.ReadAsStreamAsync(token);
        using var reader = new StreamReader(stream);

        string? line;
        while ((line = await reader.ReadLineAsync(token)) != null)
        {
            yield return line;
        }
    }
}

// Usage
var client = new StreamingHttpClient();
await foreach (var line in client.StreamLinesAsync("https://api.example.com/data"))
{
    ProcessLine(line);
}

Streaming the response this way suits large API responses, because lines are processed as they arrive instead of after the whole body has been loaded into memory.

The StreamingHttpClient class wraps a standard HttpClient but returns an async stream instead of a complete response. This enables processing data as it downloads.

The StreamLinesAsync method first makes the HTTP request and gets the response stream. It then creates a StreamReader to read text lines from the stream.

The key is the while loop that reads lines asynchronously. Each await reader.ReadLineAsync(token) yields control back to the caller, allowing other work to happen while waiting for network data.

As each line becomes available, it's yielded immediately. The consumer can start processing the first line while the second line is still downloading. This creates true streaming behavior.

The using declarations dispose the HTTP response, stream, and reader when iteration ends, whether the consumer reads to the end or breaks out early, because await foreach disposes the enumerator and that runs the iterator's cleanup.

In the usage example, each line gets processed immediately as it arrives. For a large API response, this could mean displaying results to the user seconds faster than loading the entire response first.
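The read loop can be exercised without a network by pointing it at any Stream. This sketch uses a MemoryStream as a stand-in for an HTTP response body; ReadLinesFromAsync is a hypothetical helper that mirrors the loop in StreamLinesAsync.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading.Tasks;

// Same read loop as StreamLinesAsync, but over any Stream so it can run offline.
static async IAsyncEnumerable<string> ReadLinesFromAsync(Stream stream)
{
    using var reader = new StreamReader(stream);
    string? current;
    while ((current = await reader.ReadLineAsync()) != null)
    {
        yield return current;
    }
}

// In-memory stand-in for an HTTP response body (an assumption of this sketch).
using var body = new MemoryStream(Encoding.UTF8.GetBytes("alpha\nbeta\ngamma\n"));

var lines = new List<string>();
await foreach (var text in ReadLinesFromAsync(body))
{
    lines.Add(text);
}

Console.WriteLine(lines.Count); // 3
```

Separating the line-reading loop from the HTTP call in this way also makes the streaming logic easier to unit test.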

Advanced Pattern: Reactive Extensions

Async streams work well with reactive programming patterns.

public static async IAsyncEnumerable<T> ThrottleAsync<T>(
    IAsyncEnumerable<T> source, TimeSpan delay,
    [EnumeratorCancellation] CancellationToken token = default)
{
    await foreach (var item in source.WithCancellation(token))
    {
        yield return item;
        await Task.Delay(delay, token);
    }
}

public static async IAsyncEnumerable<T> DistinctUntilChangedAsync<T>(
    IAsyncEnumerable<T> source, [EnumeratorCancellation] CancellationToken token = default)
{
    T previous = default;
    bool hasPrevious = false;

    await foreach (var item in source.WithCancellation(token))
    {
        if (!hasPrevious || !EqualityComparer<T>.Default.Equals(item, previous))
        {
            yield return item;
            previous = item;
            hasPrevious = true;
        }
    }
}

These operators transform streams in various ways, similar to Rx.NET but with simpler async/await syntax.

The ThrottleAsync operator slows down the stream by introducing delays between items. After yielding each item, it waits for the specified delay before requesting the next one. Unlike the Throttle operator in Rx.NET, it never drops items; it only paces them. This is useful for rate limiting or creating smooth animations.

The DistinctUntilChangedAsync operator filters out consecutive duplicate values. It keeps track of the previous item and only yields new items when they differ from the last one.

This uses EqualityComparer<T>.Default for comparison, which handles null values correctly and uses the type's Equals method or reference equality as appropriate.

The hasPrevious flag ensures the first item is always yielded, even if it's the "default" value for the type. This prevents losing the initial value in the stream.

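These operators can be chained together to create complex reactive pipelines. For example, you could filter consecutive duplicates out of a stream of user input events and then pace what remains, so the UI receives changes at a steady rate. Note that this paces events rather than debouncing them, since no event is discarded for arriving too soon.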

The beauty of implementing these with async streams is that they integrate seamlessly with existing async/await code, unlike traditional reactive programming libraries that require learning new paradigms.
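As a small illustration of chaining, the sketch below applies duplicate filtering and then pacing to an in-memory sequence. DistinctUntilChanged and Pace are simplified local versions of the operators described above (cancellation omitted), and ReadingsAsync is a hypothetical source.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical source with runs of repeated values.
static async IAsyncEnumerable<int> ReadingsAsync()
{
    foreach (var value in new[] { 1, 1, 2, 2, 2, 3, 1 })
    {
        await Task.Yield();
        yield return value;
    }
}

// Simplified DistinctUntilChangedAsync: drops consecutive duplicates only.
static async IAsyncEnumerable<T> DistinctUntilChanged<T>(IAsyncEnumerable<T> source)
{
    var previous = default(T);
    var hasPrevious = false;
    await foreach (var item in source)
    {
        if (!hasPrevious || !EqualityComparer<T>.Default.Equals(item, previous))
        {
            yield return item;
            previous = item;
            hasPrevious = true;
        }
    }
}

// Simplified ThrottleAsync: waits after each item, never drops one.
static async IAsyncEnumerable<T> Pace<T>(IAsyncEnumerable<T> source, TimeSpan delay)
{
    await foreach (var item in source)
    {
        yield return item;
        await Task.Delay(delay);
    }
}

var changes = new List<int>();
var paced = Pace(DistinctUntilChanged(ReadingsAsync()), TimeSpan.FromMilliseconds(1));
await foreach (var value in paced)
{
    changes.Add(value);
}

Console.WriteLine(string.Join(", ", changes)); // 1, 2, 3, 1
```

The trailing 1 survives because only consecutive duplicates are removed; a value that reappears after a change counts as new.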

Use async streams when you need to process data asynchronously as it becomes available. They excel at I/O-bound operations and real-time data processing.

Summary

Async streams bring async programming to data sequences through IAsyncEnumerable<T> and await foreach. They enable efficient processing of large datasets, real-time streams, and responsive applications by processing data as it becomes available rather than waiting for everything.

The key is understanding when async streams provide value. Use them for I/O operations, real-time data, or when memory efficiency matters. For CPU-bound work or small datasets, traditional collections often work better. Master the pattern to build responsive, scalable async applications.