Multiple Queues Management
Overview
Khi ứng dụng cần làm việc với nhiều queue khác nhau (ví dụ: orders-queue, notifications-queue, inventory-queue), có một số cách tiếp cận (approaches) để quản lý chúng. Phần này trình bày các patterns và best practices tương ứng.
1. Multiple QueueClient Instances (Simple)
Phù hợp cho small applications với vài queues.
/// <summary>
/// Sends messages to three fixed Azure Storage queues (orders, notifications,
/// inventory), holding one <see cref="QueueClient"/> per queue.
/// </summary>
public class MultiQueueService
{
    private readonly QueueClient _ordersQueue;
    private readonly QueueClient _notificationsQueue;
    private readonly QueueClient _inventoryQueue;

    /// <summary>
    /// Builds the three queue clients from the "AzureStorage" configuration value
    /// and creates each queue if it does not exist yet.
    /// </summary>
    /// <param name="configuration">Configuration that holds the "AzureStorage" connection string.</param>
    public MultiQueueService(IConfiguration configuration)
    {
        var connectionString = configuration.GetValue<string>("AzureStorage");

        _ordersQueue = CreateQueue(connectionString, "orders-queue");
        _notificationsQueue = CreateQueue(connectionString, "notifications-queue");
        _inventoryQueue = CreateQueue(connectionString, "inventory-queue");
    }

    /// <summary>Serializes <paramref name="order"/> as JSON and enqueues it on "orders-queue".</summary>
    /// <param name="order">The order to enqueue.</param>
    public async Task SendOrderAsync(Order order)
    {
        await SendJsonAsync(_ordersQueue, order);
    }

    /// <summary>Serializes <paramref name="notification"/> as JSON and enqueues it on "notifications-queue".</summary>
    /// <param name="notification">The notification to enqueue.</param>
    public async Task SendNotificationAsync(Notification notification)
    {
        await SendJsonAsync(_notificationsQueue, notification);
    }

    /// <summary>Serializes <paramref name="update"/> as JSON and enqueues it on "inventory-queue".</summary>
    /// <param name="update">The inventory update to enqueue.</param>
    public async Task SendInventoryUpdateAsync(InventoryUpdate update)
    {
        await SendJsonAsync(_inventoryQueue, update);
    }

    // Creates a client for the named queue and makes sure the queue exists.
    // CreateIfNotExists is a blocking call; it runs once per queue at construction time.
    private static QueueClient CreateQueue(string connectionString, string queueName)
    {
        var client = new QueueClient(connectionString, queueName);
        client.CreateIfNotExists();
        return client;
    }

    // QueueMessage is the receive-side model and cannot be constructed or populated
    // by callers, so the JSON payload is handed to SendMessageAsync directly.
    // The message id is assigned by the queue service, not by the sender.
    private static async Task SendJsonAsync<T>(QueueClient queue, T payload)
    {
        await queue.SendMessageAsync(BinaryData.FromObjectAsJson(payload));
    }
}
2. Queue Client Factory (Recommended for DI)
Interface + Factory pattern, tốt cho medium applications.
// Interface that defines the factory
/// <summary>
/// Hands out <see cref="QueueClient"/> instances by queue name so that services
/// do not have to build clients themselves.
/// </summary>
public interface IQueueClientFactory
{
/// <summary>Returns the client for the queue named <paramref name="queueName"/>.</summary>
/// <param name="queueName">Name of the queue, e.g. "orders-queue".</param>
/// <returns>A <see cref="QueueClient"/> bound to that queue.</returns>
QueueClient GetQueueClient(string queueName);
}
// Implementation
/// <summary>
/// Creates and caches one <see cref="QueueClient"/> per queue name, creating the
/// queue on first use if it does not exist.
/// </summary>
/// <remarks>
/// A single factory instance is shared by every consumer when it is registered as a
/// singleton, so the cache is guarded by a lock:
/// <see cref="Dictionary{TKey,TValue}"/> does not support concurrent writers.
/// </remarks>
public class QueueClientFactory : IQueueClientFactory
{
    private readonly string _connectionString;
    private readonly Dictionary<string, QueueClient> _clients = new();
    private readonly object _gate = new();

