Queue Best Practices
Overview
Khi làm việc với message queues, việc xử lý errors, retries, và dead letters là rất quan trọng để đảm bảo reliability của hệ thống. Phần này trình bày các best practices để handle failures một cách graceful.
Retry Pattern with Exponential Backoff
/// <summary>
/// Receives a batch of queue messages and processes each one with in-process
/// retries and exponential backoff. A message is deleted after it has been
/// processed successfully and is handed to the dead letter path once every
/// attempt has failed.
/// </summary>
public class ReliableQueueProcessor
{
    // Maximum number of processing attempts per message before dead-lettering.
    private const int MaxRetries = 3;

    private readonly QueueClient _queueClient;
    private readonly ILogger<ReliableQueueProcessor> _logger;

    /// <summary>Creates a processor bound to the given queue.</summary>
    /// <param name="queueClient">Client for the queue to read from.</param>
    /// <param name="logger">Logger used to report failed attempts.</param>
    /// <exception cref="ArgumentNullException">Either argument is null.</exception>
    public ReliableQueueProcessor(
        QueueClient queueClient,
        ILogger<ReliableQueueProcessor> logger)
    {
        _queueClient = queueClient ?? throw new ArgumentNullException(nameof(queueClient));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Receives up to 10 messages (hidden from other consumers for 5 minutes)
    /// and processes each one, retrying up to <see cref="MaxRetries"/> times.
    /// </summary>
    public async Task ProcessWithRetryAsync()
    {
        var messages = await _queueClient.ReceiveMessagesAsync(
            maxMessages: 10,
            visibilityTimeout: TimeSpan.FromMinutes(5));

        foreach (var message in messages.Value)
        {
            var attempt = 0;
            var success = false;

            while (attempt < MaxRetries && !success)
            {
                try
                {
                    await ProcessMessageAsync(message);

                    // Success - delete message so it is not delivered again.
                    await _queueClient.DeleteMessageAsync(
                        message.MessageId,
                        message.PopReceipt);
                    success = true;
                }
                catch (Exception ex)
                {
                    attempt++;

                    if (attempt < MaxRetries)
                    {
                        _logger.LogWarning(
                            ex,
                            "Attempt {Attempt} failed for message {MessageId}. Retrying...",
                            attempt,
                            message.MessageId);

                        // Exponential backoff: 2 s after the first failure, 4 s after the second.
                        var delay = TimeSpan.FromSeconds(Math.Pow(2, attempt));
                        await Task.Delay(delay);
                    }
                    else
                    {
                        // The last attempt is not followed by a retry, so it is
                        // reported as an error rather than as "Retrying...".
                        _logger.LogError(
                            ex,
                            "Attempt {Attempt} failed for message {MessageId}. Retries exhausted; sending to dead letter queue.",
                            attempt,
                            message.MessageId);

                        // NOTE(review): SendToDeadLetterAsync is not declared in this class;
                        // it has to be supplied (for example by delegating to a dead letter
                        // handler) before this compiles.
                        await SendToDeadLetterAsync(message, ex);
                    }
                }
            }
        }
    }

    /// <summary>
    /// Deserializes the message body into an <c>Order</c> and runs the
    /// (simulated) processing logic.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The body deserializes to null, or the order total exceeds 10000.
    /// </exception>
    private async Task ProcessMessageAsync(QueueMessage message)
    {
        var order = JsonSerializer.Deserialize<Order>(message.Body.ToString());
        if (order is null)
        {
            // A JSON "null" payload deserializes to null; fail with a clear message
            // instead of a NullReferenceException further down.
            throw new InvalidOperationException(
                $"Message {message.MessageId} does not contain an order payload.");
        }

        // Processing logic
        await Task.Delay(100);

        // Simulate potential failure
        if (order.TotalAmount > 10000)
        {
            throw new InvalidOperationException("High value order requires manual approval");
        }
    }
}
Dead Letter Queue
Setting Up Dead Letter Queue
// Create dead letter queue
/// <summary>
/// Ensures that the "orders-queue" queue and its companion
/// "orders-queue-deadletter" queue both exist, creating whichever is missing.
/// </summary>
public async Task SetupDeadLetterQueueAsync()
{
    const string storageConnection = "DefaultEndpointsProtocol=https;...";
    const string primaryQueueName = "orders-queue";

    // Primary queue first.
    var primaryQueue = new QueueClient(storageConnection, primaryQueueName);
    await primaryQueue.CreateIfNotExistsAsync();

    // The dead letter queue reuses the primary queue's name with a "-deadletter" suffix.
    var deadLetterQueue = new QueueClient(storageConnection, primaryQueueName + "-deadletter");
    await deadLetterQueue.CreateIfNotExistsAsync();
}
Sending to Dead Letter
/// <summary>
/// Moves messages that could not be processed into a companion
/// "{queue}-deadletter" queue, together with details about the failure.
/// </summary>
public class DeadLetterHandler
{
    // How long a dead-lettered message is kept for investigation.
    private static readonly TimeSpan DeadLetterTimeToLive = TimeSpan.FromDays(30);

