Keyboard shortcuts

Press ← or → to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Multiple Queues Management

Overview

Khi ứng dụng cần làm việc với nhiều queue khác nhau (ví dụ: orders-queue, notifications-queue, inventory-queue), có một số approaches để quản lý. Phần này trình bày các patterns và best practices.

1. Multiple QueueClient Instances (Simple)

Phù hợp cho small applications với vài queues.

/// <summary>
/// Sends messages to three fixed queues ("orders-queue", "notifications-queue",
/// "inventory-queue"), holding one <see cref="QueueClient"/> per queue.
/// </summary>
public class MultiQueueService
{
    private readonly QueueClient _ordersQueue;
    private readonly QueueClient _notificationsQueue;
    private readonly QueueClient _inventoryQueue;

    /// <summary>
    /// Builds the three queue clients from the "AzureStorage" configuration value
    /// and creates each queue if it does not exist yet.
    /// </summary>
    /// <param name="configuration">Configuration that provides the "AzureStorage" connection string.</param>
    public MultiQueueService(IConfiguration configuration)
    {
        var connectionString = configuration.GetValue<string>("AzureStorage");

        _ordersQueue = new QueueClient(connectionString, "orders-queue");
        _notificationsQueue = new QueueClient(connectionString, "notifications-queue");
        _inventoryQueue = new QueueClient(connectionString, "inventory-queue");

        // Create queues if not exist. These are the synchronous overloads, so
        // constructing this service blocks until all three calls have returned.
        _ordersQueue.CreateIfNotExists();
        _notificationsQueue.CreateIfNotExists();
        _inventoryQueue.CreateIfNotExists();
    }

    /// <summary>Serializes <paramref name="order"/> as JSON and enqueues it on the orders queue.</summary>
    /// <param name="order">The order to enqueue.</param>
    public async Task SendOrderAsync(Order order)
    {
        // QueueMessage is the model handed back by receive calls and cannot be
        // constructed by callers; the payload goes to SendMessageAsync directly.
        // The message id is assigned by the service, not by the sender.
        await _ordersQueue.SendMessageAsync(BinaryData.FromObjectAsJson(order));
    }

    /// <summary>Serializes <paramref name="notification"/> as JSON and enqueues it on the notifications queue.</summary>
    /// <param name="notification">The notification to enqueue.</param>
    public async Task SendNotificationAsync(Notification notification)
    {
        await _notificationsQueue.SendMessageAsync(BinaryData.FromObjectAsJson(notification));
    }

    /// <summary>Serializes <paramref name="update"/> as JSON and enqueues it on the inventory queue.</summary>
    /// <param name="update">The inventory update to enqueue.</param>
    public async Task SendInventoryUpdateAsync(InventoryUpdate update)
    {
        await _inventoryQueue.SendMessageAsync(BinaryData.FromObjectAsJson(update));
    }
}

2. QueueClient Factory Pattern

Interface + Factory pattern, tốt cho medium applications.

/// <summary>
/// Interface that defines the queue client factory: callers ask for a
/// <see cref="QueueClient"/> by queue name instead of constructing one themselves.
/// </summary>
public interface IQueueClientFactory
{
    /// <summary>Returns the client for the queue named <paramref name="queueName"/>.</summary>
    /// <param name="queueName">Name of the queue the client should target.</param>
    QueueClient GetQueueClient(string queueName);
}

// Implementation
/// <summary>
/// Creates <see cref="QueueClient"/> instances on first request and caches them per queue name,
/// so every caller asking for the same queue receives the same client.
/// </summary>
public class QueueClientFactory : IQueueClientFactory
{
    private readonly string _connectionString;
    private readonly Dictionary<string, QueueClient> _clients = new();

    // Guards _clients. The DI registration shown in this document registers the factory
    // as a singleton that is consumed by scoped services, so GetQueueClient can be called
    // from several threads at once, and Dictionary is not safe for concurrent writes.
    private readonly object _gate = new();

