Keyboard shortcuts

Press ← or → to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Queue Best Practices

Overview

Khi làm việc với message queues, việc xử lý errors, retries, và dead letters là rất quan trọng để đảm bảo reliability của hệ thống. Phần này trình bày các best practices để handle failures một cách graceful.

Retry Pattern with Exponential Backoff

/// <summary>
/// Receives a batch of queue messages and processes each one with in-process
/// retries and exponential backoff. Messages that fail every attempt are handed
/// to <c>SendToDeadLetterAsync</c>.
/// </summary>
/// <remarks>
/// NOTE(review): <c>SendToDeadLetterAsync</c> is not declared in this class in the
/// visible source; presumably it is supplied elsewhere (for example by a
/// dead-letter helper) - confirm before use.
/// </remarks>
public class ReliableQueueProcessor
{
    // Maximum number of processing attempts per message before dead-lettering.
    private const int MaxRetries = 3;

    private readonly QueueClient _queueClient;
    private readonly ILogger<ReliableQueueProcessor> _logger;

    public ReliableQueueProcessor(
        QueueClient queueClient,
        ILogger<ReliableQueueProcessor> logger)
    {
        _queueClient = queueClient;
        _logger = logger;
    }

    /// <summary>
    /// Receives up to 10 messages (hidden from other consumers for 5 minutes) and
    /// processes each with retries. A message is deleted only after processing
    /// succeeded; a failed delete is logged and never triggers reprocessing.
    /// </summary>
    public async Task ProcessWithRetryAsync()
    {
        var messages = await _queueClient.ReceiveMessagesAsync(
            maxMessages: 10,
            visibilityTimeout: TimeSpan.FromMinutes(5));

        foreach (var message in messages.Value)
        {
            if (!await TryProcessAsync(message))
            {
                // Every attempt failed; the message was handed to the dead-letter path.
                continue;
            }

            try
            {
                await _queueClient.DeleteMessageAsync(
                    message.MessageId,
                    message.PopReceipt);
            }
            catch (Azure.RequestFailedException deleteError)
            {
                // Deletion is kept outside the retry loop on purpose: the work is
                // already done, so re-running it would duplicate side effects.
                _logger.LogError(
                    deleteError,
                    "Message {MessageId} was processed but could not be deleted",
                    message.MessageId);
            }
        }
    }

    /// <summary>
    /// Runs <see cref="ProcessMessageAsync"/> up to <see cref="MaxRetries"/> times,
    /// waiting 2^attempt seconds between attempts.
    /// </summary>
    /// <returns>
    /// <c>true</c> when processing succeeded; <c>false</c> when all attempts failed
    /// and the message was passed to the dead-letter path.
    /// </returns>
    private async Task<bool> TryProcessAsync(QueueMessage message)
    {
        for (var attempt = 1; attempt <= MaxRetries; attempt++)
        {
            try
            {
                await ProcessMessageAsync(message);
                return true;
            }
            catch (Exception ex)
            {
                _logger.LogWarning(
                    ex,
                    "Attempt {Attempt} failed for message {MessageId}. Retrying...",
                    attempt,
                    message.MessageId);

                if (attempt == MaxRetries)
                {
                    // All retries failed - send to dead letter
                    await SendToDeadLetterAsync(message, ex);
                    return false;
                }

                // Exponential backoff: 2s after the first failure, 4s after the second.
                await Task.Delay(TimeSpan.FromSeconds(Math.Pow(2, attempt)));
            }
        }

        return false;
    }

    /// <summary>
    /// Deserializes the message body into an <c>Order</c> and runs the (simulated)
    /// processing logic.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The body deserializes to <c>null</c>, or the order total exceeds 10000.
    /// </exception>
    private async Task ProcessMessageAsync(QueueMessage message)
    {
        // Deserialize returns null for a literal JSON "null" payload.
        var order = JsonSerializer.Deserialize<Order>(message.Body.ToString())
            ?? throw new InvalidOperationException(
                $"Message {message.MessageId} does not contain an order payload");

        // Processing logic
        await Task.Delay(100);

        // Simulate potential failure
        if (order.TotalAmount > 10000)
        {
            throw new InvalidOperationException("High value order requires manual approval");
        }
    }
}

Dead Letter Queue

Setting Up Dead Letter Queue