    /// <summary>Creates a factory that builds clients from the given connection string.</summary>
    /// <param name="connectionString">Azure Storage connection string.</param>
    /// <exception cref="ArgumentNullException"><paramref name="connectionString"/> is null.</exception>
    public QueueClientFactory(string connectionString)
    {
        // Fail here rather than on the first GetQueueClient call, where the cause is harder to trace.
        _connectionString = connectionString ?? throw new ArgumentNullException(nameof(connectionString));
    }

    /// <summary>
    /// Returns the cached client for <paramref name="queueName"/>, creating the client
    /// (and the queue, if missing) the first time the name is requested.
    /// </summary>
    /// <param name="queueName">Name of the queue.</param>
    /// <returns>The client bound to that queue.</returns>
    public QueueClient GetQueueClient(string queueName)
    {
        lock (_gate)
        {
            if (!_clients.TryGetValue(queueName, out var client))
            {
                client = new QueueClient(_connectionString, queueName);
                client.CreateIfNotExists();
                _clients[queueName] = client;
            }

            return client;
        }
    }
}
// Usage in services
/// <summary>Publishes orders to "orders-queue" using a client obtained from the factory.</summary>
public class OrderService
{
    private readonly QueueClient _ordersQueue;

    /// <summary>Resolves the "orders-queue" client from <paramref name="queueFactory"/>.</summary>
    /// <param name="queueFactory">Factory that supplies queue clients by name.</param>
    public OrderService(IQueueClientFactory queueFactory)
    {
        _ordersQueue = queueFactory.GetQueueClient("orders-queue");
    }

    /// <summary>Serializes <paramref name="order"/> as JSON and enqueues it.</summary>
    /// <param name="order">The order to enqueue.</param>
    public async Task SendOrderAsync(Order order)
    {
        // QueueMessage is the receive-side model and cannot be constructed by callers;
        // the payload goes to SendMessageAsync directly.
        await _ordersQueue.SendMessageAsync(BinaryData.FromObjectAsJson(order));
    }
}
/// <summary>Holds the client for "notifications-queue", obtained from the factory.</summary>
public class NotificationService
{
    private readonly QueueClient _notificationsQueue;

    /// <summary>Resolves the "notifications-queue" client from <paramref name="queueFactory"/>.</summary>
    /// <param name="queueFactory">Factory that supplies queue clients by name.</param>
    public NotificationService(IQueueClientFactory queueFactory)
        => _notificationsQueue = queueFactory.GetQueueClient("notifications-queue");
}
// DI Registration
/// <summary>Application entry point: registers the queue client factory and the services that use it.</summary>
public class Program
{
    /// <summary>Builds the host, wires up dependency injection and runs the application.</summary>
    /// <param name="args">Command-line arguments forwarded to the host builder.</param>
    /// <exception cref="InvalidOperationException">The "AzureStorage" configuration value is missing or blank.</exception>
    public static void Main(string[] args)
    {
        var builder = Host.CreateApplicationBuilder(args);

        // Fail at startup with a clear message instead of when the first queue client is built.
        var connectionString = builder.Configuration.GetValue<string>("AzureStorage");
        if (string.IsNullOrWhiteSpace(connectionString))
        {
            throw new InvalidOperationException(
                "Missing required configuration value 'AzureStorage'.");
        }

        // One shared factory so queue clients are cached across the whole application.
        builder.Services.AddSingleton<IQueueClientFactory>(
            new QueueClientFactory(connectionString));

        // NOTE(review): these registrations require OrderService / NotificationService to
        // implement IOrderService / INotificationService — confirm against their declarations.
        builder.Services.AddScoped<IOrderService, OrderService>();
        builder.Services.AddScoped<INotificationService, NotificationService>();

        var host = builder.Build();
        host.Run();
    }
}
3. Generic Queue Service
Generic service cho bất kỳ queue nào, tốt cho large applications.
/// <summary>
/// Queue operations that take the target queue name on every call, so a single
/// service can work with any queue.
/// </summary>
public interface IGenericQueueService
{
/// <summary>Sends <paramref name="message"/> to the queue named <paramref name="queueName"/>.</summary>
/// <typeparam name="T">Type of the message payload.</typeparam>
Task SendAsync<T>(string queueName, T message);
/// <summary>Receives up to <paramref name="maxMessages"/> messages (default 32) from the named queue.</summary>
Task<IEnumerable<QueueMessage>> ReceiveAsync(string queueName, int maxMessages = 32);
/// <summary>Marks a previously received <paramref name="message"/> as done on the named queue.</summary>
Task CompleteAsync(string queueName, QueueMessage message);
/// <summary>
/// Sets aside a <paramref name="message"/> that could not be processed, recording
/// <paramref name="errorMessage"/> as the reason.
/// </summary>
Task DeadLetterAsync(string queueName, QueueMessage message, string errorMessage);
}
/// <summary>
/// <see cref="IGenericQueueService"/> implementation backed by Azure Storage queues.
/// Clients are created on first use, one per queue name, and then reused.
/// </summary>
/// <remarks>
/// NOTE(review): the client cache is a plain <see cref="Dictionary{TKey,TValue}"/>; if this
/// service is shared across threads (e.g. registered as a singleton), access to it
/// needs synchronization — confirm the intended lifetime.
/// </remarks>
public class GenericQueueService : IGenericQueueService
{
    // Retention for regular messages and for dead-lettered messages.
    private static readonly TimeSpan MessageTimeToLive = TimeSpan.FromDays(7);
    private static readonly TimeSpan DeadLetterTimeToLive = TimeSpan.FromDays(30);