    /// <summary>Creates a factory bound to one storage account.</summary>
    /// <param name="connectionString">Connection string used for every client this factory creates.</param>
    /// <exception cref="ArgumentNullException"><paramref name="connectionString"/> is null.</exception>
    public QueueClientFactory(string connectionString)
    {
        // Fail at registration time rather than on the first queue access.
        _connectionString = connectionString
            ?? throw new ArgumentNullException(nameof(connectionString));
    }

    /// <summary>
    /// Returns the cached client for <paramref name="queueName"/>. On the first request for a
    /// name, the client is created, the queue is created if it does not exist, and the client
    /// is added to the cache.
    /// </summary>
    /// <param name="queueName">Name of the queue.</param>
    /// <returns>The client for <paramref name="queueName"/>.</returns>
    public QueueClient GetQueueClient(string queueName)
    {
        lock (_gate)
        {
            if (!_clients.TryGetValue(queueName, out var client))
            {
                client = new QueueClient(_connectionString, queueName);
                // Synchronous call made while holding the lock; it happens once per queue name.
                client.CreateIfNotExists();
                _clients[queueName] = client;
            }

            return client;
        }
    }
}

// Usage in services
/// <summary>
/// Sends orders to "orders-queue" through a client obtained from <see cref="IQueueClientFactory"/>.
/// </summary>
public class OrderService
{
    private readonly QueueClient _ordersQueue;

    /// <summary>Resolves the "orders-queue" client once, at construction.</summary>
    /// <param name="queueFactory">Factory that supplies the queue client.</param>
    public OrderService(IQueueClientFactory queueFactory)
    {
        _ordersQueue = queueFactory.GetQueueClient("orders-queue");
    }

    /// <summary>Serializes <paramref name="order"/> as JSON and enqueues it.</summary>
    /// <param name="order">The order to enqueue.</param>
    public async Task SendOrderAsync(Order order)
    {
        // QueueMessage is the receive-side model and cannot be constructed by callers;
        // SendMessageAsync takes the payload (string or BinaryData) directly.
        await _ordersQueue.SendMessageAsync(BinaryData.FromObjectAsJson(order));
    }
}

/// <summary>
/// Holds the client for "notifications-queue", obtained from <see cref="IQueueClientFactory"/>.
/// </summary>
public class NotificationService
{
    private readonly QueueClient _notificationsQueue;

    /// <summary>Resolves the "notifications-queue" client once, at construction.</summary>
    /// <param name="queueFactory">Factory that supplies the queue client.</param>
    public NotificationService(IQueueClientFactory queueFactory)
        => _notificationsQueue = queueFactory.GetQueueClient("notifications-queue");
}

// DI Registration
/// <summary>
/// Entry point: registers the queue client factory and the services that use it, then runs the host.
/// </summary>
public class Program
{
    public static void Main(string[] args)
    {
        var hostBuilder = Host.CreateApplicationBuilder(args);
        var services = hostBuilder.Services;

        // A single factory instance is shared by the whole application.
        var storageConnection = hostBuilder.Configuration.GetValue<string>("AzureStorage");
        IQueueClientFactory queueFactory = new QueueClientFactory(storageConnection);
        services.AddSingleton(queueFactory);

        // One service instance per scope; each gets its queue client from the shared factory.
        services.AddScoped<IOrderService, OrderService>();
        services.AddScoped<INotificationService, NotificationService>();

        hostBuilder.Build().Run();
    }
}

3. Generic Queue Service

Generic service cho bất kỳ queue nào, tốt cho large applications.

/// <summary>
/// Queue operations addressed by queue name, so one service can work with any queue.
/// </summary>
public interface IGenericQueueService
{
    /// <summary>Sends <paramref name="message"/> to the queue named <paramref name="queueName"/>.</summary>
    /// <typeparam name="T">Type of the payload.</typeparam>
    Task SendAsync<T>(string queueName, T message);

    /// <summary>
    /// Receives messages from the queue named <paramref name="queueName"/>.
    /// </summary>
    /// <param name="queueName">Queue to read from.</param>
    /// <param name="maxMessages">Upper bound on the number of messages requested; defaults to 32.</param>
    Task<IEnumerable<QueueMessage>> ReceiveAsync(string queueName, int maxMessages = 32);

