Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Azure Queue Storage

Overview

Azure Queue Storage là một dịch vụ message queue được quản lý hoàn toàn bởi Microsoft Azure, cung cấp reliable, persistent message storage với chi phí thấp.

┌─────────────────────────────────────────────────────────────────┐
│                  AZURE QUEUE ARCHITECTURE                       │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  ┌─────────────┐              ┌─────────────┐                  │
│  │   Web App   │              │   Backend   │                  │
│  │  (Producer) │────────────▶│   Worker    │                  │
│  └─────────────┘              │  (Consumer) │                  │
│        │                      └─────────────┘                  │
│        │                             │                          │
│        │    ┌───────────────────────┼───────────────────────┐ │
│        │    │         AZURE STORAGE ACCOUNT                 │ │
│        │    │  ┌────────────┐  ┌────────────┐  ┌──────────┐  │ │
│        │    │  │    QUEUE   │  │    BLOB    │  │   TABLE │  │ │
│        │    │  │ (Messages) │  │  (Files)  │  │  (Data) │  │ │
│        │    │  └────────────┘  └────────────┘  └──────────┘  │ │
│        │    └───────────────────────────────────────────────┘ │
│        │                                                       │
│        ▼                                                       │
│   Azure SDK                                                     │
└─────────────────────────────────────────────────────────────────┘

Setup

Installation

dotnet add package Azure.Storage.Queues

Configuration

// Connection string from Azure Portal
var connectionString = "DefaultEndpointsProtocol=https;AccountName=mystorageaccount;AccountKey=your-key==;EndpointSuffix=core.windows.net";

// Create client
var queueClient = new QueueClient(connectionString, "orders-queue");

// Create queue if not exists
await queueClient.CreateIfNotExistsAsync();

Dependency Injection Setup

public class Program
{
    public static void Main(string[] args)
    {
        var builder = Host.CreateApplicationBuilder(args);
        
        // Register QueueClient
        builder.Services.AddSingleton(sp =>
        {
            var connectionString = builder.Configuration.GetValue<string>("AzureStorage");
            var client = new QueueClient(connectionString, "orders-queue");
            client.CreateIfNotExists();
            return client;
        });
        
        // Or use IOptions pattern for configuration
        builder.Services.Configure<QueueOptions>(
            builder.Configuration.GetSection("Queue"));
        
        var host = builder.Build();
        host.Run();
    }
}

public class QueueOptions
{
    public string ConnectionString { get; set; }
    public string OrdersQueue { get; set; } = "orders-queue";
    public string NotificationsQueue { get; set; } = "notifications-queue";
}

Sending Messages

Basic Send

public class OrderQueueService
{
    private readonly QueueClient _queueClient;
    
    public OrderQueueService(QueueClient queueClient)
    {
        _queueClient = queueClient;
    }
    
    public async Task SendOrderAsync(Order order)
    {
        var message = new QueueMessage
        {
            Body = BinaryData.FromObjectAsJson(order),
            MessageId = Guid.NewGuid().ToString(),
            VisibilityTimeout = TimeSpan.Zero,
            TimeToLive = TimeSpan.FromDays(7)
        };
        
        await _queueClient.SendMessageAsync(message);
        
        Console.WriteLine($"Order {order.Id} sent to queue");
    }
}

// Order model
public class Order
{
    public Guid Id { get; set; }
    public string CustomerEmail { get; set; }
    public List<OrderItem> Items { get; set; }
    public decimal TotalAmount { get; set; }
    public DateTime CreatedAt { get; set; }
}

public class OrderItem
{
    public string ProductName { get; set; }
    public int Quantity { get; set; }
    public decimal Price { get; set; }
}

Send with Metadata

public async Task SendOrderWithMetadataAsync(Order order)
{
    var message = new QueueMessage
    {
        Body = BinaryData.FromObjectAsJson(order),
        TimeToLive = TimeSpan.FromDays(7),
        Metadata = new Dictionary<string, string>
        {
            ["OrderId"] = order.Id.ToString(),
            ["CustomerEmail"] = order.CustomerEmail,
            ["Priority"] = order.TotalAmount > 1000 ? "High" : "Normal"
        }
    };
    
    await _queueClient.SendMessageAsync(message);
}

Batch Send

public async Task SendOrdersBatchAsync(IEnumerable<Order> orders)
{
    var messages = orders.Select(order => new QueueMessage
    {
        Body = BinaryData.FromObjectAsJson(order),
        MessageId = Guid.NewGuid().ToString()
    });
    
    // Azure Queue supports batch sending
    foreach (var message in messages)
    {
        await _queueClient.SendMessageAsync(message);
    }
}