    private readonly string _connectionString;
    private readonly Dictionary<string, QueueClient> _clients = new();

    /// <summary>Creates the service for the storage account identified by <paramref name="connectionString"/>.</summary>
    /// <param name="connectionString">Azure Storage connection string.</param>
    public GenericQueueService(string connectionString)
    {
        _connectionString = connectionString;
    }

    // Returns the cached client for the queue, creating the client and the queue if needed.
    private QueueClient GetOrCreateClient(string queueName)
    {
        if (!_clients.TryGetValue(queueName, out var client))
        {
            client = new QueueClient(_connectionString, queueName);
            client.CreateIfNotExists();
            _clients[queueName] = client;
        }

        return client;
    }

    /// <summary>Serializes <paramref name="message"/> as JSON and enqueues it with a 7-day time-to-live.</summary>
    /// <typeparam name="T">Type of the message payload.</typeparam>
    /// <param name="queueName">Target queue.</param>
    /// <param name="message">Payload to serialize and send.</param>
    public async Task SendAsync<T>(string queueName, T message)
    {
        var client = GetOrCreateClient(queueName);

        // QueueMessage is the receive-side model and cannot be constructed by callers;
        // the payload and the time-to-live are passed to SendMessageAsync directly.
        await client.SendMessageAsync(
            BinaryData.FromObjectAsJson(message),
            timeToLive: MessageTimeToLive);
    }

    /// <summary>Receives up to <paramref name="maxMessages"/> messages from the queue.</summary>
    /// <param name="queueName">Queue to read from.</param>
    /// <param name="maxMessages">Maximum number of messages to fetch in one call.</param>
    /// <returns>The messages returned by the service for this call.</returns>
    public async Task<IEnumerable<QueueMessage>> ReceiveAsync(
        string queueName,
        int maxMessages = 32)
    {
        var client = GetOrCreateClient(queueName);
        var response = await client.ReceiveMessagesAsync(maxMessages);
        return response.Value;
    }

    /// <summary>Deletes a received message from the queue using its id and pop receipt.</summary>
    /// <param name="queueName">Queue the message was received from.</param>
    /// <param name="message">The received message to delete.</param>
    public async Task CompleteAsync(string queueName, QueueMessage message)
    {
        var client = GetOrCreateClient(queueName);
        await client.DeleteMessageAsync(message.MessageId, message.PopReceipt);
    }