    /// <summary>
    /// Marks <paramref name="message"/> as handled. The implementation in this document
    /// deletes it from the queue.
    /// </summary>
    Task CompleteAsync(string queueName, QueueMessage message);

    /// <summary>
    /// Sets <paramref name="message"/> aside as failed, together with <paramref name="errorMessage"/>.
    /// The implementation in this document forwards it to a "{queueName}-deadletter" queue and
    /// then completes the original.
    /// </summary>
    Task DeadLetterAsync(string queueName, QueueMessage message, string errorMessage);
}

/// <summary>
/// <see cref="IGenericQueueService"/> implementation that creates one <see cref="QueueClient"/>
/// per queue name on demand and reuses it for later calls.
/// </summary>
public class GenericQueueService : IGenericQueueService
{
    private readonly string _connectionString;

    // NOTE(review): this cache is not synchronized. If the service is registered as a
    // singleton and used from several threads, guard it with a lock - confirm against
    // the actual registration.
    private readonly Dictionary<string, QueueClient> _clients = new();

    /// <summary>Creates a service bound to one storage account.</summary>
    /// <param name="connectionString">Connection string used for every queue client.</param>
    public GenericQueueService(string connectionString)
    {
        _connectionString = connectionString;
    }

    /// <summary>
    /// Returns the cached client for <paramref name="queueName"/>; on first use creates the
    /// client and creates the queue if it does not exist.
    /// </summary>
    private QueueClient GetOrCreateClient(string queueName)
    {
        if (!_clients.TryGetValue(queueName, out var client))
        {
            client = new QueueClient(_connectionString, queueName);
            client.CreateIfNotExists();
            _clients[queueName] = client;
        }
        return client;
    }

    /// <summary>
    /// Serializes <paramref name="message"/> as JSON and enqueues it with a 7-day time-to-live.
    /// </summary>
    /// <typeparam name="T">Type of the payload.</typeparam>
    /// <param name="queueName">Target queue.</param>
    /// <param name="message">Payload to serialize.</param>
    public async Task SendAsync<T>(string queueName, T message)
    {
        var client = GetOrCreateClient(queueName);

        // QueueMessage is the receive-side model and cannot be constructed by callers.
        // The payload and time-to-live are passed to SendMessageAsync directly; the
        // message id is assigned by the service.
        await client.SendMessageAsync(
            BinaryData.FromObjectAsJson(message),
            timeToLive: TimeSpan.FromDays(7));
    }

    /// <summary>Receives up to <paramref name="maxMessages"/> messages from the queue.</summary>
    /// <param name="queueName">Queue to read from.</param>
    /// <param name="maxMessages">Maximum number of messages to request; defaults to 32.</param>
    /// <returns>The messages returned by the service for this request.</returns>
    public async Task<IEnumerable<QueueMessage>> ReceiveAsync(
        string queueName,
        int maxMessages = 32)
    {
        var client = GetOrCreateClient(queueName);
        var response = await client.ReceiveMessagesAsync(maxMessages);
        return response.Value;
    }

    /// <summary>Deletes <paramref name="message"/> from the queue using its id and pop receipt.</summary>
    public async Task CompleteAsync(string queueName, QueueMessage message)
    {
        var client = GetOrCreateClient(queueName);
        await client.DeleteMessageAsync(message.MessageId, message.PopReceipt);
    }

    /// <summary>
    /// Copies a failed message to the "{queueName}-deadletter" queue with a 30-day time-to-live,
    /// then deletes it from the original queue.
    /// </summary>
    /// <param name="queueName">Queue the message was received from.</param>
    /// <param name="message">The failed message.</param>
    /// <param name="errorMessage">Description of the failure, stored alongside the message.</param>
    public async Task DeadLetterAsync(
        string queueName,
        QueueMessage message,
        string errorMessage)
    {
        var deadLetterQueueName = $"{queueName}-deadletter";
        var deadLetterClient = GetOrCreateClient(deadLetterQueueName);

        // Storage queue messages carry no per-message metadata, so the failure context
        // travels inside the dead-letter body as a JSON envelope around the original body.
        // NOTE(review): the envelope makes the dead-letter message larger than the original;
        // confirm it stays within the service's message size limit for large payloads.
        var envelope = new Dictionary<string, string>
        {
            ["OriginalQueue"] = queueName,
            ["OriginalMessageId"] = message.MessageId,
            ["FailedAt"] = DateTime.UtcNow.ToString("O"),
            ["ErrorMessage"] = errorMessage,
            ["Body"] = message.Body.ToString()
        };

        await deadLetterClient.SendMessageAsync(
            BinaryData.FromObjectAsJson(envelope),
            timeToLive: TimeSpan.FromDays(30));

        // Remove the original only after the dead-letter copy has been accepted.
        await CompleteAsync(queueName, message);
    }
}