/// <summary>
/// Ensures that both the "orders-queue" queue and its "-deadletter" companion
/// queue exist, creating whichever one is missing.
/// </summary>
public async Task SetupDeadLetterQueueAsync()
{
    const string storageConnection = "DefaultEndpointsProtocol=https;...";
    const string primaryQueueName = "orders-queue";

    // Main queue first ...
    var primaryQueue = new QueueClient(storageConnection, primaryQueueName);
    await primaryQueue.CreateIfNotExistsAsync();

    // ... then the dead letter queue that shadows it.
    var deadLetterQueue = new QueueClient(storageConnection, $"{primaryQueueName}-deadletter");
    await deadLetterQueue.CreateIfNotExistsAsync();
}

Sending to Dead Letter

/// <summary>
/// Moves a failed message from the source queue to its "-deadletter" companion
/// queue, together with details about the failure.
/// </summary>
/// <remarks>
/// NOTE(review): no constructor in the visible source assigns <c>_queueClient</c>;
/// presumably it is elided from this snippet - confirm.
/// </remarks>
public class DeadLetterHandler
{
    // How long a dead-lettered message is kept for investigation.
    private static readonly TimeSpan DeadLetterTimeToLive = TimeSpan.FromDays(30);

    private readonly QueueClient _queueClient;

    /// <summary>
    /// Copies <paramref name="message"/> into the dead-letter queue as a JSON
    /// envelope and then deletes it from the original queue.
    /// </summary>
    /// <param name="message">The message that could not be processed.</param>
    /// <param name="ex">The exception raised by the last processing attempt.</param>
    /// <param name="retryCount">
    /// Number of attempts made before giving up; recorded in the envelope.
    /// Defaults to 3, the value that was previously hard-coded.
    /// </param>
    public async Task SendToDeadLetterAsync(QueueMessage message, Exception ex, int retryCount = 3)
    {
        var deadLetterQueueName = $"{_queueClient.Name}-deadletter";

        // QueueClient does not expose its connection string, so the sibling queue
        // is resolved through the parent service client, which reuses the same
        // credentials and pipeline.
        var deadLetterClient = Azure.Storage.Queues.Specialized.SpecializedQueueExtensions
            .GetParentQueueServiceClient(_queueClient)
            .GetQueueClient(deadLetterQueueName);

        await deadLetterClient.CreateIfNotExistsAsync();

        // Queue messages carry no per-message metadata and QueueMessage cannot be
        // constructed by callers, so the failure details travel inside the payload.
        var envelope = JsonSerializer.Serialize(new Dictionary<string, string>
        {
            ["Body"] = message.Body.ToString(),
            ["OriginalMessageId"] = message.MessageId,
            ["OriginalQueue"] = _queueClient.Name,
            ["FailedAt"] = DateTime.UtcNow.ToString("O"),
            ["ErrorMessage"] = ex.Message,
            ["RetryCount"] = retryCount.ToString(System.Globalization.CultureInfo.InvariantCulture)
        });

        await deadLetterClient.SendMessageAsync(envelope, timeToLive: DeadLetterTimeToLive);

        // Delete from original queue only after the copy has been stored.
        await _queueClient.DeleteMessageAsync(message.MessageId, message.PopReceipt);
    }
}

Poison Message Handling

Detecting Poison Messages

/// <summary>
/// Detects messages that have been dequeued too many times ("poison" messages),
/// archives them to blob storage and removes them from the queue.
/// </summary>
/// <remarks>
/// NOTE(review): <c>ProcessMessageAsync</c> and the assignment of the private
/// fields are not part of the visible source; presumably elided - confirm.
/// </remarks>
public class PoisonMessageHandler
{
    private readonly QueueClient _queueClient;
    private readonly ILogger<PoisonMessageHandler> _logger;
    private const int MaxDequeueCount = 5;

    /// <summary>
    /// Receives up to 10 messages and routes each one either to normal processing
    /// or, when its dequeue count exceeds <see cref="MaxDequeueCount"/>, to the
    /// poison-message path.
    /// </summary>
    public async Task HandlePoisonMessagesAsync()
    {
        var messages = await _queueClient.ReceiveMessagesAsync(maxMessages: 10);

        foreach (var message in messages.Value)
        {
            // The dequeue count is delivered with the received message itself,
            // so no additional service call is needed to read it.
            if (message.DequeueCount > MaxDequeueCount)
            {
                await HandlePoisonMessageAsync(message);
            }
            else
            {
                // Normal processing
                await ProcessMessageAsync(message);
            }
        }
    }

    /// <summary>
    /// Logs, archives, deletes and reports a poison message, in that order.
    /// </summary>
    private async Task HandlePoisonMessageAsync(QueueMessage message)
    {
        _logger.LogError(
            "Poison message detected: {MessageId}, DequeueCount: {DequeueCount}",
            message.MessageId,
            message.DequeueCount);

        // 1. Log the message content for investigation
        _logger.LogInformation("Poison message content: {Body}", message.Body.ToString());

        // 2. Archive to blob storage for analysis
        await ArchiveMessageAsync(message);

        // 3. Delete from queue (only after the archive copy exists)
        await _queueClient.DeleteMessageAsync(message.MessageId, message.PopReceipt);

        // 4. Notify operations team
        await NotifyOperationsAsync(message);
    }