    /// <summary>
    /// Copies a failed message to the "<paramref name="queueName"/>-deadletter" queue with a
    /// 30-day time-to-live, then deletes it from the original queue.
    /// </summary>
    /// <param name="queueName">Queue the message was received from.</param>
    /// <param name="message">The message that failed processing.</param>
    /// <param name="errorMessage">Description of the failure, stored with the dead-lettered copy.</param>
    public async Task DeadLetterAsync(
        string queueName,
        QueueMessage message,
        string errorMessage)
    {
        var deadLetterClient = GetOrCreateClient($"{queueName}-deadletter");

        // Storage queue messages carry no metadata, so the failure details travel
        // inside a JSON envelope together with the original body.
        var envelope = new
        {
            OriginalQueue = queueName,
            OriginalMessageId = message.MessageId,
            FailedAt = DateTime.UtcNow.ToString("O"),
            ErrorMessage = errorMessage,
            Body = message.Body.ToString()
        };

        await deadLetterClient.SendMessageAsync(
            BinaryData.FromObjectAsJson(envelope),
            timeToLive: DeadLetterTimeToLive);

        // Remove the original only after the dead-letter copy has been stored.
        await CompleteAsync(queueName, message);
    }
}
// Usage
/// <summary>Publishes orders through the generic queue service.</summary>
public class OrderService
{
    private const string OrdersQueueName = "orders-queue";

    private readonly IGenericQueueService _queueService;

    /// <summary>Stores the queue service used for sending.</summary>
    /// <param name="queueService">Generic queue service.</param>
    public OrderService(IGenericQueueService queueService) => _queueService = queueService;

    /// <summary>Sends <paramref name="order"/> to the orders queue.</summary>
    /// <param name="order">The order to enqueue.</param>
    public async Task SendOrderAsync(Order order)
        => await _queueService.SendAsync(OrdersQueueName, order);
}
4. Background Service for Multiple Queues
Xử lý nhiều queues trong một background service.
/// <summary>
/// Background service that polls several queues in parallel and dispatches every
/// received message to the handler registered for its queue.
/// </summary>
public class MultiQueueProcessor : BackgroundService
{
    private const int BatchSize = 10;
    private static readonly TimeSpan VisibilityTimeout = TimeSpan.FromMinutes(5);
    private static readonly TimeSpan ErrorRetryDelay = TimeSpan.FromSeconds(5);
    // Pause between polls when a queue is empty, so an idle queue is not polled in a tight loop.
    private static readonly TimeSpan IdlePollDelay = TimeSpan.FromSeconds(1);

    private readonly IConfiguration _configuration;
    private readonly ILogger<MultiQueueProcessor> _logger;
    private readonly Dictionary<string, Func<QueueMessage, Task>> _handlers;

    /// <summary>Stores the dependencies and registers one handler per queue name.</summary>
    /// <param name="configuration">Configuration that holds the "AzureStorage" connection string.</param>
    /// <param name="logger">Logger for processing and receive errors.</param>
    public MultiQueueProcessor(
        IConfiguration configuration,
        ILogger<MultiQueueProcessor> logger)
    {
        _configuration = configuration;
        _logger = logger;

        // Define handlers for each queue
        _handlers = new Dictionary<string, Func<QueueMessage, Task>>
        {
            ["orders-queue"] = ProcessOrderMessageAsync,
            ["notifications-queue"] = ProcessNotificationMessageAsync,
            ["inventory-queue"] = ProcessInventoryMessageAsync
        };
    }

    /// <summary>Starts one polling loop per registered queue and waits for all of them to finish.</summary>
    /// <param name="stoppingToken">Signalled when the host is shutting down.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var connectionString = _configuration.GetValue<string>("AzureStorage");

        // Run all queue processors in parallel
        var loops = _handlers.Select(entry =>
            ProcessQueueAsync(connectionString, entry.Key, entry.Value, stoppingToken));