    private readonly QueueClient _queueClient;

    /// <summary>Creates a handler for the given source queue.</summary>
    /// <param name="queueClient">Client for the queue the failed messages come from.</param>
    /// <exception cref="ArgumentNullException"><paramref name="queueClient"/> is null.</exception>
    public DeadLetterHandler(QueueClient queueClient)
    {
        _queueClient = queueClient ?? throw new ArgumentNullException(nameof(queueClient));
    }

    /// <summary>
    /// Copies <paramref name="message"/> to the dead letter queue and then
    /// deletes it from the original queue.
    /// </summary>
    /// <param name="message">The received message that failed processing.</param>
    /// <param name="ex">The exception raised by the last failed attempt.</param>
    /// <param name="retryCount">Number of attempts made before giving up; recorded in the envelope.</param>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> or <paramref name="ex"/> is null.</exception>
    public async Task SendToDeadLetterAsync(QueueMessage message, Exception ex, int retryCount = 3)
    {
        if (message is null)
        {
            throw new ArgumentNullException(nameof(message));
        }

        if (ex is null)
        {
            throw new ArgumentNullException(nameof(ex));
        }

        var deadLetterQueueName = $"{_queueClient.Name}-deadletter";

        // QueueClient does not expose its connection string, so the sibling queue
        // client is obtained from the parent service client, which shares the same
        // credentials. Called in fully qualified form to avoid a new using directive.
        var deadLetterClient = Azure.Storage.Queues.Specialized.SpecializedQueueExtensions
            .GetParentQueueServiceClient(_queueClient)
            .GetQueueClient(deadLetterQueueName);
        await deadLetterClient.CreateIfNotExistsAsync();

        // Storage queue messages consist of a body only, and QueueMessage cannot be
        // constructed by callers, so the failure details travel in a JSON envelope
        // next to the original payload.
        var envelope = new Dictionary<string, string>
        {
            ["OriginalQueue"] = _queueClient.Name,
            ["OriginalMessageId"] = message.MessageId,
            ["FailedAt"] = DateTime.UtcNow.ToString("O"),
            ["ErrorMessage"] = ex.Message,
            ["RetryCount"] = retryCount.ToString(System.Globalization.CultureInfo.InvariantCulture),
            ["Body"] = message.Body.ToString()
        };

        await deadLetterClient.SendMessageAsync(
            JsonSerializer.Serialize(envelope),
            timeToLive: DeadLetterTimeToLive);

        // Delete from the original queue only after the copy has been stored,
        // so a failure in between cannot lose the message.
        await _queueClient.DeleteMessageAsync(message.MessageId, message.PopReceipt);
    }
}
Poison Message Handling
Detecting Poison Messages
/// <summary>
/// Detects poison messages (messages that have been dequeued more than
/// <see cref="MaxDequeueCount"/> times), archives them, and removes them
/// from the queue instead of processing them again.
/// </summary>
public class PoisonMessageHandler
{
    // A message dequeued more often than this is treated as poison.
    private const int MaxDequeueCount = 5;

    private readonly QueueClient _queueClient;
    private readonly ILogger<PoisonMessageHandler> _logger;