    /// <summary>
    /// Uploads the raw message body to the "poison-messages" container under a
    /// name built from the message id and the current UTC timestamp.
    /// </summary>
    private async Task ArchiveMessageAsync(QueueMessage message)
    {
        // NOTE(review): the connection string is a placeholder; it should come
        // from configuration rather than source code.
        var blobClient = new BlobClient(
            "DefaultEndpointsProtocol=https;AccountName=...;AccountKey==",
            "poison-messages",
            $"poison-{message.MessageId}-{DateTime.UtcNow:yyyyMMddHHmmss}.json");

        // message.Body is already BinaryData; wrapping it in new BinaryData(...)
        // would select the object/JSON-serializing constructor instead of
        // copying the payload bytes.
        await blobClient.UploadAsync(message.Body);
    }

    /// <summary>
    /// Raises an alert for the operations team; currently this only writes an
    /// error log entry.
    /// </summary>
    private Task NotifyOperationsAsync(QueueMessage message)
    {
        _logger.LogError("ALERT: Poison message requires attention");
        return Task.CompletedTask;
    }
}

Circuit Breaker Pattern

/// <summary>
/// Queue processor guarded by a simple failure counter: after a configured
/// number of failed runs it stops pulling messages for a fixed period.
/// </summary>
/// <remarks>
/// NOTE(review): <c>ProcessMessageAsync</c> is not declared in the visible source,
/// and nothing here deletes a message after it is processed; presumably the
/// helper takes care of that - confirm.
/// </remarks>
public class CircuitBreakerQueueProcessor
{
    private readonly QueueClient _queueClient;
    private readonly ILogger<CircuitBreakerQueueProcessor> _logger;

    // Breaker state: failures since the last success, and the UTC instant the
    // breaker tripped (DateTime.MinValue means "not tripped").
    private int _failureCount = 0;
    private readonly int _failureThreshold = 5;
    private readonly TimeSpan _circuitOpenDuration = TimeSpan.FromMinutes(1);
    private DateTime _circuitOpenedAt = DateTime.MinValue;

    public CircuitBreakerQueueProcessor(
        QueueClient queueClient,
        ILogger<CircuitBreakerQueueProcessor> logger)
    {
        _queueClient = queueClient;
        _logger = logger;
    }

    /// <summary>
    /// Processes one batch of up to 10 messages unless the breaker is open.
    /// Any exception increments the failure counter, may trip the breaker, and
    /// is rethrown to the caller.
    /// </summary>
    public async Task ProcessMessagesAsync()
    {
        if (IsCircuitOpen())
        {
            _logger.LogWarning("Circuit breaker is open. Waiting before retry...");
            return;
        }

        try
        {
            var batch = await _queueClient.ReceiveMessagesAsync(maxMessages: 10);

            foreach (var item in batch.Value)
            {
                await ProcessMessageAsync(item);

                // Each successfully handled message clears the failure streak.
                _failureCount = 0;
            }
        }
        catch (Exception error)
        {
            _failureCount += 1;
            _logger.LogError(error,
                "Circuit breaker: Failure count {Count}/{Threshold}",
                _failureCount, _failureThreshold);

            var thresholdReached = _failureCount >= _failureThreshold;
            if (thresholdReached)
            {
                _circuitOpenedAt = DateTime.UtcNow;
                _logger.LogError("Circuit breaker opened due to repeated failures");
            }

            throw;
        }
    }

    /// <summary>
    /// Reports whether the breaker is currently open. As a side effect, a breaker
    /// that has been open longer than the configured duration is reset.
    /// </summary>
    private bool IsCircuitOpen()
    {
        var neverTripped = _circuitOpenedAt == DateTime.MinValue;
        if (neverTripped)
        {
            return false;
        }

        var openFor = DateTime.UtcNow - _circuitOpenedAt;
        if (openFor <= _circuitOpenDuration)
        {
            return true;
        }

        // The cool-down has elapsed: reset the state and let traffic through again.
        _circuitOpenedAt = DateTime.MinValue;
        _failureCount = 0;
        _logger.LogInformation("Circuit breaker closed, resuming normal operation");
        return false;
    }
}

Idempotency

Luôn thiết kế để xử lý idempotent - xử lý cùng message nhiều lần không gây ra side effects không mong muốn.

