Azure Queue Storage
Overview
Azure Queue Storage là một dịch vụ message queue được quản lý hoàn toàn bởi Microsoft Azure, cung cấp reliable, persistent message storage với chi phí thấp.
┌─────────────────────────────────────────────────────────────────┐
│ AZURE QUEUE ARCHITECTURE │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ Web App │ │ Backend │ │
│ │ (Producer) │────────────▶│ Worker │ │
│ └─────────────┘ │ (Consumer) │ │
│ │ └─────────────┘ │
│ │ │ │
│ │ ┌───────────────────────┼───────────────────────┐ │
│ │ │ AZURE STORAGE ACCOUNT │ │
│ │ │ ┌────────────┐ ┌────────────┐ ┌──────────┐ │ │
│ │ │ │ QUEUE │ │ BLOB │ │ TABLE │ │ │
│ │ │ │ (Messages) │ │ (Files) │ │ (Data) │ │ │
│ │ │ └────────────┘ └────────────┘ └──────────┘ │ │
│ │ └───────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ Azure SDK │
└─────────────────────────────────────────────────────────────────┘
Setup
Installation
dotnet add package Azure.Storage.Queues
Configuration
// Connection string copied from the Azure Portal.
// The value below is a placeholder; keep the real key out of source control.
const string connectionString = "DefaultEndpointsProtocol=https;AccountName=mystorageaccount;AccountKey=your-key==;EndpointSuffix=core.windows.net";

// Client bound to the "orders-queue" queue of that storage account.
QueueClient queueClient = new QueueClient(connectionString, "orders-queue");

// Make sure the queue exists before it is used.
await queueClient.CreateIfNotExistsAsync();
Dependency Injection Setup
/// <summary>
/// Application entry point: builds the generic host and registers the queue services.
/// </summary>
public class Program
{
    /// <summary>Configures dependency injection and runs the host until shutdown.</summary>
    /// <param name="args">Command-line arguments forwarded to the host builder.</param>
    public static void Main(string[] args)
    {
        var builder = Host.CreateApplicationBuilder(args);

        // Register QueueClient
        builder.Services.AddSingleton(sp =>
        {
            // Resolve configuration from the container instead of capturing the builder in the closure.
            var configuration = sp.GetRequiredService<IConfiguration>();
            var connectionString = configuration.GetValue<string>("AzureStorage");
            if (string.IsNullOrWhiteSpace(connectionString))
            {
                // Fail with a message that names the missing key rather than a null-argument error from the SDK.
                throw new InvalidOperationException(
                    "Configuration value 'AzureStorage' is missing or empty.");
            }

            var client = new QueueClient(connectionString, "orders-queue");
            // Synchronous call: executed when the factory runs, i.e. the first time QueueClient is resolved.
            client.CreateIfNotExists();
            return client;
        });

        // Or use IOptions pattern for configuration
        builder.Services.Configure<QueueOptions>(
            builder.Configuration.GetSection("Queue"));

        var host = builder.Build();
        host.Run();
    }
}
/// <summary>
/// Strongly typed queue settings, intended for binding from configuration via the options pattern.
/// </summary>
public class QueueOptions
{
    /// <summary>Storage account connection string. Defaults to empty (never null) until configured.</summary>
    public string ConnectionString { get; set; } = string.Empty;

    /// <summary>Name of the queue that carries orders.</summary>
    public string OrdersQueue { get; set; } = "orders-queue";

    /// <summary>Name of the queue that carries notifications.</summary>
    public string NotificationsQueue { get; set; } = "notifications-queue";
}
Sending Messages
Basic Send
/// <summary>
/// Publishes orders to the Azure Storage queue behind the injected <see cref="QueueClient"/>.
/// </summary>
public class OrderQueueService
{
    private readonly QueueClient _queueClient;

    /// <summary>Creates the service.</summary>
    /// <param name="queueClient">Client for the queue that receives the orders.</param>
    public OrderQueueService(QueueClient queueClient)
    {
        _queueClient = queueClient;
    }