// Usage
/// <summary>
/// Sends orders through <see cref="IGenericQueueService"/> instead of owning a queue client.
/// </summary>
public class OrderService
{
    private const string OrdersQueueName = "orders-queue";

    private readonly IGenericQueueService _queueService;

    /// <summary>Stores the queue service used for all sends.</summary>
    /// <param name="queueService">Generic queue service.</param>
    public OrderService(IGenericQueueService queueService) => _queueService = queueService;

    /// <summary>Sends <paramref name="order"/> to the orders queue.</summary>
    /// <param name="order">The order to enqueue.</param>
    public async Task SendOrderAsync(Order order)
        => await _queueService.SendAsync(OrdersQueueName, order);
}

4. Background Service for Multiple Queues

Xử lý nhiều queues trong một background service.

/// <summary>
/// Background service that polls several queues in parallel, one polling loop per queue,
/// and dispatches each received message to the handler registered for that queue.
/// </summary>
public class MultiQueueProcessor : BackgroundService
{
    // Wait between polls when a queue returned no messages, so an empty queue is not
    // re-queried in a tight loop.
    private static readonly TimeSpan IdlePollDelay = TimeSpan.FromSeconds(1);

    // Back-off after a failed receive call.
    private static readonly TimeSpan ReceiveErrorDelay = TimeSpan.FromSeconds(5);

    private readonly IConfiguration _configuration;
    private readonly ILogger<MultiQueueProcessor> _logger;
    private readonly Dictionary<string, Func<QueueMessage, Task>> _handlers;

    /// <summary>Stores dependencies and builds the queue-name to handler map.</summary>
    /// <param name="configuration">Configuration that provides the "AzureStorage" connection string.</param>
    /// <param name="logger">Logger for processing and receive errors.</param>
    public MultiQueueProcessor(
        IConfiguration configuration,
        ILogger<MultiQueueProcessor> logger)
    {
        _configuration = configuration;
        _logger = logger;

        // Define handlers for each queue
        _handlers = new Dictionary<string, Func<QueueMessage, Task>>
        {
            ["orders-queue"] = ProcessOrderMessageAsync,
            ["notifications-queue"] = ProcessNotificationMessageAsync,
            ["inventory-queue"] = ProcessInventoryMessageAsync
        };
    }

    /// <summary>
    /// Starts one polling loop per registered queue and completes when all of them have stopped.
    /// </summary>
    /// <param name="stoppingToken">Signalled when the host is shutting down.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var connectionString = _configuration.GetValue<string>("AzureStorage");

        // Create tasks for each queue
        var tasks = _handlers.Select(kvp =>
            PollQueueAsync(
                new QueueClient(connectionString, kvp.Key),
                kvp.Key,
                kvp.Value,
                stoppingToken));