/// <summary>
/// Processes order messages at most once by recording each message id in a
/// Redis set keyed by the order id before the order is handled.
/// </summary>
/// <remarks>
/// NOTE(review): the marker is written before <c>ProcessAsync</c> runs. If
/// processing throws, a redelivered message would presumably be skipped as
/// "already processed" - confirm whether the marker should be removed on failure.
/// </remarks>
public class IdempotentOrderProcessor
{
    private readonly IRedisCache _redis;
    private readonly IOrderService _orderService;

    /// <summary>
    /// Deserializes the order carried by <paramref name="message"/> and processes
    /// it unless the same message id was already recorded for that order.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The message body deserializes to <c>null</c>.
    /// </exception>
    public async Task ProcessOrderAsync(QueueMessage message)
    {
        // Deserialize returns null for a literal JSON "null" payload; fail with a
        // clear error instead of a NullReferenceException on order.Id below.
        var order = JsonSerializer.Deserialize<Order>(message.Body.ToString())
            ?? throw new InvalidOperationException(
                $"Message {message.MessageId} does not contain an order payload");

        // Record the message id for this order (kept for one day).
        // Presumably SetAddAsync returns true only when the member was newly
        // added - verify against IRedisCache.
        var isFirstDelivery = await _redis.SetAddAsync(
            $"order:{order.Id}:processed",
            message.MessageId,
            TimeSpan.FromDays(1));

        if (!isFirstDelivery)
        {
            // Already processed - skip
            return;
        }

        // Process the order
        await _orderService.ProcessAsync(order);
    }
}

/// <summary>
/// Alternative idempotency helper that tracks handled message ids in the
/// <c>ProcessedMessages</c> set of the database context.
/// </summary>
public class DatabaseIdempotentProcessor
{
    private readonly AppDbContext _context;

    /// <summary>
    /// Checks whether a row with the given message id already exists.
    /// </summary>
    /// <param name="messageId">Id of the queue message to look up.</param>
    /// <returns><c>true</c> when a matching row exists; otherwise <c>false</c>.</returns>
    public async Task<bool> IsMessageProcessedAsync(string messageId)
    {
        var alreadyRecorded = await _context.ProcessedMessages
            .AnyAsync(entry => entry.MessageId == messageId);

        return alreadyRecorded;
    }

    /// <summary>
    /// Inserts a row for the given message id, stamped with the current UTC
    /// time, and saves the change.
    /// </summary>
    /// <param name="messageId">Id of the queue message that was handled.</param>
    public async Task MarkAsProcessedAsync(string messageId)
    {
        var record = new ProcessedMessage
        {
            MessageId = messageId,
            ProcessedAt = DateTime.UtcNow
        };

        _context.ProcessedMessages.Add(record);
        await _context.SaveChangesAsync();
    }
}

Error Handling Strategies

Strategy 1: Retry with Delay

/// <summary>
/// Processes a message with up to three attempts, waiting 2^i seconds
/// (1s, then 2s) between attempts. If the final attempt also fails, the message
/// is sent to the dead-letter path together with the last exception.
/// </summary>
/// <param name="message">The queue message to process.</param>
public async Task ProcessWithRetryAsync(QueueMessage message)
{
    const int maxRetries = 3;

    for (int i = 0; i < maxRetries; i++)
    {
        try
        {
            await ProcessMessageAsync(message);
            return;
        }
        catch (Exception ex)
        {
            // The exception is only in scope inside this catch block, so the
            // dead-letter hand-off for the final attempt has to happen here.
            if (i == maxRetries - 1)
            {
                // Final attempt failed - send to dead letter
                await SendToDeadLetterAsync(message, ex);
                return;
            }

            await Task.Delay(TimeSpan.FromSeconds(Math.Pow(2, i)));
        }
    }
}

Strategy 2: Move to Backoff Queue

/// <summary>
/// Moves a message to the "-backoff" companion queue, where it stays invisible
/// for 5 minutes before it can be retried and expires after 30 minutes, then
/// deletes it from the original queue.
/// </summary>
/// <param name="message">The message whose processing should be postponed.</param>
public async Task MoveToBackoffQueueAsync(QueueMessage message)
{
    // QueueClient does not expose its connection string, so the sibling queue is
    // resolved through the parent service client (same credentials and pipeline).
    var backoffQueue = Azure.Storage.Queues.Specialized.SpecializedQueueExtensions
        .GetParentQueueServiceClient(_queueClient)
        .GetQueueClient($"{_queueClient.Name}-backoff");

    await backoffQueue.CreateIfNotExistsAsync();

    // Visibility delay and TTL are options of the send call; a QueueMessage
    // cannot be constructed by callers.
    await backoffQueue.SendMessageAsync(
        message.Body,
        visibilityTimeout: TimeSpan.FromMinutes(5),  // Wait before retry
        timeToLive: TimeSpan.FromMinutes(30));       // Short TTL for retry

    // Remove the original only after the copy has been stored.
    await _queueClient.DeleteMessageAsync(message.MessageId, message.PopReceipt);
}