    /// <summary>
    /// Serializes <paramref name="order"/> to JSON and enqueues it, immediately visible,
    /// with a time-to-live of seven days.
    /// </summary>
    /// <param name="order">The order to enqueue.</param>
    /// <exception cref="ArgumentNullException"><paramref name="order"/> is null.</exception>
    public async Task SendOrderAsync(Order order)
    {
        // Reject null up front; otherwise a JSON "null" message would be enqueued before order.Id fails below.
        ArgumentNullException.ThrowIfNull(order);

        // QueueMessage is the read-only model returned by receive calls and cannot be built by callers.
        // The payload and the per-message options are passed straight to SendMessageAsync instead;
        // the message ID is assigned by the service and is available on the returned receipt.
        await _queueClient.SendMessageAsync(
            BinaryData.FromObjectAsJson(order),
            visibilityTimeout: TimeSpan.Zero,
            timeToLive: TimeSpan.FromDays(7));

        Console.WriteLine($"Order {order.Id} sent to queue");
    }
}
// Order model
/// <summary>An order as it is serialized into the queue message body.</summary>
public class Order
{
    /// <summary>Unique identifier of the order.</summary>
    public Guid Id { get; set; }

    /// <summary>E-mail address of the customer. Defaults to empty rather than null.</summary>
    public string CustomerEmail { get; set; } = string.Empty;

    /// <summary>Line items of the order. Defaults to an empty list so it can be enumerated safely.</summary>
    public List<OrderItem> Items { get; set; } = new List<OrderItem>();

    /// <summary>Total amount of the order.</summary>
    public decimal TotalAmount { get; set; }

    /// <summary>Creation timestamp of the order.</summary>
    public DateTime CreatedAt { get; set; }
}
/// <summary>A single line item of an order.</summary>
public class OrderItem
{
    /// <summary>Display name of the product. Defaults to empty rather than null.</summary>
    public string ProductName { get; set; } = string.Empty;

    /// <summary>Number of units ordered.</summary>
    public int Quantity { get; set; }

    /// <summary>Price of the item.</summary>
    public decimal Price { get; set; }
}
Send with Metadata
/// <summary>
/// Enqueues an order together with routing metadata, with a time-to-live of seven days.
/// </summary>
/// <remarks>
/// Azure Queue Storage messages carry only a body (metadata exists on the queue, not on
/// individual messages), so the metadata travels inside a JSON envelope of the form
/// <c>{ "Order": ..., "Metadata": { ... } }</c>. Consumers must deserialize the envelope
/// and read its <c>Order</c> property.
/// </remarks>
/// <param name="order">The order to enqueue.</param>
public async Task SendOrderWithMetadataAsync(Order order)
{
    var envelope = new
    {
        Order = order,
        Metadata = new Dictionary<string, string>
        {
            ["OrderId"] = order.Id.ToString(),
            ["CustomerEmail"] = order.CustomerEmail,
            // Orders above 1000 are flagged as high priority.
            ["Priority"] = order.TotalAmount > 1000 ? "High" : "Normal"
        }
    };

    await _queueClient.SendMessageAsync(
        BinaryData.FromObjectAsJson(envelope),
        timeToLive: TimeSpan.FromDays(7));
}
Batch Send
/// <summary>
/// Enqueues every order in <paramref name="orders"/>, one message per order, in sequence.
/// </summary>
/// <param name="orders">Orders to enqueue; enumerated once.</param>
/// <param name="cancellationToken">Token that stops sending the remaining orders.</param>
/// <exception cref="ArgumentNullException"><paramref name="orders"/> is null.</exception>
public async Task SendOrdersBatchAsync(
    IEnumerable<Order> orders,
    CancellationToken cancellationToken = default)
{
    ArgumentNullException.ThrowIfNull(orders);

    // Azure Queue Storage has no batch-send operation: each message is its own request.
    // Message IDs are assigned by the service, so none is supplied here.
    foreach (var order in orders)
    {
        await _queueClient.SendMessageAsync(
            BinaryData.FromObjectAsJson(order),
            cancellationToken: cancellationToken);
    }
}
Receiving Messages
Basic Receive
/// <summary>
/// Background worker that polls the orders queue, processes each order and deletes the
/// message once processing succeeded.
/// </summary>
public class OrderProcessor : BackgroundService
{
    // Pause applied when a poll returns no messages, so an empty queue is not polled in a tight loop.
    private static readonly TimeSpan EmptyQueueDelay = TimeSpan.FromSeconds(1);