        // Run all queue processors in parallel
        await Task.WhenAll(tasks);
    }

    /// <summary>
    /// Polls one queue until <paramref name="stoppingToken"/> is signalled. A message is deleted
    /// only after its handler completes without throwing.
    /// </summary>
    private async Task PollQueueAsync(
        QueueClient client,
        string queueName,
        Func<QueueMessage, Task> handler,
        CancellationToken stoppingToken)
    {
        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    var messages = await client.ReceiveMessagesAsync(
                        maxMessages: 10,
                        visibilityTimeout: TimeSpan.FromMinutes(5),
                        cancellationToken: stoppingToken);

                    if (messages.Value.Length == 0)
                    {
                        await Task.Delay(IdlePollDelay, stoppingToken);
                        continue;
                    }

                    foreach (var message in messages.Value)
                    {
                        try
                        {
                            await handler(message);
                            await client.DeleteMessageAsync(
                                message.MessageId,
                                message.PopReceipt);
                        }
                        catch (Exception ex)
                        {
                            // The message is left undeleted here.
                            // NOTE(review): an undeleted message is expected to reappear after the
                            // visibility timeout and be retried; consider a DequeueCount limit
                            // for messages that keep failing.
                            _logger.LogError(ex,
                                "Error processing message from {Queue}",
                                queueName);
                        }
                    }
                }
                // Cancellation during shutdown is not a receive error; let it reach the outer catch.
                catch (Exception ex) when (!(ex is OperationCanceledException && stoppingToken.IsCancellationRequested))
                {
                    _logger.LogError(ex, "Error receiving from queue {Queue}", queueName);
                    await Task.Delay(ReceiveErrorDelay, stoppingToken);
                }
            }
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Normal shutdown: a pending receive or delay was cancelled.
        }
    }

    /// <summary>Deserializes the body as an <c>Order</c> and logs its id. The delay stands in for real work.</summary>
    private async Task ProcessOrderMessageAsync(QueueMessage message)
    {
        var order = JsonSerializer.Deserialize<Order>(message.Body.ToString());
        _logger.LogInformation("Processing order: {OrderId}", order.Id);
        await Task.Delay(100);
    }

    /// <summary>Deserializes the body as a <c>Notification</c> and logs its type. The delay stands in for real work.</summary>
    private async Task ProcessNotificationMessageAsync(QueueMessage message)
    {
        var notification = JsonSerializer.Deserialize<Notification>(message.Body.ToString());
        _logger.LogInformation("Sending notification: {Type}", notification.Type);
        await Task.Delay(50);
    }

    /// <summary>Deserializes the body as an <c>InventoryUpdate</c> and logs its product id. The delay stands in for real work.</summary>
    private async Task ProcessInventoryMessageAsync(QueueMessage message)
    {
        var inventoryUpdate = JsonSerializer.Deserialize<InventoryUpdate>(message.Body.ToString());
        _logger.LogInformation("Updating inventory: {ProductId}", inventoryUpdate.ProductId);
        await Task.Delay(50);
    }
}

5. Configuration-Based Queue Registration

Đăng ký queues từ configuration.

// appsettings.json
{
  "AzureStorage": "DefaultEndpointsProtocol=https;...",
  "Queues": {
    "Orders": "orders-queue",
    "Notifications": "notifications-queue",
    "Inventory": "inventory-queue"
  }
}

// Queue configuration class
/// <summary>
/// Queue names bound from the "Queues" configuration section; each property name
/// matches a key in that section.
/// </summary>
public class QueueConfiguration
{
    /// <summary>Name of the orders queue (key "Orders").</summary>
    public string Orders { get; set; }

    /// <summary>Name of the notifications queue (key "Notifications").</summary>
    public string Notifications { get; set; }

    /// <summary>Name of the inventory queue (key "Inventory").</summary>
    public string Inventory { get; set; }
}

// Registration
/// <summary>
/// Entry point: reads queue names from the "Queues" configuration section, registers one
/// <see cref="QueueClient"/> per configured queue, then builds and runs the host.
/// </summary>
public class Program
{
    public static void Main(string[] args)
    {
        var builder = Host.CreateApplicationBuilder(args);

        var queuesSection = builder.Configuration.GetSection("Queues");

        // Get<T>() returns null when the section is absent; fail here with a clear message
        // instead of a NullReferenceException inside a service factory later on.
        var queueConfig = queuesSection.Get<QueueConfiguration>()
            ?? throw new InvalidOperationException("Configuration section 'Queues' is missing.");

        // Bind the same section for consumers that take IOptions<QueueConfiguration>.
        builder.Services.Configure<QueueConfiguration>(queuesSection);

        // Register all queues as dictionary
        builder.Services.AddSingleton(sp =>
        {
            var connectionString = builder.Configuration.GetValue<string>("AzureStorage");
            return new Dictionary<string, QueueClient>
            {
                [queueConfig.Orders] = new QueueClient(connectionString, queueConfig.Orders),
                [queueConfig.Notifications] = new QueueClient(connectionString, queueConfig.Notifications),
                [queueConfig.Inventory] = new QueueClient(connectionString, queueConfig.Inventory)
            };
        });

        // Or register by name
        builder.Services.AddScoped<IOrderService>(sp =>
        {
            var queues = sp.GetRequiredService<Dictionary<string, QueueClient>>();
            return new OrderService(queues[queueConfig.Orders]);
        });

        // Without building and running the host, none of the registrations above take effect.
        var host = builder.Build();
        host.Run();
    }
}

