Async streams bring asynchronous programming to data sequences. IAsyncEnumerable<T> and await foreach enable processing data as it becomes available, rather than waiting for everything at once. This enables efficient handling of large datasets, real-time streams, and responsive UIs.
In this guide, we'll implement async iterators, use await foreach, and build streaming data pipelines. You'll learn to create async sequences that integrate seamlessly with LINQ and cancellation.
The Core Interfaces
Async streams mirror regular iterators but work with async operations. IAsyncEnumerable<T> produces async enumerators, and IAsyncEnumerator<T> handles the async iteration.
public interface IAsyncEnumerable<out T>
{
    IAsyncEnumerator<T> GetAsyncEnumerator(
        CancellationToken cancellationToken = default);
}
public interface IAsyncEnumerator<out T> : IAsyncDisposable
{
    T Current { get; }
    ValueTask<bool> MoveNextAsync();
}
The async enumerator uses ValueTask<bool> for efficient async advancement and implements IAsyncDisposable for proper cleanup.
Creating Async Iterators
Use async and yield return to create async iterators. The compiler generates the state machine automatically.
public static async IAsyncEnumerable<int> GenerateNumbersAsync(
    int count, [EnumeratorCancellation] CancellationToken token = default)
{
    for (int i = 0; i < count; i++)
    {
        await Task.Delay(100, token); // Simulate async work
        yield return i;
    }
}
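The EnumeratorCancellation attribute marks the parameter that receives the token a consumer supplies through WithCancellation or GetAsyncEnumerator, so cancellation works even when the caller did not pass a token to the method directly. Each iteration can perform async operations.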
Usage with await foreach:
await foreach (var number in GenerateNumbersAsync(5))
{
    Console.WriteLine(number);
}
await foreach handles the async iteration automatically, awaiting each MoveNextAsync() call.
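To make those mechanics concrete, here is a rough sketch of the work await foreach does on your behalf, written out by hand. It is a simplification, not the compiler's exact lowering, and Demo and CountAsync are hypothetical names used only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Manual equivalent of:
//   await foreach (var n in Demo.CountAsync(3)) Console.WriteLine(n);
IAsyncEnumerator<int> enumerator = Demo.CountAsync(3).GetAsyncEnumerator();
try
{
    // Each MoveNextAsync call resumes the iterator until its next yield return.
    while (await enumerator.MoveNextAsync())
    {
        Console.WriteLine(enumerator.Current);
    }
}
finally
{
    // Runs any pending finally/using blocks inside the iterator.
    await enumerator.DisposeAsync();
}

public static class Demo
{
    // Hypothetical iterator that yields 0 .. count-1.
    public static async IAsyncEnumerable<int> CountAsync(int count)
    {
        for (int i = 0; i < count; i++)
        {
            await Task.Yield(); // Stand-in for real async work
            yield return i;
        }
    }
}
```

In everyday code the await foreach form is preferable; the manual form mainly helps when reasoning about disposal or when advancing an enumerator outside a loop.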
LINQ with Async Streams
Async streams work with LINQ through async versions of operators. Use the System.Linq.Async package for full LINQ support.
var evenSquares = GenerateNumbersAsync(10)
    .Where(n => n % 2 == 0)
    .SelectAwait(async n =>
    {
        await Task.Delay(50);
        return n * n;
    })
    .SelectAwait(async square =>
    {
        await Task.Delay(25);
        return await ProcessSquareAsync(square);
    });
One of the most powerful features of async streams is their integration with LINQ. You can build complex data processing pipelines that mix synchronous and asynchronous operations seamlessly.
The example starts with our GenerateNumbersAsync method, which produces numbers asynchronously. Then it uses Where to filter for even numbers. This happens synchronously since the condition is simple.
The first SelectAwait uses async n => to transform each number asynchronously. It awaits a delay (simulating some async work like calling an API) before squaring the number. A plain Select with an async lambda would not do this: it would produce a stream of unawaited tasks rather than a stream of results.
The second SelectAwait works the same way, awaiting a call to ProcessSquareAsync (a helper not shown here) for each square. SelectAwait is the operator to use whenever the transformation itself is asynchronous and returns a ValueTask<T>.
Notice how the pipeline processes each element as it becomes available. The first number gets filtered, transformed, and processed before the second number even starts generating. This creates an efficient streaming pipeline.
The beauty is that you can compose these operations just like regular LINQ, but with async/await syntax where needed. The compiler handles all the complex state management automatically.
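The element-by-element ordering can be checked with a small log-based experiment. To stay free of package dependencies, the sketch below uses two hand-written operators, WhereAsync and SelectAwaitAsync; these are hypothetical stand-ins for illustration, not the System.Linq.Async implementations.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var log = new List<string>();

// Explicitly typed so the async lambda targets ValueTask<int>.
Func<int, ValueTask<int>> square = async n =>
{
    await Task.Yield(); // Stand-in for an async transformation
    log.Add($"square {n}");
    return n * n;
};

var evens = Pipeline.WhereAsync(Pipeline.SourceAsync(4, log), n => n % 2 == 0);
var pipeline = Pipeline.SelectAwaitAsync(evens, square);

await foreach (var value in pipeline)
{
    log.Add($"consume {value}");
}

Console.WriteLine(string.Join(", ", log));

public static class Pipeline
{
    // Hypothetical source that records when each element is produced.
    public static async IAsyncEnumerable<int> SourceAsync(int count, List<string> log)
    {
        for (int i = 0; i < count; i++)
        {
            await Task.Yield();
            log.Add($"produce {i}");
            yield return i;
        }
    }

    // Passes through only the items that satisfy the predicate.
    public static async IAsyncEnumerable<T> WhereAsync<T>(
        IAsyncEnumerable<T> source, Func<T, bool> predicate)
    {
        await foreach (var item in source)
        {
            if (predicate(item)) yield return item;
        }
    }

    // Awaits the selector for each item before yielding its result.
    public static async IAsyncEnumerable<TResult> SelectAwaitAsync<T, TResult>(
        IAsyncEnumerable<T> source, Func<T, ValueTask<TResult>> selector)
    {
        await foreach (var item in source)
        {
            yield return await selector(item);
        }
    }
}
```

The log reads "produce 0, square 0, consume 0, produce 1, produce 2, square 2, consume 4, produce 3": element 0 is fully consumed before element 1 is even produced, because every stage pulls from the one before it only on demand.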
Cancellation and Resource Management
Async streams support cancellation through CancellationToken. Always pass tokens to async operations.
public static async IAsyncEnumerable<string> ReadLinesAsync(
    string filePath, [EnumeratorCancellation] CancellationToken token = default)
{
    using var reader = new StreamReader(filePath);
    string? line;
    // The ReadLineAsync(CancellationToken) overload requires .NET 7 or later
    while ((line = await reader.ReadLineAsync(token)) != null)
    {
        token.ThrowIfCancellationRequested();
        yield return line;
    }
}
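The using declaration disposes the StreamReader when the iterator finishes, and the token is checked on every line. await foreach disposes the enumerator when the loop exits, including on break or an exception, which in turn runs the iterator's pending cleanup.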
using var cts = new CancellationTokenSource(5000); // 5 second timeout
try
{
    await foreach (var line in ReadLinesAsync("largefile.txt", cts.Token))
    {
        Console.WriteLine(line);
    }
}
catch (OperationCanceledException)
{
    Console.WriteLine("Reading was cancelled");
}
Infinite and Real-Time Streams
Async streams enable infinite sequences and real-time data processing.
public static async IAsyncEnumerable<DateTime> TimerAsync(
    TimeSpan interval, [EnumeratorCancellation] CancellationToken token = default)
{
    while (!token.IsCancellationRequested)
    {
        yield return DateTime.Now;
        await Task.Delay(interval, token);
    }
}
This creates an infinite stream of timestamps. Consumers control when to stop by cancelling or breaking the loop.
var ticks = 0;
await foreach (var timestamp in TimerAsync(TimeSpan.FromSeconds(1)))
{
    Console.WriteLine($"Tick: {timestamp}");
    if (++ticks >= 10) break; // Stop after 10 ticks (roughly 10 seconds)
}
Buffering and Batching
For performance, implement buffering to reduce async overhead on fast operations.
public static async IAsyncEnumerable<List<T>> BufferAsync<T>(
    IAsyncEnumerable<T> source, int batchSize,
    [EnumeratorCancellation] CancellationToken token = default)
{
    var batch = new List<T>();
    await foreach (var item in source.WithCancellation(token))
    {
        batch.Add(item);
        if (batch.Count >= batchSize)
        {
            yield return batch;
            batch = new List<T>();
        }
    }
    if (batch.Count > 0)
    {
        yield return batch;
    }
}
For high-performance scenarios, buffering can significantly reduce async overhead. When you're processing thousands of items quickly, the async machinery for each individual item becomes expensive.
The BufferAsync method collects items into batches of a specified size. It uses await foreach to process the source stream, adding each item to a batch.
When the batch reaches the desired size, it yields the complete batch and starts a new one. With a batch size of 100, for example, the consumer sees 5,000 items as 50 batches rather than 5,000 separate iterations.
The method also yields any remaining items at the end, ensuring no data is lost even if the total count isn't evenly divisible by the batch size.
Notice the use of WithCancellation(token). This ensures proper cancellation propagation through the pipeline. If the consumer cancels, the buffering operation stops cleanly.
This pattern is especially useful for database operations, file I/O, or network calls where you want to minimize round trips by batching operations together.
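A small worked run shows the remainder handling. The sketch below uses a trimmed-down variant of the buffering idea (no cancellation parameter, read-only batches) on a hypothetical seven-item source; Batching and RangeAsync are illustrative names only.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

await foreach (var batch in Batching.BufferAsync(Batching.RangeAsync(7), batchSize: 3))
{
    Console.WriteLine($"[{string.Join(", ", batch)}]");
}
// Prints [0, 1, 2], then [3, 4, 5], then the partial batch [6]

public static class Batching
{
    // Hypothetical source yielding 0 .. count-1.
    public static async IAsyncEnumerable<int> RangeAsync(int count)
    {
        for (int i = 0; i < count; i++)
        {
            await Task.Yield();
            yield return i;
        }
    }

    // Groups items into lists of up to batchSize elements.
    public static async IAsyncEnumerable<IReadOnlyList<T>> BufferAsync<T>(
        IAsyncEnumerable<T> source, int batchSize)
    {
        var batch = new List<T>(batchSize);
        await foreach (var item in source)
        {
            batch.Add(item);
            if (batch.Count >= batchSize)
            {
                yield return batch;
                batch = new List<T>(batchSize); // Fresh list so yielded batches are never mutated
            }
        }
        if (batch.Count > 0)
        {
            yield return batch; // Final partial batch
        }
    }
}
```

Allocating a new list per batch costs a little memory, but it means a consumer can safely hold on to a batch after the next one has been produced.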
Error Handling
Handle errors in async streams using try-catch within the iterator or around await foreach.
public static async IAsyncEnumerable<int> GenerateWithErrorsAsync(
    [EnumeratorCancellation] CancellationToken token = default)
{
    for (int i = 0; i < 10; i++)
    {
        if (i == 5) throw new InvalidOperationException("Something went wrong");
        await Task.Delay(100, token);
        yield return i;
    }
}
// Handling in consumer
try
{
    await foreach (var number in GenerateWithErrorsAsync())
    {
        Console.WriteLine(number);
    }
}
catch (InvalidOperationException ex)
{
    Console.WriteLine($"Error: {ex.Message}");
}
Errors propagate through the async stream. Handle them at the consumption point for centralized error handling.
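Handling errors inside the iterator takes a little care, because C# does not allow yield return inside a try block that has a catch clause. One possible shape, sketched here with the hypothetical names ParseAllAsync and ParseAsync, is to do the risky work inside try/catch and yield afterwards.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

await foreach (var value in Demo.ParseAllAsync(new[] { "1", "oops", "3" }))
{
    Console.WriteLine(value);
}

public static class Demo
{
    // Hypothetical iterator: skips inputs that fail to parse instead of ending the stream.
    public static async IAsyncEnumerable<int> ParseAllAsync(IEnumerable<string> inputs)
    {
        foreach (var text in inputs)
        {
            int parsed;
            try
            {
                parsed = await ParseAsync(text);
            }
            catch (FormatException ex)
            {
                Console.WriteLine($"Skipping '{text}': {ex.Message}");
                continue; // Move on to the next input
            }

            // The yield sits outside the try/catch, as the language requires.
            yield return parsed;
        }
    }

    private static async Task<int> ParseAsync(string text)
    {
        await Task.Yield(); // Stand-in for real async work
        return int.Parse(text); // Throws FormatException on bad input
    }
}
```

This keeps the stream alive across recoverable failures, while anything the iterator does not catch still surfaces at the consumer's await foreach.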
Converting Between Sync and Async
Convert between synchronous and asynchronous streams when needed.
// Sync to async
public static async IAsyncEnumerable<T> ToAsyncEnumerable<T>(
    IEnumerable<T> source, [EnumeratorCancellation] CancellationToken token = default)
{
    foreach (var item in source)
    {
        token.ThrowIfCancellationRequested();
        yield return item;
    }
}
// Async to sync (materializes all data)
public static async Task<List<T>> ToListAsync<T>(
    IAsyncEnumerable<T> source, CancellationToken token = default)
{
    var list = new List<T>();
    await foreach (var item in source.WithCancellation(token))
    {
        list.Add(item);
    }
    return list;
}
Converting from async to sync requires materializing all data. Use with caution on large streams.
Performance Considerations
Async streams have overhead. Use them when async operations are truly needed.
- Use ValueTask for efficiency: Prefer ValueTask<bool> over Task<bool> in custom enumerators.
- Avoid unnecessary async: Don't use async streams for CPU-bound work.
- Buffer when appropriate: Batch operations to reduce async overhead.
- ConfigureAwait: Use ConfigureAwait(false) in library code.
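For the ConfigureAwait point, note that await foreach performs awaits of its own, so the stream needs configuring as well as the explicit await expressions. A minimal sketch, with Library, SumAsync, and RangeAsync as hypothetical names:

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

var total = await Library.SumAsync(Library.RangeAsync(1, 4));
Console.WriteLine(total); // 10

public static class Library
{
    public static async Task<int> SumAsync(
        IAsyncEnumerable<int> source, CancellationToken token = default)
    {
        var sum = 0;
        // ConfigureAwait(false) here applies to the awaits that await foreach
        // performs on MoveNextAsync and DisposeAsync.
        await foreach (var item in source.WithCancellation(token).ConfigureAwait(false))
        {
            sum += item;
        }
        return sum;
    }

    // Hypothetical source yielding start, start+1, ... (count items).
    public static async IAsyncEnumerable<int> RangeAsync(
        int start, int count, [EnumeratorCancellation] CancellationToken token = default)
    {
        for (int i = 0; i < count; i++)
        {
            await Task.Delay(1, token).ConfigureAwait(false);
            yield return start + i;
        }
    }
}
```

WithCancellation and ConfigureAwait can be chained in either order; both return a configured wrapper that await foreach accepts.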
Real-World Example: HTTP Streaming
Let's build an HTTP client that streams response data.
public class StreamingHttpClient
{
    private readonly HttpClient _client = new();
    public async IAsyncEnumerable<string> StreamLinesAsync(
        string url, [EnumeratorCancellation] CancellationToken token = default)
    {
        // ResponseHeadersRead returns as soon as the headers arrive; the default
        // option buffers the whole body first, which would defeat streaming.
        using var response = await _client.GetAsync(
            url, HttpCompletionOption.ResponseHeadersRead, token);
        using var stream = await response.Content.ReadAsStreamAsync(token);
        using var reader = new StreamReader(stream);
        string? line;
        while ((line = await reader.ReadLineAsync(token)) != null)
        {
            yield return line;
        }
    }
}
// Usage
var client = new StreamingHttpClient();
await foreach (var line in client.StreamLinesAsync("https://api.example.com/data"))
{
    ProcessLine(line);
}
This example streams response data as it arrives, which suits large API responses that should not be loaded into memory all at once.
The StreamingHttpClient class wraps a standard HttpClient but returns an async stream instead of a complete response. This enables processing data as it downloads.
The StreamLinesAsync method first makes the HTTP request and gets the response stream. It then creates a StreamReader to read text lines from the stream.
The key is the while loop that reads lines asynchronously. Each await reader.ReadLineAsync(token) yields control back to the caller, allowing other work to happen while waiting for network data.
As each line becomes available, it's yielded immediately. The consumer can start processing the first line while the second line is still downloading. This creates true streaming behavior.
The using statements ensure proper cleanup of the HTTP response, stream, and reader. They run when the consumer finishes iteration, because await foreach disposes the enumerator.
In the usage example, each line gets processed immediately as it arrives. For a large API response, this could mean displaying results to the user seconds faster than loading the entire response first.
Advanced Pattern: Reactive Extensions
Async streams work well with reactive programming patterns.
public static async IAsyncEnumerable<T> ThrottleAsync<T>(
    IAsyncEnumerable<T> source, TimeSpan delay,
    [EnumeratorCancellation] CancellationToken token = default)
{
    await foreach (var item in source.WithCancellation(token))
    {
        yield return item;
        await Task.Delay(delay, token);
    }
}
public static async IAsyncEnumerable<T> DistinctUntilChangedAsync<T>(
    IAsyncEnumerable<T> source, [EnumeratorCancellation] CancellationToken token = default)
{
    T previous = default!; // Never read before hasPrevious is set
    bool hasPrevious = false;
    await foreach (var item in source.WithCancellation(token))
    {
        if (!hasPrevious || !EqualityComparer<T>.Default.Equals(item, previous))
        {
            yield return item;
            previous = item;
            hasPrevious = true;
        }
    }
}
These operators transform streams in ways similar to Rx.NET, but with plain async/await syntax.
The ThrottleAsync operator slows down the stream by introducing delays between items. After yielding each item, it waits for the specified delay before processing the next one. Note that it paces the stream rather than dropping items. This is useful for rate limiting or creating smooth animations.
The DistinctUntilChangedAsync operator filters out consecutive duplicate values. It keeps track of the previous item and only yields new items when they differ from the last one.
This uses EqualityComparer<T>.Default for comparison, which handles null values correctly and uses the type's Equals method or reference equality as appropriate.
The hasPrevious flag ensures the first item is always yielded, even if it's the "default" value for the type. This prevents losing the initial value in the stream.
These operators can be chained together to create complex reactive pipelines. For example, you could pace a stream of user input events and then filter out consecutive duplicates so that downstream code only reacts to actual changes.
The beauty of implementing these with async streams is that they integrate seamlessly with existing async/await code, unlike traditional reactive programming libraries that require learning new paradigms.
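As an illustration of chaining, the sketch below feeds a short, hypothetical series of readings through a pacing step and a consecutive-duplicate filter. The operators are compact re-statements without cancellation parameters so the block stands alone; FromValuesAsync and the Operators class are assumptions made for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var readings = Operators.FromValuesAsync(new[] { 1, 1, 2, 2, 2, 3, 1 });
var paced = Operators.ThrottleAsync(readings, TimeSpan.FromMilliseconds(5));
var changes = Operators.DistinctUntilChangedAsync(paced);

await foreach (var value in changes)
{
    Console.WriteLine(value); // 1, 2, 3, 1 (one value per line)
}

public static class Operators
{
    // Hypothetical source standing in for sensor readings or input events.
    public static async IAsyncEnumerable<T> FromValuesAsync<T>(IEnumerable<T> values)
    {
        foreach (var value in values)
        {
            await Task.Yield();
            yield return value;
        }
    }

    // Waits for 'delay' after each item before pulling the next one.
    public static async IAsyncEnumerable<T> ThrottleAsync<T>(
        IAsyncEnumerable<T> source, TimeSpan delay)
    {
        await foreach (var item in source)
        {
            yield return item;
            await Task.Delay(delay);
        }
    }

    // Drops items equal to the one immediately before them.
    public static async IAsyncEnumerable<T> DistinctUntilChangedAsync<T>(
        IAsyncEnumerable<T> source)
    {
        T previous = default!;
        bool hasPrevious = false;
        await foreach (var item in source)
        {
            if (!hasPrevious || !EqualityComparer<T>.Default.Equals(item, previous))
            {
                yield return item;
                previous = item;
                hasPrevious = true;
            }
        }
    }
}
```

The trailing 1 is kept because only consecutive duplicates are removed; a value that reappears after a change counts as a new change.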
Use async streams when you need to process data asynchronously as it becomes available. They excel at I/O-bound operations and real-time data processing.
Summary
Async streams bring async programming to data sequences through IAsyncEnumerable<T> and await foreach. They enable efficient processing of large datasets, real-time streams, and responsive applications by processing data as it becomes available rather than waiting for everything.
The key is understanding when async streams provide value. Use them for I/O operations, real-time data, or when memory efficiency matters. For CPU-bound work or small datasets, traditional collections often work better. Master the pattern to build responsive, scalable async applications.