Strategy 3: Partial Processing

/// <summary>
/// Handles an order while tolerating inventory shortages: on an
/// <c>InventoryException</c> the customer is notified and the order is reduced
/// to the items flagged as available before payment is taken.
/// </summary>
/// <param name="message">Queue message whose body is a JSON-serialized order.</param>
/// <returns><c>true</c> once payment processing completes without throwing.</returns>
public async Task<bool> ProcessWithPartialSuccessAsync(QueueMessage message)
{
    var payload = message.Body.ToString();
    var order = JsonSerializer.Deserialize<Order>(payload);

    try
    {
        // Validate, then try to reserve everything that was ordered.
        await _orderService.ValidateOrderAsync(order);
        await _inventoryService.ReserveItemsAsync(order);
    }
    catch (InventoryException)
    {
        // Stock shortage: tell the customer and carry on with what is available.
        await _notificationService.NotifyCustomerAsync(
            order.CustomerEmail,
            "Some items are out of stock");

        order.Items = order.Items.Where(item => item.IsAvailable).ToList();
    }

    // A PaymentException is deliberately left to propagate so the caller can
    // retry the message later.
    await _paymentService.ProcessPaymentAsync(order);

    return true;
}

Monitoring and Alerting

/// <summary>
/// Writes basic queue metrics to the log.
/// </summary>
/// <remarks>
/// NOTE(review): no constructor in the visible source assigns the private
/// fields; presumably elided from this snippet - confirm.
/// </remarks>
public class QueueMetrics
{
    private readonly ILogger<QueueMetrics> _logger;
    private readonly QueueClient _queueClient;

    /// <summary>
    /// Fetches the queue properties and logs the queue name together with its
    /// approximate message count.
    /// </summary>
    public async Task LogQueueMetricsAsync()
    {
        var properties = await _queueClient.GetPropertiesAsync();

        // QueueProperties exposes ApproximateMessagesCount (plural "Messages") and
        // has no creation timestamp, so only name and count are reported.
        _logger.LogInformation(
            "Queue: {Queue}, Messages: {Count}",
            _queueClient.Name,
            properties.Value.ApproximateMessagesCount);
    }
}

/// <summary>
/// Health check for queue processing: reports Degraded when the backlog is
/// large and Unhealthy when the queue properties cannot be read.
/// </summary>
/// <remarks>
/// NOTE(review): no constructor in the visible source assigns
/// <c>_queueClient</c>; presumably elided from this snippet - confirm.
/// </remarks>
public class QueueHealthCheck : IHealthCheck
{
    // Backlog size above which the queue is reported as degraded.
    private const int DegradedMessageThreshold = 10000;

    private readonly QueueClient _queueClient;

    /// <summary>
    /// Reads the queue properties and maps the approximate message count to a
    /// health status.
    /// </summary>
    /// <param name="context">Health check context supplied by the framework (unused).</param>
    /// <param name="cancellationToken">Token forwarded to the queue service call.</param>
    /// <returns>
    /// Healthy, Degraded (more than <see cref="DegradedMessageThreshold"/> messages),
    /// or Unhealthy when the call throws.
    /// </returns>
    public async Task<HealthCheckResult> CheckHealthAsync(
        HealthCheckContext context,
        CancellationToken cancellationToken = default)
    {
        try
        {
            var properties = await _queueClient.GetPropertiesAsync(cancellationToken);

            // The SDK property is ApproximateMessagesCount (plural "Messages").
            if (properties.Value.ApproximateMessagesCount > DegradedMessageThreshold)
            {
                return HealthCheckResult.Degraded(
                    "Queue has large number of messages");
            }

            return HealthCheckResult.Healthy("Queue is healthy");
        }
        catch (Exception ex)
        {
            return HealthCheckResult.Unhealthy("Queue is not accessible", ex);
        }
    }
}

Summary

| Practice | Description |
|---|---|
| Exponential Backoff | Increase delay between retries |
| Dead Letter Queue | Store failed messages for investigation |
| Poison Message Handling | Detect and archive repeatedly failing messages |
| Circuit Breaker | Stop processing when failures are too frequent |
| Idempotency | Ensure message can be processed multiple times safely |
| Monitoring | Track queue health and alert on issues |

Next Steps

References