Receiving Messages

Basic Receive

public class OrderProcessor : BackgroundService
{
    private readonly QueueClient _queueClient;
    private readonly ILogger<OrderProcessor> _logger;
    
    public OrderProcessor(QueueClient queueClient, ILogger<OrderProcessor> logger)
    {
        _queueClient = queueClient;
        _logger = logger;
    }
    
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                // Receive up to 32 messages
                var messages = await _queueClient.ReceiveMessagesAsync(
                    maxMessages: 32,
                    visibilityTimeout: TimeSpan.FromMinutes(5),
                    cancellationToken: stoppingToken);
                
                foreach (var message in messages.Value)
                {
                    try
                    {
                        var order = JsonSerializer.Deserialize<Order>(message.Body.ToString());
                        
                        _logger.LogInformation("Processing order: {OrderId}", order.Id);
                        
                        await ProcessOrderAsync(order);
                        
                        // Delete message after successful processing
                        await _queueClient.DeleteMessageAsync(
                            message.MessageId,
                            message.PopReceipt,
                            stoppingToken);
                        
                        _logger.LogInformation("Order processed: {OrderId}", order.Id);
                    }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex, "Error processing message: {MessageId}", 
                            message.MessageId);
                    }
                }
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error receiving messages");
                await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
            }
        }
    }
    
    private async Task ProcessOrderAsync(Order order)
    {
        await Task.Delay(100); // Simulate processing
    }
}

Peek Messages

// Peek messages without removing them from queue
public async Task PeekMessagesAsync()
{
    var peekedMessages = await _queueClient.PeekMessagesAsync(maxMessages: 10);
    
    foreach (var message in peekedMessages.Value)
    {
        Console.WriteLine($"Message ID: {message.MessageId}");
        Console.WriteLine($"Body: {message.Body}");
    }
}

Update Message Visibility

// Extend visibility timeout if processing takes longer
public async Task ExtendProcessingTimeAsync(QueueMessage message)
{
    await _queueClient.UpdateMessageAsync(
        message.MessageId,
        message.PopReceipt,
        visibilityTimeout: TimeSpan.FromMinutes(10));
}

Queue Properties

// Get queue properties
public async Task GetQueueInfoAsync()
{
    var properties = await _queueClient.GetPropertiesAsync();
    
    Console.WriteLine($"Approximate Message Count: {properties.Value.ApproximateMessageCount}");
    Console.WriteLine($"Created On: {properties.Value.CreatedOn}");
    Console.WriteLine($"Last Modified: {properties.Value.LastModified}");
}

Best Practices

1. Always Use CancellationToken

public async Task SendWithCancellationAsync(Order order, CancellationToken ct)
{
    var message = new QueueMessage
    {
        Body = BinaryData.FromObjectAsJson(order)
    };
    
    await _queueClient.SendMessageAsync(message, cancellationToken: ct);
}

2. Handle Message Overflow

// Messages larger than 64KB should use Azure Blob
public async Task SendLargeOrderAsync(Order order)
{
    // Store large data in Blob
    var blobClient = new BlobClient(
        "DefaultEndpointsProtocol=https;AccountName=...;AccountKey==",
        "order-payloads",
        $"{order.Id}.json");
    
    await blobClient.UploadAsync(BinaryData.FromObjectAsJson(order));
    
    // Send reference in queue
    var message = new QueueMessage
    {
        Body = BinaryData.FromObjectAsJson(new OrderReference
        {
            OrderId = order.Id,
            PayloadUri = blobClient.Uri.ToString()
        })
    };
    
    await _queueClient.SendMessageAsync(message);
}

3. Configure Appropriate TTL

// Set appropriate TTL based on message urgency
var urgentMessage = new QueueMessage
{
    Body = BinaryData.FromObjectAsJson(order),
    TimeToLive = TimeSpan.FromMinutes(30)  // Short TTL for urgent
};

var normalMessage = new QueueMessage
{
    Body = BinaryData.FromObjectAsJson(order),
    TimeToLive = TimeSpan.FromDays(7)  // Long TTL for normal
};

Summary

FeatureLimit
Max message size64 KB
Max queue size500 TB
Max messages per retrieval32
Default visibility timeout30 seconds
Max visibility timeout7 days
Max TTL7 days

Next Steps

References