    /// <summary>Creates a handler bound to the given queue.</summary>
    /// <param name="queueClient">Client for the queue to read from.</param>
    /// <param name="logger">Logger used for poison message reports.</param>
    /// <exception cref="ArgumentNullException">Either argument is null.</exception>
    public PoisonMessageHandler(
        QueueClient queueClient,
        ILogger<PoisonMessageHandler> logger)
    {
        _queueClient = queueClient ?? throw new ArgumentNullException(nameof(queueClient));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Receives up to 10 messages and routes each one either to poison
    /// handling or to normal processing, based on its dequeue count.
    /// </summary>
    public async Task HandlePoisonMessagesAsync()
    {
        var messages = await _queueClient.ReceiveMessagesAsync(maxMessages: 10);

        foreach (var message in messages.Value)
        {
            // The dequeue count is part of the received message itself, so no
            // additional service call is needed to read it.
            if (message.DequeueCount > MaxDequeueCount)
            {
                await HandlePoisonMessageAsync(message);
            }
            else
            {
                // Normal processing.
                // NOTE(review): ProcessMessageAsync is not declared in this class;
                // it has to be supplied before this compiles.
                await ProcessMessageAsync(message);
            }
        }
    }

    /// <summary>
    /// Logs, archives, deletes, and reports a single poison message, in that order.
    /// </summary>
    private async Task HandlePoisonMessageAsync(QueueMessage message)
    {
        _logger.LogError(
            "Poison message detected: {MessageId}, DequeueCount: {DequeueCount}",
            message.MessageId,
            message.DequeueCount);

        // 1. Log the message content for investigation
        _logger.LogInformation("Poison message content: {Body}", message.Body.ToString());

        // 2. Archive to blob storage for analysis
        await ArchiveMessageAsync(message);

        // 3. Delete from queue (only after the archive copy has been written)
        await _queueClient.DeleteMessageAsync(message.MessageId, message.PopReceipt);

        // 4. Notify operations team
        await NotifyOperationsAsync(message);
    }

    /// <summary>
    /// Uploads the message body to the "poison-messages" container under a
    /// name built from the message id and the current UTC timestamp.
    /// </summary>
    private async Task ArchiveMessageAsync(QueueMessage message)
    {
        // NOTE(review): the connection string below is a placeholder; load the
        // real value from configuration instead of keeping an account key in source.
        var blobClient = new BlobClient(
            "DefaultEndpointsProtocol=https;AccountName=...;AccountKey==",
            "poison-messages",
            $"poison-{message.MessageId}-{DateTime.UtcNow:yyyyMMddHHmmss}.json");

        // message.Body is already BinaryData, so it can be uploaded directly.
        await blobClient.UploadAsync(message.Body);
    }

    /// <summary>
    /// Raises an alert for the operations team; currently this only writes an error log entry.
    /// </summary>
    private Task NotifyOperationsAsync(QueueMessage message)
    {
        _logger.LogError("ALERT: Poison message requires attention");

        // Nothing asynchronous happens here, so return a completed task
        // rather than declaring an async method without an await.
        return Task.CompletedTask;
    }
}
Circuit Breaker Pattern
/// <summary>
/// Processes queue messages behind a simple circuit breaker: once the number
/// of counted failures reaches the threshold, further calls are skipped until
/// the open duration has elapsed.
/// </summary>
public class CircuitBreakerQueueProcessor
{
    private readonly QueueClient _queueClient;
    private readonly ILogger<CircuitBreakerQueueProcessor> _logger;

    // Breaker configuration.
    private readonly int _failureThreshold = 5;
    private readonly TimeSpan _circuitOpenDuration = TimeSpan.FromMinutes(1);

    // Breaker state. DateTime.MinValue means the circuit is not open.
    private int _failureCount;
    private DateTime _circuitOpenedAt = DateTime.MinValue;

    /// <summary>Creates a processor bound to the given queue.</summary>
    /// <param name="queueClient">Client for the queue to read from.</param>
    /// <param name="logger">Logger used for breaker state changes and failures.</param>
    public CircuitBreakerQueueProcessor(
        QueueClient queueClient,
        ILogger<CircuitBreakerQueueProcessor> logger)
    {
        _queueClient = queueClient;
        _logger = logger;
    }

    /// <summary>
    /// Receives up to 10 messages and processes them unless the circuit is open.
    /// Any exception increments the failure count, may open the circuit, and is rethrown.
    /// </summary>
    public async Task ProcessMessagesAsync()
    {
        // While the circuit is open, skip this run entirely.
        if (IsCircuitOpen())
        {
            _logger.LogWarning("Circuit breaker is open. Waiting before retry...");
            return;
        }

        try
        {
            var batch = await _queueClient.ReceiveMessagesAsync(maxMessages: 10);

            foreach (var item in batch.Value)
            {
                // NOTE(review): messages are not deleted here; presumably
                // ProcessMessageAsync (not declared in this class) takes care of that - confirm.
                await ProcessMessageAsync(item);

                // Every successfully processed message resets the failure counter.
                _failureCount = 0;
            }
        }
        catch (Exception ex)
        {
            _failureCount += 1;

            _logger.LogError(ex,
                "Circuit breaker: Failure count {Count}/{Threshold}",
                _failureCount, _failureThreshold);

            var thresholdReached = _failureCount >= _failureThreshold;
            if (thresholdReached)
            {
                _circuitOpenedAt = DateTime.UtcNow;
                _logger.LogError("Circuit breaker opened due to repeated failures");
            }

            throw;
        }
    }