// Usage with IOptions
/// <summary>
/// Builds its orders queue client from the queue name in <see cref="QueueConfiguration"/>
/// and the "AzureStorage" connection string.
/// </summary>
public class QueueService
{
    private readonly QueueClient _ordersQueue;

    /// <summary>Creates the client for the configured orders queue.</summary>
    /// <param name="config">Options carrying the configured queue names.</param>
    /// <param name="configuration">Configuration that provides the "AzureStorage" connection string.</param>
    public QueueService(IOptions<QueueConfiguration> config, IConfiguration configuration)
    {
        string storageConnection = configuration.GetValue<string>("AzureStorage");
        string ordersQueueName = config.Value.Orders;

        _ordersQueue = new QueueClient(storageConnection, ordersQueueName);
    }
}

6. Queue-Specific Processors

Tạo processor riêng cho từng queue.

// Base processor interface
/// <summary>
/// Handler for the messages of one queue.
/// </summary>
public interface IQueueProcessor
{
    /// <summary>Name of the queue this processor handles.</summary>
    string QueueName { get; }

    /// <summary>
    /// Processes one received message. The host in this document deletes the message
    /// after this call completes without throwing.
    /// </summary>
    /// <param name="message">The received message.</param>
    Task ProcessMessageAsync(QueueMessage message);
}

// Order processor
/// <summary>
/// Processor for "orders-queue": deserializes each message body as an <c>Order</c>
/// and hands it to <see cref="IOrderService"/>.
/// </summary>
public class OrderQueueProcessor : IQueueProcessor
{
    // Stored by the constructor; not read anywhere in this class.
    private readonly QueueClient _queueClient;
    private readonly IOrderService _orderService;

    /// <summary>Name of the queue this processor handles.</summary>
    public string QueueName { get; } = "orders-queue";

    /// <summary>Stores the dependencies.</summary>
    /// <param name="queueClient">Queue client.</param>
    /// <param name="orderService">Service that processes the deserialized order.</param>
    public OrderQueueProcessor(
        QueueClient queueClient,
        IOrderService orderService)
    {
        _orderService = orderService;
        _queueClient = queueClient;
    }

    /// <summary>Deserializes the message body as JSON and passes the order to the order service.</summary>
    /// <param name="message">The received message.</param>
    public async Task ProcessMessageAsync(QueueMessage message)
    {
        string json = message.Body.ToString();
        var parsedOrder = JsonSerializer.Deserialize<Order>(json);

        await _orderService.ProcessAsync(parsedOrder);
    }
}

// Notification processor
/// <summary>
/// Processor for "notifications-queue". The sample only deserializes the message;
/// the actual sending step is left as a placeholder.
/// </summary>
public class NotificationQueueProcessor : IQueueProcessor
{
    /// <summary>Name of the queue this processor handles.</summary>
    public string QueueName { get; } = "notifications-queue";

    /// <summary>Deserializes the message body as a <c>Notification</c>.</summary>
    /// <param name="message">The received message.</param>
    public async Task ProcessMessageAsync(QueueMessage message)
    {
        string json = message.Body.ToString();
        var parsedNotification = JsonSerializer.Deserialize<Notification>(json);
        // Send notification
    }
}