        await Task.WhenAll(loops);
    }

    // Polls one queue until shutdown: receive a batch, run the handler on each message,
    // and delete the message only when the handler succeeded.
    private async Task ProcessQueueAsync(
        string connectionString,
        string queueName,
        Func<QueueMessage, Task> handler,
        CancellationToken stoppingToken)
    {
        var client = new QueueClient(connectionString, queueName);

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                var messages = await client.ReceiveMessagesAsync(
                    maxMessages: BatchSize,
                    visibilityTimeout: VisibilityTimeout,
                    cancellationToken: stoppingToken);

                if (messages.Value.Length == 0)
                {
                    await Task.Delay(IdlePollDelay, stoppingToken);
                    continue;
                }

                foreach (var message in messages.Value)
                {
                    try
                    {
                        await handler(message);
                        await client.DeleteMessageAsync(
                            message.MessageId,
                            message.PopReceipt);
                    }
                    catch (Exception ex)
                    {
                        // The message is not deleted; it becomes visible again once the
                        // visibility timeout requested above has elapsed.
                        _logger.LogError(ex,
                            "Error processing message from {Queue}",
                            queueName);
                    }
                }
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Normal shutdown, not a receive error.
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error receiving from queue {Queue}", queueName);

                try
                {
                    await Task.Delay(ErrorRetryDelay, stoppingToken);
                }
                catch (OperationCanceledException)
                {
                    // Shutdown was requested while backing off.
                    break;
                }
            }
        }
    }

    // Handles one message from "orders-queue".
    private async Task ProcessOrderMessageAsync(QueueMessage message)
    {
        var order = JsonSerializer.Deserialize<Order>(message.Body.ToString())
            ?? throw new JsonException("Order message body deserialized to null.");
        _logger.LogInformation("Processing order: {OrderId}", order.Id);
        await Task.Delay(100);
    }

    // Handles one message from "notifications-queue".
    private async Task ProcessNotificationMessageAsync(QueueMessage message)
    {
        var notification = JsonSerializer.Deserialize<Notification>(message.Body.ToString())
            ?? throw new JsonException("Notification message body deserialized to null.");
        _logger.LogInformation("Sending notification: {Type}", notification.Type);
        await Task.Delay(50);
    }

    // Handles one message from "inventory-queue".
    private async Task ProcessInventoryMessageAsync(QueueMessage message)
    {
        var inventoryUpdate = JsonSerializer.Deserialize<InventoryUpdate>(message.Body.ToString())
            ?? throw new JsonException("Inventory update message body deserialized to null.");
        _logger.LogInformation("Updating inventory: {ProductId}", inventoryUpdate.ProductId);
        await Task.Delay(50);
    }
}
5. Configuration-Based Queue Registration
Đăng ký queues từ configuration.
// appsettings.json
{
"AzureStorage": "DefaultEndpointsProtocol=https;...",
"Queues": {
"Orders": "orders-queue",
"Notifications": "notifications-queue",
"Inventory": "inventory-queue"
}
}
// Queue configuration class
/// <summary>
/// Queue names read from the "Queues" configuration section; each property holds
/// the name of one queue.
/// </summary>
public class QueueConfiguration
{
/// <summary>Name of the orders queue (configuration key "Orders").</summary>
public string Orders { get; set; }
/// <summary>Name of the notifications queue (configuration key "Notifications").</summary>
public string Notifications { get; set; }
/// <summary>Name of the inventory queue (configuration key "Inventory").</summary>
public string Inventory { get; set; }
}
// Registration
/// <summary>Application entry point: registers queue clients whose names come from configuration.</summary>
public class Program
{
    /// <summary>Builds the host, registers the configured queues and runs the application.</summary>
    /// <param name="args">Command-line arguments forwarded to the host builder.</param>
    /// <exception cref="InvalidOperationException">The "Queues" configuration section is missing.</exception>
    public static void Main(string[] args)
    {
        var builder = Host.CreateApplicationBuilder(args);

        var queuesSection = builder.Configuration.GetSection("Queues");

        // Get<T>() returns null when the section is absent; fail with a clear message
        // instead of a NullReferenceException inside the registrations below.
        var queueConfig = queuesSection.Get<QueueConfiguration>()
            ?? throw new InvalidOperationException(
                "Missing required configuration section 'Queues'.");

        // Bind the same section for consumers that inject IOptions<QueueConfiguration>.
        builder.Services.Configure<QueueConfiguration>(queuesSection);

        // Register all queues as dictionary
        builder.Services.AddSingleton(sp =>
        {
            var connectionString = sp.GetRequiredService<IConfiguration>()
                .GetValue<string>("AzureStorage");

            return new Dictionary<string, QueueClient>
            {
                [queueConfig.Orders] = new QueueClient(connectionString, queueConfig.Orders),
                [queueConfig.Notifications] = new QueueClient(connectionString, queueConfig.Notifications),
                [queueConfig.Inventory] = new QueueClient(connectionString, queueConfig.Inventory)
            };
        });

        // Or register by name
        // NOTE(review): assumes an OrderService that implements IOrderService and has a
        // constructor taking a QueueClient — confirm against its declaration.
        builder.Services.AddScoped<IOrderService>(sp =>
        {
            var queues = sp.GetRequiredService<Dictionary<string, QueueClient>>();
            return new OrderService(queues[queueConfig.Orders]);
        });

        // Without building and running the host, none of the registrations take effect.
        builder.Build().Run();
    }
}
// Usage with IOptions
/// <summary>Holds a client for the orders queue whose name comes from <see cref="QueueConfiguration"/>.</summary>
public class QueueService
{
    private readonly QueueClient _ordersQueue;