    /// <summary>
    /// Reports whether the circuit is currently open. As a side effect, closes
    /// the circuit and resets the failure count once the open duration has passed.
    /// </summary>
    private bool IsCircuitOpen()
    {
        var neverOpened = _circuitOpenedAt == DateTime.MinValue;
        if (neverOpened)
        {
            return false;
        }

        var elapsed = DateTime.UtcNow - _circuitOpenedAt;
        if (elapsed <= _circuitOpenDuration)
        {
            // Still inside the cool-down window.
            return true;
        }

        // Cool-down is over: close the circuit and start counting from zero.
        _circuitOpenedAt = DateTime.MinValue;
        _failureCount = 0;
        _logger.LogInformation("Circuit breaker closed, resuming normal operation");
        return false;
    }
}
Idempotency
Luôn thiết kế message handler theo hướng idempotent: xử lý cùng một message nhiều lần không được gây ra side effects không mong muốn.
/// <summary>
/// Processes order messages at most once by recording each message id in a
/// per-order cache entry before the order is handed to the order service.
/// </summary>
public class IdempotentOrderProcessor
{
    private readonly IRedisCache _redis;
    private readonly IOrderService _orderService;

    /// <summary>Creates a processor with its cache and order service.</summary>
    /// <param name="redis">Cache used to remember which messages were already seen.</param>
    /// <param name="orderService">Service that performs the actual order processing.</param>
    /// <exception cref="ArgumentNullException">Either argument is null.</exception>
    public IdempotentOrderProcessor(IRedisCache redis, IOrderService orderService)
    {
        _redis = redis ?? throw new ArgumentNullException(nameof(redis));
        _orderService = orderService ?? throw new ArgumentNullException(nameof(orderService));
    }

    /// <summary>
    /// Processes the order contained in <paramref name="message"/> unless this
    /// message has already been recorded for that order.
    /// </summary>
    /// <param name="message">The received queue message carrying a serialized order.</param>
    /// <exception cref="InvalidOperationException">The body deserializes to null.</exception>
    public async Task ProcessOrderAsync(QueueMessage message)
    {
        var order = JsonSerializer.Deserialize<Order>(message.Body.ToString());
        if (order is null)
        {
            // A JSON "null" payload deserializes to null; fail with a clear message
            // instead of a NullReferenceException on order.Id.
            throw new InvalidOperationException(
                $"Message {message.MessageId} does not contain an order payload.");
        }

        // Record the message id under the order's key, kept for one day.
        // Presumably SetAddAsync returns true only when the value was newly added;
        // the check below relies on that - confirm against IRedisCache.
        var isFirstDelivery = await _redis.SetAddAsync(
            $"order:{order.Id}:processed",
            message.MessageId,
            TimeSpan.FromDays(1));

        if (!isFirstDelivery)
        {
            // Already processed - skip
            return;
        }

        // NOTE(review): the marker is written before processing. If ProcessAsync
        // throws, a redelivery of this message will be skipped as "already
        // processed". TODO: remove the marker on failure, or write it only after success.
        await _orderService.ProcessAsync(order);
    }
}
// Alternative: Use database to track processed messages
/// <summary>
/// Tracks processed message ids in the database so that a redelivered
/// message can be recognized and skipped.
/// </summary>
public class DatabaseIdempotentProcessor
{
    private readonly AppDbContext _context;

    /// <summary>Creates a processor backed by the given database context.</summary>
    /// <param name="context">Context exposing the <c>ProcessedMessages</c> set.</param>
    /// <exception cref="ArgumentNullException"><paramref name="context"/> is null.</exception>
    public DatabaseIdempotentProcessor(AppDbContext context)
    {
        _context = context ?? throw new ArgumentNullException(nameof(context));
    }

