Keyboard shortcuts

Press ← or → to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Bất đồng bộ

IAsyncEnumerable

Giới thiệu

IAsyncEnumerable<T> cho phép stream dữ liệu bất đồng bộ, tốt cho việc xử lý large datasets.

Sử dụng

/// <summary>
/// Streams the rows of the Products table one at a time instead of buffering
/// the whole result set in memory.
/// </summary>
/// <param name="cancellationToken">
/// Observed while opening the connection, executing the command and reading
/// each row. Because of <c>[EnumeratorCancellation]</c>, a token supplied by
/// the consumer through <c>WithCancellation(...)</c> is observed as well.
/// </param>
/// <returns>An asynchronous sequence that yields one <c>Product</c> per row read.</returns>
public async IAsyncEnumerable<Product> GetProductsStreamAsync(
    [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    // "await using" releases each resource through DisposeAsync, so tearing the
    // stream down (including when the consumer breaks out early) does not block a thread.
    await using var connection = new SqlConnection(connectionString);
    await connection.OpenAsync(cancellationToken);

    await using var command = new SqlCommand("SELECT * FROM Products", connection);
    await using var reader = await command.ExecuteReaderAsync(cancellationToken);

    while (await reader.ReadAsync(cancellationToken))
    {
        // NOTE(review): ordinals 0/1/2 assume the table's column order is
        // Id, Name, Price; with "SELECT *" that depends on the schema — confirm.
        yield return new Product
        {
            Id = reader.GetInt32(0),
            Name = reader.GetString(1),
            Price = reader.GetDecimal(2)
        };
    }
}

// Usage: consume the stream item by item as rows arrive.
var productStream = GetProductsStreamAsync();
await foreach (var item in productStream)
{
    Console.WriteLine(item.Name);
}

So sánh với IEnumerable

// IEnumerable: blocks the calling thread and loads every row into memory.
public IEnumerable<Product> GetProductsSync()
{
    // ToList() materializes the complete query result before the first item is yielded.
    var loaded = _context.Products.ToList();

    for (var index = 0; index < loaded.Count; index++)
    {
        yield return loaded[index];
    }
}

// IAsyncEnumerable: non-blocking, items are streamed as they become available.
public async IAsyncEnumerable<Product> GetProductsAsync()
{
    var stream = _context.Products.AsAsyncEnumerable();

    await foreach (var item in stream)
    {
        yield return item;
    }
}

Kỹ thuật Stream dữ liệu lớn

Chunking

/// <summary>
/// Processes the Products set in fixed-size pages so that only one page is
/// held in memory at a time.
/// </summary>
/// <param name="cancellationToken">Token passed to each page query.</param>
public async Task ProcessLargeDatasetAsync(CancellationToken cancellationToken = default)
{
    const int batchSize = 1000;

    // Number of rows actually handed to ProcessBatchAsync so far; it doubles as
    // the paging offset because every page before the last one is full.
    var processed = 0;

    while (true)
    {
        var batch = await _context.Products
            .AsNoTracking()
            .OrderBy(p => p.Id) // Skip/Take is applied on top of this ordering
            .Skip(processed)
            .Take(batchSize)
            .ToListAsync(cancellationToken);

        if (batch.Count == 0)
            break;

        // Process batch
        await ProcessBatchAsync(batch);

        // Count what was really processed: adding batchSize would over-report
        // on a final, partially filled page.
        processed += batch.Count;
        Console.WriteLine($"Processed {processed} items");

        // A short page means the query ran out of rows, so the extra
        // round-trip that would return an empty page is skipped.
        if (batch.Count < batchSize)
            break;
    }
}

Parallel Processing

/// <summary>
/// Loads every product whose <c>Processed</c> flag is false and runs
/// <c>ProcessProductAsync</c> on them with at most four concurrent operations.
/// </summary>
/// <param name="cancellationToken">
/// Token passed to the query and to <c>Parallel.ForEachAsync</c> through
/// <c>ParallelOptions.CancellationToken</c>.
/// </param>
public async Task ProcessInParallelAsync(CancellationToken cancellationToken = default)
{
    var products = await _context.Products
        .AsNoTracking()
        .Where(p => !p.Processed)
        .ToListAsync(cancellationToken);

    var options = new ParallelOptions
    {
        MaxDegreeOfParallelism = 4,
        CancellationToken = cancellationToken
    };

    await Parallel.ForEachAsync(products, options, async (product, ct) =>
    {
        // NOTE(review): if ProcessProductAsync has an overload that accepts a
        // CancellationToken, pass "ct" so in-flight items can stop early — confirm.
        await ProcessProductAsync(product);
    });
}

Cancellation Token

/// <summary>
/// Collects up to 1000 products from <c>GetProductsStreamAsync</c> into a list.
/// </summary>
/// <param name="cancellationToken">
/// Token handed to the stream's enumerator and also checked before each item is added.
/// </param>
/// <returns>A list containing at most 1000 products, in stream order.</returns>
/// <exception cref="OperationCanceledException">
/// Thrown when <paramref name="cancellationToken"/> is cancelled before the list is complete.
/// </exception>
public async Task<List<Product>> GetProductsAsync(
    CancellationToken cancellationToken = default)
{
    const int maxResults = 1000;
    var result = new List<Product>();

    // WithCancellation hands the token to GetAsyncEnumerator, so a stream that
    // honours it can abort a pending read instead of only being checked between items.
    await foreach (var product in GetProductsStreamAsync().WithCancellation(cancellationToken))
    {
        // Kept as a safety net for streams that ignore the enumerator token.
        cancellationToken.ThrowIfCancellationRequested();

        result.Add(product);

        // Leaving the loop disposes the enumerator, which ends the stream.
        if (result.Count >= maxResults)
            break;
    }

    return result;
}

// Usage with a timeout: the token source cancels itself once the delay has elapsed.
var timeout = TimeSpan.FromSeconds(30);
using var cts = new CancellationTokenSource(timeout);

try
{
    var products = await GetProductsAsync(cts.Token);
}
catch (OperationCanceledException)
{
    // Reached when the token fires before the call completes.
    Console.WriteLine("Operation timed out");
}

Channel (Producer-Consumer)

// Producer
/// <summary>
/// Writes the integers 0..9999 into a bounded channel (capacity 100) and exposes
/// the reading side to consumers.
/// </summary>
public class DataProducer
{
    private readonly Channel<int> _channel;

    /// <summary>Creates the producer together with its bounded channel.</summary>
    public DataProducer()
    {
        _channel = Channel.CreateBounded<int>(100);
    }

    /// <summary>
    /// Writes all items to the channel and then marks the channel as complete.
    /// </summary>
    /// <param name="ct">Token that cancels a pending write.</param>
    /// <remarks>
    /// If a write fails or is cancelled, the channel is completed with that
    /// exception before it is rethrown. Otherwise the writer would stay open and
    /// a consumer would wait forever for items that never arrive.
    /// </remarks>
    public async Task ProduceAsync(CancellationToken ct)
    {
        try
        {
            for (var i = 0; i < 10000; i++)
            {
                // With a bounded channel this await does not complete until
                // there is room for the item.
                await _channel.Writer.WriteAsync(i, ct);
            }
        }
        catch (Exception ex)
        {
            // TryComplete (not Complete) so an already-completed channel does
            // not replace the original exception with a second one.
            _channel.Writer.TryComplete(ex);
            throw;
        }

        _channel.Writer.Complete();
    }

    /// <summary>Reading side of the channel, intended for consumers.</summary>
    public ChannelReader<int> Reader => _channel.Reader;
}

// Consumer
/// <summary>
/// Drains a channel of integers and passes every item to <c>ProcessAsync</c>.
/// </summary>
public class DataConsumer
{
    private readonly ChannelReader<int> _reader;

    /// <summary>Creates a consumer that reads from <paramref name="reader"/>.</summary>
    public DataConsumer(ChannelReader<int> reader) => _reader = reader;

    /// <summary>
    /// Reads items until the channel is completed, processing them one at a time.
    /// </summary>
    /// <param name="ct">Token that cancels waiting for the next item.</param>
    public async Task ConsumeAsync(CancellationToken ct)
    {
        // Outer loop: wait until data is available or the channel is completed.
        while (await _reader.WaitToReadAsync(ct))
        {
            // Inner loop: take everything that is currently buffered.
            while (_reader.TryRead(out var next))
            {
                await ProcessAsync(next);
            }
        }
    }
}

ValueTask vs Task

// Task: a Task<int> object is always allocated for the result.
public async Task<int> GetValueAsync()
{
    const int result = 42;

    // Short delay so that the method really completes asynchronously.
    await Task.Delay(TimeSpan.FromMilliseconds(1));

    return result;
}

// ValueTask: avoids the Task allocation when the method completes synchronously.
/// <summary>
/// Returns the cached value when one is available, otherwise falls back to an
/// asynchronous path that produces 42.
/// </summary>
/// <returns>
/// A <c>ValueTask</c> that is already completed on a cache hit and completes
/// after a short delay otherwise.
/// </returns>
public async ValueTask<int> GetValueAsync()
{
    if (_cache.TryGetValue(out int value))
    {
        // Synchronous completion: the result is carried inside the ValueTask
        // struct, so no Task object is allocated.
        return value;
    }

    // Async path. An async method must return the int itself; returning
    // "new ValueTask<int>(42)" here does not compile. The delay is a
    // placeholder for real asynchronous work.
    await Task.Delay(1);
    return 42;
}
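| Aspect | Task | ValueTask |
| --- | --- | --- |
| Allocation | Always heap | Avoided if sync |
| Use case | Standard async | Hot path |
| Synchronous return | Allowed (e.g. Task.FromResult), but still allocates a Task | Allowed without allocation |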