    /// <summary>
    /// Builds the orders queue client from the "AzureStorage" connection string and
    /// the configured orders queue name.
    /// </summary>
    /// <param name="config">Options wrapper around the queue names.</param>
    /// <param name="configuration">Configuration that holds the "AzureStorage" connection string.</param>
    public QueueService(IOptions<QueueConfiguration> config, IConfiguration configuration)
    {
        _ordersQueue = new QueueClient(
            configuration.GetValue<string>("AzureStorage"),
            config.Value.Orders);
    }
}
6. Queue-Specific Processors
Tạo processor riêng cho từng queue.
// Base processor interface
/// <summary>Handles the messages of one specific queue.</summary>
public interface IQueueProcessor
{
/// <summary>Name of the queue this processor is responsible for.</summary>
string QueueName { get; }
/// <summary>Processes a single message received from <see cref="QueueName"/>.</summary>
/// <param name="message">The received message.</param>
Task ProcessMessageAsync(QueueMessage message);
}
// Order processor
/// <summary>Processor for "orders-queue": deserializes each message into an order and hands it to the order service.</summary>
public class OrderQueueProcessor : IQueueProcessor
{
    private readonly QueueClient _queueClient;
    private readonly IOrderService _orderService;

    /// <summary>Stores the injected queue client and order service.</summary>
    /// <param name="queueClient">Queue client supplied by the container.</param>
    /// <param name="orderService">Service that performs the actual order processing.</param>
    public OrderQueueProcessor(
        QueueClient queueClient,
        IOrderService orderService)
    {
        (_queueClient, _orderService) = (queueClient, orderService);
    }

    /// <inheritdoc />
    public string QueueName
    {
        get { return "orders-queue"; }
    }

    /// <summary>Deserializes the JSON body into an order and passes it to the order service.</summary>
    /// <param name="message">The received queue message.</param>
    public async Task ProcessMessageAsync(QueueMessage message)
    {
        var json = message.Body.ToString();
        var order = JsonSerializer.Deserialize<Order>(json);
        await _orderService.ProcessAsync(order);
    }
}
// Notification processor
/// <summary>Processor for "notifications-queue": deserializes each message into a notification.</summary>
public class NotificationQueueProcessor : IQueueProcessor
{
    /// <inheritdoc />
    public string QueueName
    {
        get { return "notifications-queue"; }
    }