    /// <summary>Checks whether a record exists for <paramref name="messageId"/>.</summary>
    /// <param name="messageId">Id of the queue message to look up.</param>
    /// <param name="cancellationToken">Token used to cancel the database query.</param>
    /// <returns><c>true</c> when a matching record is found; otherwise <c>false</c>.</returns>
    public async Task<bool> IsMessageProcessedAsync(
        string messageId,
        CancellationToken cancellationToken = default)
    {
        return await _context.ProcessedMessages
            .AnyAsync(m => m.MessageId == messageId, cancellationToken);
    }

    /// <summary>
    /// Stores a record for <paramref name="messageId"/> stamped with the current UTC time.
    /// </summary>
    /// <param name="messageId">Id of the queue message that has been processed.</param>
    /// <param name="cancellationToken">Token used to cancel the save operation.</param>
    public async Task MarkAsProcessedAsync(
        string messageId,
        CancellationToken cancellationToken = default)
    {
        // NOTE(review): checking and marking are separate calls, so two consumers
        // can both pass the check for the same message. A unique constraint on
        // MessageId would turn that race into a detectable failure - confirm the schema.
        _context.ProcessedMessages.Add(new ProcessedMessage
        {
            MessageId = messageId,
            ProcessedAt = DateTime.UtcNow
        });

        await _context.SaveChangesAsync(cancellationToken);
    }
}
Error Handling Strategies
Strategy 1: Retry with Delay
/// <summary>
/// Processes <paramref name="message"/>, retrying with exponential backoff
/// (1 s, then 2 s). After the third failed attempt the message is sent to
/// the dead letter queue together with the last exception.
/// </summary>
/// <param name="message">The received queue message to process.</param>
public async Task ProcessWithRetryAsync(QueueMessage message)
{
    const int maxRetries = 3;

    for (var attempt = 1; ; attempt++)
    {
        try
        {
            await ProcessMessageAsync(message);
            return;
        }
        catch (Exception ex)
        {
            if (attempt >= maxRetries)
            {
                // Final attempt failed - send to dead letter. This is done inside
                // the catch block because the exception is only in scope here.
                await SendToDeadLetterAsync(message, ex);
                return;
            }

            // Wait 2^(attempt - 1) seconds before the next attempt.
            await Task.Delay(TimeSpan.FromSeconds(Math.Pow(2, attempt - 1)));
        }
    }
}
Strategy 2: Move to Backoff Queue
/// <summary>
/// Moves <paramref name="message"/> to the "{queue}-backoff" queue, where it
/// stays invisible for 5 minutes and expires after 30 minutes, and then
/// deletes it from the original queue.
/// </summary>
/// <param name="message">The received queue message to defer.</param>
public async Task MoveToBackoffQueueAsync(QueueMessage message)
{
    // QueueClient does not expose its connection string, so the sibling queue
    // client is obtained from the parent service client, which shares the same
    // credentials. Called in fully qualified form to avoid a new using directive.
    var backoffQueue = Azure.Storage.Queues.Specialized.SpecializedQueueExtensions
        .GetParentQueueServiceClient(_queueClient)
        .GetQueueClient($"{_queueClient.Name}-backoff");
    await backoffQueue.CreateIfNotExistsAsync();

    // QueueMessage cannot be constructed by callers; the delay and lifetime are
    // parameters of the send call itself. The backoff queue assigns a new message id.
    await backoffQueue.SendMessageAsync(
        message.Body,
        visibilityTimeout: TimeSpan.FromMinutes(5), // Wait before retry
        timeToLive: TimeSpan.FromMinutes(30));      // Short TTL for retry

    // Remove the original only after the copy has been stored.
    await _queueClient.DeleteMessageAsync(message.MessageId, message.PopReceipt);
}
Strategy 3: Partial Processing
/// <summary>
/// Processes an order while tolerating inventory shortages: when reserving
/// items fails, the customer is notified and the order is reduced to the
/// items that are available before payment is taken.
/// </summary>
/// <param name="message">The received queue message carrying a serialized order.</param>
/// <returns><c>true</c> when the method completes without throwing.</returns>
/// <exception cref="InvalidOperationException">The body deserializes to null.</exception>
public async Task<bool> ProcessWithPartialSuccessAsync(QueueMessage message)
{
    var order = JsonSerializer.Deserialize<Order>(message.Body.ToString());
    if (order is null)
    {
        // A JSON "null" payload deserializes to null; fail with a clear message
        // instead of a NullReferenceException further down.
        throw new InvalidOperationException(
            $"Message {message.MessageId} does not contain an order payload.");
    }

    try
    {
        // Process what we can
        await _orderService.ValidateOrderAsync(order);
        await _inventoryService.ReserveItemsAsync(order);
    }
    catch (InventoryException)
    {
        // Not enough inventory - handle gracefully
        await _notificationService.NotifyCustomerAsync(
            order.CustomerEmail,
            "Some items are out of stock");

        // Continue with available items
        order.Items = order.Items.Where(i => i.IsAvailable).ToList();
    }

    // A payment failure is deliberately not caught here: it propagates to the
    // caller so the message can be retried later.
    await _paymentService.ProcessPaymentAsync(order);

    return true;
}
Monitoring and Alerting
/// <summary>
/// Writes basic queue metrics (queue name and approximate message count) to the log.
/// </summary>
public class QueueMetrics
{
    private readonly ILogger<QueueMetrics> _logger;
    private readonly QueueClient _queueClient;

