Bất đồng bộ
IAsyncEnumerable
Giới thiệu
IAsyncEnumerable<T> cho phép stream dữ liệu bất đồng bộ theo từng phần tử, phù hợp để xử lý các tập dữ liệu lớn (large datasets) mà không cần tải toàn bộ vào bộ nhớ.
Sử dụng
/// <summary>
/// Streams the rows returned by the Products query, yielding one
/// <see cref="Product"/> per row as it is read from the data reader.
/// </summary>
/// <param name="cancellationToken">
/// Cancels opening the connection, executing the command and reading rows.
/// Thanks to <c>[EnumeratorCancellation]</c> it also receives the token a
/// consumer supplies through <c>WithCancellation(...)</c>.
/// </param>
/// <returns>An async sequence of products, produced lazily while the reader advances.</returns>
public async IAsyncEnumerable<Product> GetProductsStreamAsync(
    [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    // await using: connection, command and reader are released via DisposeAsync,
    // so cleanup does not block a thread when enumeration ends or is abandoned.
    await using var connection = new SqlConnection(connectionString);
    await connection.OpenAsync(cancellationToken);

    // NOTE(review): columns are read by ordinal below, so the result depends on the
    // table's column order because of SELECT * - consider an explicit column list.
    await using var command = new SqlCommand("SELECT * FROM Products", connection);
    await using var reader = await command.ExecuteReaderAsync(cancellationToken);

    while (await reader.ReadAsync(cancellationToken))
    {
        yield return new Product
        {
            Id = reader.GetInt32(0),
            Name = reader.GetString(1),
            Price = reader.GetDecimal(2)
        };
    }
}
// Usage: enumerate the stream and print the name of every product
await foreach (var item in GetProductsStreamAsync())
{
    var name = item.Name;
    Console.WriteLine(name);
}
So sánh với IEnumerable
// IEnumerable - blocks the calling thread and loads everything into memory
/// <summary>
/// Returns all products synchronously. The full result of
/// <c>_context.Products.ToList()</c> is built before the first item is yielded.
/// </summary>
/// <returns>A lazily started sequence over the already materialized list.</returns>
public IEnumerable<Product> GetProductsSync()
{
    // ToList() runs once, when enumeration starts, and loads every item
    foreach (var item in _context.Products.ToList())
    {
        yield return item;
    }
}
// IAsyncEnumerable - non-blocking, streams the data
/// <summary>
/// Re-yields each element of <c>_context.Products.AsAsyncEnumerable()</c>
/// as it is produced, without collecting the elements into a list.
/// </summary>
/// <returns>An async sequence of products.</returns>
public async IAsyncEnumerable<Product> GetProductsAsync()
{
    var source = _context.Products.AsAsyncEnumerable();

    await foreach (var item in source)
    {
        yield return item;
    }
}
Kỹ thuật Stream dữ liệu lớn
Chunking
/// <summary>
/// Processes all products in fixed-size pages ordered by <c>Id</c>, handing each
/// page to <c>ProcessBatchAsync</c> and logging the running total.
/// </summary>
/// <param name="cancellationToken">Cancels the page query that is currently executing.</param>
/// <returns>A task that completes once the last page has been processed.</returns>
public async Task ProcessLargeDatasetAsync(CancellationToken cancellationToken = default)
{
    const int batchSize = 1000;
    var processed = 0;

    while (true)
    {
        // Stable ordering is required so Skip/Take pages do not overlap or miss rows.
        var batch = await _context.Products
            .AsNoTracking()
            .OrderBy(p => p.Id)
            .Skip(processed)
            .Take(batchSize)
            .ToListAsync(cancellationToken);

        if (batch.Count == 0)
            break;

        // Process batch
        await ProcessBatchAsync(batch);

        // Count what was actually read: the last page may hold fewer than batchSize
        // items, so adding batchSize would over-report the progress.
        processed += batch.Count;
        Console.WriteLine($"Processed {processed} items");

        // A short page means the data is exhausted; skip the extra empty query.
        if (batch.Count < batchSize)
            break;
    }
}
Parallel Processing
/// <summary>
/// Loads every product whose <c>Processed</c> flag is false and runs
/// <c>ProcessProductAsync</c> on them with at most four concurrent operations.
/// </summary>
/// <param name="cancellationToken">
/// Cancels the initial query and stops the parallel loop from starting further items.
/// </param>
/// <returns>A task that completes when all loaded products have been processed.</returns>
public async Task ProcessInParallelAsync(CancellationToken cancellationToken = default)
{
    var products = await _context.Products
        .AsNoTracking()
        .Where(p => !p.Processed)
        .ToListAsync(cancellationToken);

    var options = new ParallelOptions
    {
        MaxDegreeOfParallelism = 4,
        // Without this the loop could not be cancelled from the outside.
        CancellationToken = cancellationToken
    };

    await Parallel.ForEachAsync(products, options, async (product, ct) =>
    {
        // ct is the loop's own token; ProcessProductAsync is called exactly as before
        // because its signature is not visible here.
        // NOTE(review): pass ct along if ProcessProductAsync has an overload accepting it.
        await ProcessProductAsync(product);
    });
}
Cancellation Token
/// <summary>
/// Collects up to 1000 products from <c>GetProductsStreamAsync</c> into a list.
/// </summary>
/// <param name="cancellationToken">
/// Checked before each item is added and also handed to the stream's enumerator
/// through <c>WithCancellation</c>.
/// </param>
/// <returns>The collected products, at most 1000 items.</returns>
/// <exception cref="OperationCanceledException">The token was cancelled while collecting.</exception>
public async Task<List<Product>> GetProductsAsync(
    CancellationToken cancellationToken = default)
{
    const int maxResults = 1000;
    var result = new List<Product>();

    // WithCancellation passes the token to GetAsyncEnumerator, so an iterator that
    // accepts an [EnumeratorCancellation] token can abort its pending awaits too.
    await foreach (var product in GetProductsStreamAsync().WithCancellation(cancellationToken))
    {
        cancellationToken.ThrowIfCancellationRequested();
        result.Add(product);

        // Leaving the loop early disposes the enumerator and ends the stream.
        if (result.Count >= maxResults)
            break;
    }

    return result;
}
// Usage with a timeout: the token source is created with a 30 second delay
var timeout = TimeSpan.FromSeconds(30);
using var timeoutSource = new CancellationTokenSource(timeout);
try
{
    var products = await GetProductsAsync(timeoutSource.Token);
}
catch (OperationCanceledException)
{
    Console.WriteLine("Operation timed out");
}
Channel (Producer-Consumer)
// Producer
/// <summary>
/// Writes the integers 0..9999 into a bounded channel (capacity 100) and exposes
/// the channel's reader to consumers.
/// </summary>
public class DataProducer
{
    private readonly Channel<int> _channel;

    public DataProducer()
    {
        _channel = Channel.CreateBounded<int>(100);
    }

    /// <summary>
    /// Writes all items and then completes the channel. If writing fails or is
    /// cancelled, the channel is completed with that exception so readers are
    /// released instead of waiting forever, and the exception is rethrown.
    /// </summary>
    /// <param name="ct">Cancels a pending write.</param>
    public async Task ProduceAsync(CancellationToken ct)
    {
        try
        {
            for (var i = 0; i < 10000; i++)
            {
                await _channel.Writer.WriteAsync(i, ct);
            }

            _channel.Writer.Complete();
        }
        catch (Exception ex)
        {
            // TryComplete (not Complete): the channel may already be completed,
            // and the original exception must be the one that propagates.
            _channel.Writer.TryComplete(ex);
            throw;
        }
    }

    /// <summary>Reader side of the channel, for consumers.</summary>
    public ChannelReader<int> Reader => _channel.Reader;
}
// Consumer
/// <summary>
/// Reads every item from a channel reader and passes it to <c>ProcessAsync</c>.
/// </summary>
public class DataConsumer
{
    private readonly ChannelReader<int> _reader;

    public DataConsumer(ChannelReader<int> reader) => _reader = reader;

    /// <summary>
    /// Consumes items until the reader's async sequence ends.
    /// </summary>
    /// <param name="ct">Token handed to <c>ReadAllAsync</c>.</param>
    public async Task ConsumeAsync(CancellationToken ct)
    {
        var items = _reader.ReadAllAsync(ct);

        await foreach (var value in items)
        {
            await ProcessAsync(value);
        }
    }
}
ValueTask vs Task
// Task - a Task object is allocated for the result
/// <summary>Waits one millisecond and then produces 42.</summary>
/// <returns>A task whose result is 42.</returns>
public async Task<int> GetValueAsync()
{
    const int answer = 42;
    await Task.Delay(TimeSpan.FromMilliseconds(1));
    return answer;
}
// ValueTask - avoids the allocation when the method completes synchronously
/// <summary>
/// Returns the cached value when one is available; otherwise takes the
/// asynchronous path and produces 42.
/// </summary>
/// <returns>
/// A <see cref="ValueTask{TResult}"/> that is already completed on a cache hit.
/// </returns>
public async ValueTask<int> GetValueAsync()
{
    // NOTE(review): _cache and its TryGetValue(out int) are declared elsewhere - confirm the API.
    if (_cache.TryGetValue(out int value))
        return value; // Synchronous - no await reached, so no Task is allocated

    // Async path. An async method returns the int itself; the compiler wraps it.
    // The original "return new ValueTask<int>(42)" does not compile here (CS0029).
    await Task.Delay(1);
    return 42;
}
| Aspect | Task | ValueTask |
|---|---|---|
| Allocation | Heap-allocated `Task` object (only a few common results are cached) | Avoided when the call completes synchronously |
| Use case | Standard async | Hot path |
| Synchronous completion | Possible (e.g. `Task.FromResult`), but still returns a `Task` object | Possible without allocating |