// Hosted service that runs all processors
/// <summary>
/// Hosted service that runs one polling loop per registered <see cref="IQueueProcessor"/>
/// and passes each received message to that processor.
/// </summary>
public class QueueProcessorHost : BackgroundService
{
    // Wait between polls when a queue returned no messages, so an empty queue is not
    // re-queried in a tight loop.
    private static readonly TimeSpan IdlePollDelay = TimeSpan.FromSeconds(1);

    // Back-off after a failed receive call.
    private static readonly TimeSpan ReceiveErrorDelay = TimeSpan.FromSeconds(5);

    private readonly IEnumerable<IQueueProcessor> _processors;
    private readonly ILogger<QueueProcessorHost> _logger;

    // Not read elsewhere in this class. Building it makes construction throw when two
    // processors report the same QueueName.
    private readonly Dictionary<string, IQueueProcessor> _processorMap;

    /// <summary>Stores the registered processors and indexes them by queue name.</summary>
    /// <param name="processors">All registered queue processors.</param>
    /// <param name="logger">Logger for processing and receive errors.</param>
    public QueueProcessorHost(
        IEnumerable<IQueueProcessor> processors,
        ILogger<QueueProcessorHost> logger)
    {
        _processors = processors;
        _logger = logger;
        _processorMap = processors.ToDictionary(p => p.QueueName);
    }

    /// <summary>Starts one polling loop per processor and completes when all of them have stopped.</summary>
    /// <param name="stoppingToken">Signalled when the host is shutting down.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var tasks = _processors.Select(p => ProcessQueueAsync(p, stoppingToken));
        await Task.WhenAll(tasks);
    }

    /// <summary>
    /// Polls the processor's queue until <paramref name="ct"/> is signalled. A message is
    /// deleted only after the processor completes without throwing.
    /// </summary>
    private async Task ProcessQueueAsync(
        IQueueProcessor processor,
        CancellationToken ct)
    {
        // NOTE(review): the connection string is read from an environment variable here,
        // while other samples in this document read "AzureStorage" from IConfiguration.
        var connectionString = Environment.GetEnvironmentVariable("AzureStorage");
        var client = new QueueClient(connectionString, processor.QueueName);

        try
        {
            while (!ct.IsCancellationRequested)
            {
                try
                {
                    var messages = await client.ReceiveMessagesAsync(
                        maxMessages: 10,
                        visibilityTimeout: TimeSpan.FromMinutes(5),
                        cancellationToken: ct);

                    if (messages.Value.Length == 0)
                    {
                        await Task.Delay(IdlePollDelay, ct);
                        continue;
                    }

                    foreach (var message in messages.Value)
                    {
                        try
                        {
                            await processor.ProcessMessageAsync(message);
                            await client.DeleteMessageAsync(
                                message.MessageId,
                                message.PopReceipt);
                        }
                        catch (Exception ex)
                        {
                            // The message is left undeleted here.
                            // NOTE(review): an undeleted message is expected to reappear after the
                            // visibility timeout and be retried; consider a DequeueCount limit
                            // for messages that keep failing.
                            _logger.LogError(ex, "Error processing {Queue}", processor.QueueName);
                        }
                    }
                }
                // Cancellation during shutdown is not a receive error; let it reach the outer catch.
                catch (Exception ex) when (!(ex is OperationCanceledException && ct.IsCancellationRequested))
                {
                    _logger.LogError(ex, "Error receiving from {Queue}", processor.QueueName);
                    await Task.Delay(ReceiveErrorDelay, ct);
                }
            }
        }
        catch (OperationCanceledException) when (ct.IsCancellationRequested)
        {
            // Normal shutdown: a pending receive or delay was cancelled.
        }
    }
}

Summary Comparison

| Approach | Best For | Pros | Cons |
|---|---|---|---|
| Multiple instances | Small apps | Simple, direct | Repetitive code |
| Factory | Medium apps | DI-friendly, reusable | More setup |
| Generic service | Large apps | Centralized, flexible | Slightly complex |
| Background service | Processing | Handles multiple queues | Single process |
| Config-based | Config-driven | Easy to change | Less type-safe |
| Queue-specific processors | Complex apps | Separation of concerns | More interfaces |

Next Steps

References