    /// <summary>Deserializes the JSON body into a notification; the send step is left as a placeholder.</summary>
    /// <param name="message">The received queue message.</param>
    public async Task ProcessMessageAsync(QueueMessage message)
    {
        var json = message.Body.ToString();
        var notification = JsonSerializer.Deserialize<Notification>(json);
        // Send notification
    }
}
// Hosted service that runs all processors
/// <summary>
/// Background service that runs one polling loop per registered
/// <see cref="IQueueProcessor"/> and forwards each received message to it.
/// </summary>
public class QueueProcessorHost : BackgroundService
{
    private const int BatchSize = 10;
    private static readonly TimeSpan VisibilityTimeout = TimeSpan.FromMinutes(5);
    private static readonly TimeSpan ErrorRetryDelay = TimeSpan.FromSeconds(5);
    // Pause between polls when a queue is empty, so an idle queue is not polled in a tight loop.
    private static readonly TimeSpan IdlePollDelay = TimeSpan.FromSeconds(1);

    private readonly IReadOnlyList<IQueueProcessor> _processors;
    private readonly ILogger<QueueProcessorHost> _logger;
    private readonly Dictionary<string, IQueueProcessor> _processorMap;

    /// <summary>Stores the processors and indexes them by queue name.</summary>
    /// <param name="processors">All registered queue processors.</param>
    /// <param name="logger">Logger for processing and receive errors.</param>
    /// <exception cref="ArgumentException">Two processors report the same queue name.</exception>
    public QueueProcessorHost(
        IEnumerable<IQueueProcessor> processors,
        ILogger<QueueProcessorHost> logger)
    {
        // Materialize once so the injected sequence is not enumerated repeatedly.
        _processors = processors.ToList();
        _logger = logger;

        // ToDictionary throws on duplicate keys, which rejects two processors for one queue.
        _processorMap = _processors.ToDictionary(p => p.QueueName);
    }

    /// <summary>Starts a polling loop for every processor and waits for all of them to finish.</summary>
    /// <param name="stoppingToken">Signalled when the host is shutting down.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var tasks = _processors.Select(p => ProcessQueueAsync(p, stoppingToken));
        await Task.WhenAll(tasks);
    }

    // Polls the processor's queue until shutdown: receive a batch, process each message,
    // and delete the message only when processing succeeded.
    private async Task ProcessQueueAsync(
        IQueueProcessor processor,
        CancellationToken ct)
    {
        var connectionString = Environment.GetEnvironmentVariable("AzureStorage");
        if (string.IsNullOrWhiteSpace(connectionString))
        {
            throw new InvalidOperationException(
                "Environment variable 'AzureStorage' is not set.");
        }

        var client = new QueueClient(connectionString, processor.QueueName);

        while (!ct.IsCancellationRequested)
        {
            try
            {
                var messages = await client.ReceiveMessagesAsync(
                    maxMessages: BatchSize,
                    visibilityTimeout: VisibilityTimeout,
                    cancellationToken: ct);

                if (messages.Value.Length == 0)
                {
                    await Task.Delay(IdlePollDelay, ct);
                    continue;
                }

                foreach (var message in messages.Value)
                {
                    try
                    {
                        await processor.ProcessMessageAsync(message);
                        await client.DeleteMessageAsync(
                            message.MessageId,
                            message.PopReceipt);
                    }
                    catch (Exception ex)
                    {
                        // The message is not deleted; it becomes visible again once the
                        // visibility timeout requested above has elapsed.
                        _logger.LogError(ex, "Error processing {Queue}", processor.QueueName);
                    }
                }
            }
            catch (OperationCanceledException) when (ct.IsCancellationRequested)
            {
                // Normal shutdown, not a receive error.
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error receiving from {Queue}", processor.QueueName);

                try
                {
                    await Task.Delay(ErrorRetryDelay, ct);
                }
                catch (OperationCanceledException)
                {
                    // Shutdown was requested while backing off.
                    break;
                }
            }
        }
    }
}
Summary Comparison
| Approach | Best For | Pros | Cons |
|---|---|---|---|
| Multiple instances | Small apps | Simple, direct | Repetitive code |
| Factory | Medium apps | DI-friendly, reusable | More setup |
| Generic service | Large apps | Centralized, flexible | Slightly complex |
| Background service | Processing | Handles multiple queues | Single process |
| Config-based | Config-driven apps | Easy to change | Less type-safe |
| Queue-specific processors | Complex apps | Separation of concerns | More interfaces |
Next Steps
- Azure Service Bus Topics - Pub/Sub patterns
- Introduction - Back to overview