    /// <summary>Creates a metrics reporter for the given queue.</summary>
    /// <param name="queueClient">Client for the queue to inspect.</param>
    /// <param name="logger">Logger that receives the metrics.</param>
    /// <exception cref="ArgumentNullException">Either argument is null.</exception>
    public QueueMetrics(QueueClient queueClient, ILogger<QueueMetrics> logger)
    {
        _queueClient = queueClient ?? throw new ArgumentNullException(nameof(queueClient));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Fetches the queue properties and logs the queue name and its approximate message count.
    /// </summary>
    public async Task LogQueueMetricsAsync()
    {
        var properties = await _queueClient.GetPropertiesAsync();

        // QueueProperties exposes ApproximateMessagesCount and Metadata only;
        // it has no creation timestamp, so none is logged.
        _logger.LogInformation(
            "Queue: {Queue}, Messages: {Count}",
            _queueClient.Name,
            properties.Value.ApproximateMessagesCount);
    }
}
// Health check for queue processing
/// <summary>
/// Health check that reports the queue as degraded when its backlog is large
/// and as unhealthy when its properties cannot be read.
/// </summary>
public class QueueHealthCheck : IHealthCheck
{
    // Backlog size above which the queue is reported as degraded.
    private const int DegradedMessageCountThreshold = 10000;

    private readonly QueueClient _queueClient;

    /// <summary>Creates a health check for the given queue.</summary>
    /// <param name="queueClient">Client for the queue to probe.</param>
    /// <exception cref="ArgumentNullException"><paramref name="queueClient"/> is null.</exception>
    public QueueHealthCheck(QueueClient queueClient)
    {
        _queueClient = queueClient ?? throw new ArgumentNullException(nameof(queueClient));
    }

    /// <summary>Reads the queue properties and maps them to a health status.</summary>
    /// <param name="context">Health check context supplied by the host (not used here).</param>
    /// <param name="cancellationToken">Token used to cancel the properties request.</param>
    /// <returns>
    /// Degraded when the approximate message count exceeds 10000, Unhealthy when
    /// reading the properties throws, otherwise Healthy.
    /// </returns>
    public async Task<HealthCheckResult> CheckHealthAsync(
        HealthCheckContext context,
        CancellationToken cancellationToken = default)
    {
        try
        {
            var properties = await _queueClient.GetPropertiesAsync(cancellationToken);

            if (properties.Value.ApproximateMessagesCount > DegradedMessageCountThreshold)
            {
                return HealthCheckResult.Degraded(
                    "Queue has large number of messages");
            }

            return HealthCheckResult.Healthy("Queue is healthy");
        }
        catch (Exception ex)
        {
            // Any failure to reach the queue is reported as a status rather than thrown.
            return HealthCheckResult.Unhealthy("Queue is not accessible", ex);
        }
    }
}
Summary
| Practice | Description |
|---|---|
| Exponential Backoff | Increase delay between retries |
| Dead Letter Queue | Store failed messages for investigation |
| Poison Message Handling | Detect and archive repeatedly failing messages |
| Circuit Breaker | Stop processing when failures are too frequent |
| Idempotency | Ensure a message can be processed multiple times safely |
| Monitoring | Track queue health and alert on issues |
Next Steps
- Multiple Queues - Managing multiple queues
- Azure Service Bus Topics - Pub/Sub patterns