    // Back-off applied after a failed receive call.
    private static readonly TimeSpan ReceiveErrorDelay = TimeSpan.FromSeconds(5);

    private readonly QueueClient _queueClient;
    private readonly ILogger<OrderProcessor> _logger;

    /// <summary>Creates the processor.</summary>
    /// <param name="queueClient">Client for the queue to poll.</param>
    /// <param name="logger">Logger for progress and failures.</param>
    public OrderProcessor(QueueClient queueClient, ILogger<OrderProcessor> logger)
    {
        _queueClient = queueClient;
        _logger = logger;
    }

    /// <summary>
    /// Polling loop: receives up to 32 messages per call, hides them for five minutes while
    /// they are processed, and deletes each one after successful processing.
    /// </summary>
    /// <param name="stoppingToken">Signalled when the host is shutting down.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                // Receive up to 32 messages
                var messages = await _queueClient.ReceiveMessagesAsync(
                    maxMessages: 32,
                    visibilityTimeout: TimeSpan.FromMinutes(5),
                    cancellationToken: stoppingToken);

                if (messages.Value.Length == 0)
                {
                    await Task.Delay(EmptyQueueDelay, stoppingToken);
                    continue;
                }

                foreach (var message in messages.Value)
                {
                    // Stop picking up further messages once shutdown was requested; the
                    // unprocessed ones are not deleted and stay in the queue.
                    if (stoppingToken.IsCancellationRequested)
                    {
                        break;
                    }

                    try
                    {
                        var order = message.Body.ToObjectFromJson<Order>();
                        if (order is null)
                        {
                            // A JSON "null" body deserializes to null; report it instead of
                            // failing with a NullReferenceException on order.Id.
                            _logger.LogError("Message {MessageId} does not contain an order",
                                message.MessageId);
                            continue;
                        }

                        _logger.LogInformation("Processing order: {OrderId}", order.Id);
                        await ProcessOrderAsync(order, stoppingToken);

                        // Delete message after successful processing
                        await _queueClient.DeleteMessageAsync(
                            message.MessageId,
                            message.PopReceipt,
                            stoppingToken);
                        _logger.LogInformation("Order processed: {OrderId}", order.Id);
                    }
                    catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
                    {
                        // Shutdown in progress: not an error. The message was not deleted.
                        break;
                    }
                    catch (Exception ex)
                    {
                        // The message is not deleted, so it is left in the queue for another attempt.
                        _logger.LogError(ex, "Error processing message: {MessageId}",
                            message.MessageId);
                    }
                }
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Shutdown in progress: leave the loop without logging an error.
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error receiving messages");
                await Task.Delay(ReceiveErrorDelay, stoppingToken);
            }
        }
    }

    /// <summary>Placeholder for the real order handling; simulates 100 ms of work.</summary>
    private async Task ProcessOrderAsync(Order order, CancellationToken cancellationToken)
    {
        await Task.Delay(100, cancellationToken); // Simulate processing
    }
}
Peek Messages
// Look at up to ten queued messages; peeking leaves them in the queue.
public async Task PeekMessagesAsync()
{
    var response = await _queueClient.PeekMessagesAsync(maxMessages: 10);
    var snapshot = response.Value;

    for (var index = 0; index < snapshot.Length; index++)
    {
        var current = snapshot[index];
        Console.WriteLine($"Message ID: {current.MessageId}");
        Console.WriteLine($"Body: {current.Body}");
    }
}
Update Message Visibility
// Extend visibility timeout if processing takes longer
/// <summary>
/// Keeps <paramref name="message"/> invisible to other consumers for another ten minutes.
/// </summary>
/// <param name="message">A message previously received from the queue.</param>
/// <returns>
/// The message with its refreshed pop receipt. Updating a message invalidates the previous
/// pop receipt, so use the returned instance for any later update or delete.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
public async Task<QueueMessage> ExtendProcessingTimeAsync(QueueMessage message)
{
    ArgumentNullException.ThrowIfNull(message);

    var receipt = await _queueClient.UpdateMessageAsync(
        message.MessageId,
        message.PopReceipt,
        visibilityTimeout: TimeSpan.FromMinutes(10));

    // Carry the new pop receipt and next-visible time over to the message.
    return message.Update(receipt.Value);
}
Queue Properties
// Get queue properties
/// <summary>
/// Prints the approximate number of messages in the queue and the queue's metadata.
/// </summary>
/// <remarks>
/// The queue properties expose only the approximate message count and the user-defined
/// metadata; they carry no creation or last-modified timestamps.
/// </remarks>
public async Task GetQueueInfoAsync()
{
    var properties = await _queueClient.GetPropertiesAsync();

    Console.WriteLine($"Approximate Message Count: {properties.Value.ApproximateMessagesCount}");

    foreach (var entry in properties.Value.Metadata)
    {
        Console.WriteLine($"Metadata {entry.Key}: {entry.Value}");
    }
}
Best Practices
1. Always Use CancellationToken
/// <summary>
/// Serializes <paramref name="order"/> to JSON and enqueues it, honoring cancellation.
/// </summary>
/// <param name="order">The order to enqueue.</param>
/// <param name="ct">Token that cancels the send request.</param>
public async Task SendWithCancellationAsync(Order order, CancellationToken ct)
{
    // QueueMessage is the receive-side model and cannot be constructed by callers;
    // the payload is passed to SendMessageAsync directly.
    await _queueClient.SendMessageAsync(
        BinaryData.FromObjectAsJson(order),
        cancellationToken: ct);
}
2. Handle Message Overflow
// Messages larger than 64KB should use Azure Blob
/// <summary>
/// Stores the full order as a JSON blob and enqueues only a small reference to it.
/// </summary>
/// <param name="order">The order whose payload may exceed the queue message size limit.</param>
public async Task SendLargeOrderAsync(Order order)
{
    // Store large data in Blob
    // The connection string below is a placeholder; supply the real one from configuration.
    var blobClient = new BlobClient(
        "DefaultEndpointsProtocol=https;AccountName=...;AccountKey==",
        "order-payloads",
        $"{order.Id}.json");
    await blobClient.UploadAsync(BinaryData.FromObjectAsJson(order));

    // Send reference in queue
    // NOTE(review): OrderReference is not defined in this document; presumably a DTO with
    // OrderId and PayloadUri properties - confirm.
    var reference = new OrderReference
    {
        OrderId = order.Id,
        PayloadUri = blobClient.Uri.ToString()
    };

    // QueueMessage cannot be constructed by callers; pass the payload to SendMessageAsync directly.
    await _queueClient.SendMessageAsync(BinaryData.FromObjectAsJson(reference));
}
3. Configure Appropriate TTL
// Set appropriate TTL based on message urgency.
// The TTL is an argument of SendMessageAsync; QueueMessage (the receive-side model)
// cannot be constructed and has no TimeToLive property.

// Urgent message: short TTL
await _queueClient.SendMessageAsync(
    BinaryData.FromObjectAsJson(order),
    timeToLive: TimeSpan.FromMinutes(30));

// Normal message: long TTL
await _queueClient.SendMessageAsync(
    BinaryData.FromObjectAsJson(order),
    timeToLive: TimeSpan.FromDays(7));
Summary
| Feature | Limit |
|---|---|
| Max message size | 64 KB |
| Max queue size | 500 TB |
| Max messages per retrieval | 32 |
| Default visibility timeout | 30 seconds |
| Max visibility timeout | 7 days |
| Default TTL | 7 days (since service version 2017-07-29 a longer TTL, or -1 for no expiry, can be set) |
Next Steps
- Queue Best Practices - Error handling, retry
- Multiple Queues - Managing multiple queues
- Introduction - Back to overview