Azure Service Bus Topics cung cấp mô hình nhắn tin Pub/Sub (publish-subscribe), cho phép một message được gửi đến nhiều subscriber. Đây là lựa chọn tốt hơn Azure Queue khi bạn cần giao tiếp một-nhiều (one-to-many).
┌─────────────────────────────────────────────────────────────────┐
│ SERVICE BUS TOPICS ARCHITECTURE │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Publisher │
│ │ │
│ ▼ │
│ ┌───────────┐ │
│ │ Topic │ (OrdersTopic) │
│ └─────┬─────┘ │
│ │ │
│ ┌────┼────┬────────────┐ │
│ │ │ │ │ │
│ ▼ ▼ ▼ ▼ │
│ ┌────┐┌────┐┌─────┐ ┌─────┐ │
│ │Sub1││Sub2││Sub3 │ │Sub4 │ │
│ │Email││SMS ││Webhook│ │Analytics│ │
│ └────┘└────┘└─────┘ └─────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
dotnet add package Azure.Messaging.ServiceBus
// Service Bus namespace connection string, as copied from the Azure Portal.
// NOTE(review): the SharedAccessKey value looks like a redacted placeholder;
// supply the real key through configuration rather than committing it.
string connectionString =
    "Endpoint=sb://mynamespace.servicebus.windows.net/;" +
    "SharedAccessKeyName=RootManageSharedAccessKey;" +
    "SharedAccessKey==";
/// <summary>
/// Provisions the "orders-topic" topic together with its
/// "email-notifications", "sms-notifications" and "analytics" subscriptions.
/// </summary>
public class ServiceBusSetup
{
    private const string TopicName = "orders-topic";

    private readonly ServiceBusAdministrationClient _adminClient;

    /// <summary>Creates the setup helper.</summary>
    /// <param name="connectionString">
    /// Connection string of the Service Bus namespace; it must carry Manage rights
    /// because topics and subscriptions are created with it.
    /// </param>
    public ServiceBusSetup(string connectionString)
    {
        // Use the caller's connection string for management operations instead of
        // a hard-coded one, so the constructor argument is actually honored.
        _adminClient = new ServiceBusAdministrationClient(connectionString);
    }

    /// <summary>
    /// Creates the topic and its three subscriptions. The email subscription is
    /// created with a filter so it only receives high-priority orders.
    /// </summary>
    /// <returns>A task that completes once every entity has been created.</returns>
    public async Task CreateTopicAndSubscriptionsAsync()
    {
        // Create topic
        var topicOptions = new CreateTopicOptions(TopicName)
        {
            MaxSizeInMegabytes = 1024,
            DefaultMessageTimeToLive = TimeSpan.FromDays(7),
            EnableBatchedOperations = true
        };
        await _adminClient.CreateTopicAsync(topicOptions);

        // Create the filtered subscription in one call. Passing the rule here makes
        // it the subscription's only rule; adding it afterwards with CreateRuleAsync
        // would leave the match-all $Default rule in place and nothing would be
        // filtered out. The filter reads the "Priority" application property, which
        // is where the publisher stores "high-priority" / "normal".
        await _adminClient.CreateSubscriptionAsync(
            new CreateSubscriptionOptions(TopicName, "email-notifications"),
            new CreateRuleOptions
            {
                Name = "high-priority-filter",
                Filter = new SqlRuleFilter("user.Priority = 'high-priority'")
            });

        // Unfiltered subscriptions: each receives a copy of every message.
        await _adminClient.CreateSubscriptionAsync(
            new CreateSubscriptionOptions(TopicName, "sms-notifications"));
        await _adminClient.CreateSubscriptionAsync(
            new CreateSubscriptionOptions(TopicName, "analytics"));
    }
}
/// <summary>
/// Publishes <c>Order</c> messages to the "orders-topic" topic.
/// Dispose the publisher when it is no longer needed to release the
/// underlying Service Bus connection.
/// </summary>
public class OrderPublisher : IAsyncDisposable
{
    private const string TopicName = "orders-topic";

    private readonly ServiceBusClient _client;
    private readonly ServiceBusSender _sender;

    /// <summary>Creates a publisher bound to "orders-topic".</summary>
    /// <param name="connectionString">Connection string of the Service Bus namespace.</param>
    public OrderPublisher(string connectionString)
    {
        // ServiceBusMessage belongs to Azure.Messaging.ServiceBus, whose sending
        // type is ServiceBusSender (TopicClient is from the legacy SDK and cannot
        // send ServiceBusMessage instances).
        _client = new ServiceBusClient(connectionString);
        _sender = _client.CreateSender(TopicName);
    }

    /// <summary>Publishes a single order as a JSON message.</summary>
    /// <param name="order">The order to publish.</param>
    /// <exception cref="ArgumentNullException"><paramref name="order"/> is null.</exception>
    public async Task PublishOrderAsync(Order order)
    {
        if (order is null)
        {
            throw new ArgumentNullException(nameof(order));
        }

        var message = new ServiceBusMessage
        {
            Body = BinaryData.FromObjectAsJson(order),
            ContentType = "application/json",
            Subject = "OrderCreated",
            CorrelationId = order.Id.ToString(),
            MessageId = Guid.NewGuid().ToString(),
            TimeToLive = TimeSpan.FromDays(7)
        };

        // Custom properties live in ApplicationProperties; subscription rules can
        // filter on them.
        message.ApplicationProperties["CustomerEmail"] = order.CustomerEmail;
        // NOTE(review): the total is published as a string; a SQL rule that compares
        // it numerically would need a numeric value here - confirm with consumers.
        message.ApplicationProperties["OrderTotal"] = order.TotalAmount.ToString();
        message.ApplicationProperties["Priority"] = order.TotalAmount > 1000 ? "high-priority" : "normal";

        await _sender.SendMessageAsync(message);
        Console.WriteLine($"Order {order.Id} published to topic");
    }

    /// <summary>
    /// Publishes many orders, splitting them into as many size-limited batches
    /// as required.
    /// </summary>
    /// <param name="orders">The orders to publish; an empty sequence sends nothing.</param>
    /// <exception cref="ArgumentNullException"><paramref name="orders"/> is null.</exception>
    /// <exception cref="InvalidOperationException">A single message does not fit into an empty batch.</exception>
    public async Task PublishBatchAsync(IEnumerable<Order> orders)
    {
        if (orders is null)
        {
            throw new ArgumentNullException(nameof(orders));
        }

        ServiceBusMessageBatch batch = await _sender.CreateMessageBatchAsync();
        try
        {
            foreach (var order in orders)
            {
                var message = new ServiceBusMessage
                {
                    Body = BinaryData.FromObjectAsJson(order),
                    MessageId = Guid.NewGuid().ToString()
                };

                if (batch.TryAddMessage(message))
                {
                    continue;
                }

                if (batch.Count == 0)
                {
                    throw new InvalidOperationException(
                        "A single order message is too large to fit in a Service Bus batch.");
                }

                // Current batch is full: flush it and start a new one with this message.
                await _sender.SendMessagesAsync(batch);
                batch.Dispose();
                batch = await _sender.CreateMessageBatchAsync();

                if (!batch.TryAddMessage(message))
                {
                    throw new InvalidOperationException(
                        "A single order message is too large to fit in a Service Bus batch.");
                }
            }

            if (batch.Count > 0)
            {
                await _sender.SendMessagesAsync(batch);
            }
        }
        finally
        {
            batch.Dispose();
        }
    }

    /// <summary>Releases the sender and the underlying client connection.</summary>
    public async ValueTask DisposeAsync()
    {
        await _sender.DisposeAsync();
        await _client.DisposeAsync();
    }
}
/// <summary>
/// Hosted service that consumes the "email-notifications" subscription of
/// "orders-topic" and sends an order confirmation e-mail per message.
/// </summary>
public class EmailNotificationProcessor : BackgroundService
{
    private readonly ServiceBusClient _client;
    private readonly ServiceBusProcessor _processor;
    private readonly ILogger<EmailNotificationProcessor> _logger;

    /// <summary>Creates the processor; message handling starts in <see cref="ExecuteAsync"/>.</summary>
    /// <param name="connectionString">Connection string of the Service Bus namespace.</param>
    /// <param name="logger">Logger used for progress and error reporting.</param>
    public EmailNotificationProcessor(
        string connectionString,
        ILogger<EmailNotificationProcessor> logger)
    {
        _logger = logger;

        var options = new ServiceBusProcessorOptions
        {
            MaxConcurrentCalls = 10,
            // Messages are completed explicitly after the e-mail has been sent.
            AutoCompleteMessages = false
        };

        // Keep a reference to the client so it can be disposed on shutdown.
        _client = new ServiceBusClient(connectionString);
        _processor = _client.CreateProcessor("orders-topic", "email-notifications", options);
    }

    /// <summary>Registers the handlers, starts the processor and waits for shutdown.</summary>
    /// <param name="stoppingToken">Signalled by the host when the service should stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _processor.ProcessMessageAsync += ProcessMessageHandler;
        _processor.ProcessErrorAsync += ErrorHandler;

        await _processor.StartProcessingAsync(stoppingToken);

        try
        {
            // The processor runs on its own; just park until shutdown is requested
            // instead of waking up every second.
            await Task.Delay(Timeout.Infinite, stoppingToken);
        }
        catch (OperationCanceledException)
        {
            // Expected on shutdown; StopAsync stops the processor.
        }
    }

    /// <summary>Deserializes the order, sends the confirmation e-mail and completes the message.</summary>
    private async Task ProcessMessageHandler(ProcessMessageEventArgs args)
    {
        var order = JsonSerializer.Deserialize<Order>(
            args.Message.Body.ToString());

        if (order is null)
        {
            // A JSON "null" body can never be processed; park it in the dead-letter
            // queue instead of failing with a NullReferenceException on every retry.
            _logger.LogError("Message {MessageId} has an empty order payload", args.Message.MessageId);
            await args.DeadLetterMessageAsync(
                args.Message,
                "InvalidPayload",
                "Message body did not contain an order.");
            return;
        }

        _logger.LogInformation("Sending email for order: {OrderId}", order.Id);

        // Send email logic
        // NOTE(review): _emailService is not declared in this class; it has to be
        // injected (e.g. through the constructor) before this compiles.
        await _emailService.SendOrderConfirmationAsync(order);

        // Complete the message
        await args.CompleteMessageAsync(args.Message);
    }

    /// <summary>Logs failures reported by the processor.</summary>
    private Task ErrorHandler(ProcessErrorEventArgs args)
    {
        _logger.LogError(args.Exception, "Error processing message");
        return Task.CompletedTask;
    }

    /// <summary>Stops message processing and releases the Service Bus resources.</summary>
    /// <param name="cancellationToken">Limits how long a graceful stop may take.</param>
    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        try
        {
            await _processor.StopProcessingAsync(cancellationToken);
            await base.StopAsync(cancellationToken);
        }
        finally
        {
            await _processor.DisposeAsync();
            await _client.DisposeAsync();
        }
    }
}
/// <summary>
/// Hosted service that consumes the "sms-notifications" subscription of
/// "orders-topic" and sends an SMS confirmation per order.
/// </summary>
public class SmsNotificationProcessor : BackgroundService
{
    private readonly ServiceBusClient _client;
    private readonly ServiceBusProcessor _processor;

    /// <summary>Creates the processor; message handling starts in <see cref="ExecuteAsync"/>.</summary>
    /// <param name="connectionString">Connection string of the Service Bus namespace.</param>
    public SmsNotificationProcessor(string connectionString)
    {
        // Keep a reference to the client so it can be disposed on shutdown.
        _client = new ServiceBusClient(connectionString);
        _processor = _client.CreateProcessor("orders-topic", "sms-notifications");
    }

    /// <summary>Registers the handlers and starts the processor.</summary>
    /// <param name="stoppingToken">Signalled by the host when the service should stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _processor.ProcessMessageAsync += HandleMessageAsync;
        // The processor refuses to start unless an error handler is registered.
        _processor.ProcessErrorAsync += HandleErrorAsync;

        await _processor.StartProcessingAsync(stoppingToken);
    }

    /// <summary>Sends the confirmation SMS for one order message and completes it.</summary>
    private async Task HandleMessageAsync(ProcessMessageEventArgs args)
    {
        var order = JsonSerializer.Deserialize<Order>(args.Message.Body.ToString());

        if (order is null)
        {
            // A JSON "null" body can never be processed; dead-letter it instead of
            // failing with a NullReferenceException on every retry.
            await args.DeadLetterMessageAsync(
                args.Message,
                "InvalidPayload",
                "Message body did not contain an order.");
            return;
        }

        // Send SMS
        // NOTE(review): _smsService is not declared in this class; it has to be
        // injected before this compiles.
        await _smsService.SendAsync(
            order.CustomerPhone,
            $"Order confirmed: {order.Id}");

        await args.CompleteMessageAsync(args.Message);
    }

    /// <summary>Reports failures raised by the processor.</summary>
    private Task HandleErrorAsync(ProcessErrorEventArgs args)
    {
        Console.Error.WriteLine($"SMS processor error ({args.ErrorSource}): {args.Exception}");
        return Task.CompletedTask;
    }

    /// <summary>Stops message processing and releases the Service Bus resources.</summary>
    /// <param name="cancellationToken">Limits how long a graceful stop may take.</param>
    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        try
        {
            await _processor.StopProcessingAsync(cancellationToken);
            await base.StopAsync(cancellationToken);
        }
        finally
        {
            await _processor.DisposeAsync();
            await _client.DisposeAsync();
        }
    }
}
/// <summary>
/// Hosted service that consumes the "analytics" subscription of
/// "orders-topic" and records each order for analytics.
/// </summary>
public class AnalyticsProcessor : BackgroundService
{
    private readonly ServiceBusClient _client;
    private readonly ServiceBusProcessor _processor;

    /// <summary>Creates the processor; message handling starts in <see cref="ExecuteAsync"/>.</summary>
    /// <param name="connectionString">Connection string of the Service Bus namespace.</param>
    public AnalyticsProcessor(string connectionString)
    {
        // Keep a reference to the client so it can be disposed on shutdown.
        _client = new ServiceBusClient(connectionString);
        _processor = _client.CreateProcessor("orders-topic", "analytics");
    }

    /// <summary>Registers the handlers and starts the processor.</summary>
    /// <param name="stoppingToken">Signalled by the host when the service should stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _processor.ProcessMessageAsync += HandleMessageAsync;
        // The processor refuses to start unless an error handler is registered.
        _processor.ProcessErrorAsync += HandleErrorAsync;

        await _processor.StartProcessingAsync(stoppingToken);
    }

    /// <summary>Records one order message and completes it.</summary>
    private async Task HandleMessageAsync(ProcessMessageEventArgs args)
    {
        var order = JsonSerializer.Deserialize<Order>(args.Message.Body.ToString());

        if (order is null)
        {
            // A JSON "null" body can never be processed; dead-letter it rather than
            // handing a null order to the analytics service on every retry.
            await args.DeadLetterMessageAsync(
                args.Message,
                "InvalidPayload",
                "Message body did not contain an order.");
            return;
        }

        // Record analytics
        // NOTE(review): _analyticsService is not declared in this class; it has to
        // be injected before this compiles.
        await _analyticsService.RecordOrderAsync(order);

        await args.CompleteMessageAsync(args.Message);
    }

    /// <summary>Reports failures raised by the processor.</summary>
    private Task HandleErrorAsync(ProcessErrorEventArgs args)
    {
        Console.Error.WriteLine($"Analytics processor error ({args.ErrorSource}): {args.Exception}");
        return Task.CompletedTask;
    }

    /// <summary>Stops message processing and releases the Service Bus resources.</summary>
    /// <param name="cancellationToken">Limits how long a graceful stop may take.</param>
    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        try
        {
            await _processor.StopProcessingAsync(cancellationToken);
            await base.StopAsync(cancellationToken);
        }
        finally
        {
            await _processor.DisposeAsync();
            await _client.DisposeAsync();
        }
    }
}
// Filter on a system property plus an application (user) property.
// SQL filter expressions only know the "sys." and "user." scopes, so the order
// total is addressed through the "OrderTotal" application property.
// NOTE(review): the numeric comparison only matches when the publisher stores
// OrderTotal as a number rather than a string - confirm against the publisher.
var filter = new SqlRuleFilter("sys.Label = 'high-priority' AND user.OrderTotal > 1000");

// Composite filter: unscoped names refer to application properties.
var compositeFilter = new SqlRuleFilter(
    "priority = 'high' OR (priority = 'medium' AND region = 'US')");

// Correlation filter (more efficient than SQL): every property that is set
// must match exactly. The message label is exposed as Subject in this SDK.
var correlationFilter = new CorrelationRuleFilter
{
    CorrelationId = order.Id.ToString(),
    Subject = "high-priority"
};

// Rule with an action: rewrites properties on the subscription's copy of a
// message whenever the filter matches.
var ruleOptions = new CreateRuleOptions
{
    Name = "add-priority-action",
    Filter = new SqlRuleFilter("sys.Label = 'urgent'"),
    Action = new SqlRuleAction("SET sys.Label = 'high-priority'; SET sys.TimeToLive = '3600'")
};
/// <summary>
/// Creates three subscriptions on "orders-topic": one that receives every order,
/// one for high-value orders and one for low-value orders.
/// </summary>
/// <returns>A task that completes once all subscriptions exist.</returns>
public async Task SetupPrioritySubscriptionsAsync()
{
    var adminClient = new ServiceBusAdministrationClient(connectionString);

    // Normal priority subscription - gets all messages
    await adminClient.CreateSubscriptionAsync(
        new CreateSubscriptionOptions("orders-topic", "all-orders"));

    // High priority subscription - gets only high value orders.
    // The subscription is created together with its rule: a rule cannot be added
    // to a subscription that does not exist yet, and supplying it at creation time
    // keeps the match-all $Default rule from being added alongside it.
    // NOTE(review): the filters compare the "OrderTotal" application property
    // numerically, so the publisher must send it as a number - confirm.
    await adminClient.CreateSubscriptionAsync(
        new CreateSubscriptionOptions("orders-topic", "high-value-orders"),
        new CreateRuleOptions
        {
            Name = "high-value-filter",
            Filter = new SqlRuleFilter("user.OrderTotal >= 1000")
        });

    // Low value subscription - gets only small orders
    await adminClient.CreateSubscriptionAsync(
        new CreateSubscriptionOptions("orders-topic", "low-value-orders"),
        new CreateRuleOptions
        {
            Name = "low-value-filter",
            Filter = new SqlRuleFilter("user.OrderTotal < 100")
        });
}
/// <summary>
/// Creates the "email-notifications" subscription with dead-letter forwarding:
/// after three failed deliveries a message is dead-lettered and forwarded to
/// the "orders-topic-deadletter" entity.
/// </summary>
/// <returns>A task that completes once the subscription has been created.</returns>
public async Task ConfigureDeadLetterAsync()
{
    var adminClient = new ServiceBusAdministrationClient(connectionString);

    var subscriptionOptions = new CreateSubscriptionOptions(
        "orders-topic",
        "email-notifications")
    {
        // CreateSubscriptionOptions exposes dead-letter forwarding through
        // ForwardDeadLetteredMessagesTo; the target entity must already exist.
        ForwardDeadLetteredMessagesTo = "orders-topic-deadletter",
        MaxDeliveryCount = 3,
        LockDuration = TimeSpan.FromMinutes(1)
    };

    await adminClient.CreateSubscriptionAsync(subscriptionOptions);
}
/// <summary>
/// Hosted service that drains the "all-messages" subscription of
/// "orders-topic-deadletter" and logs every dead-lettered message.
/// </summary>
public class DeadLetterProcessor : BackgroundService
{
    private readonly ServiceBusClient _client;
    private readonly ServiceBusProcessor _processor;

    /// <summary>Creates the processor; message handling starts in <see cref="ExecuteAsync"/>.</summary>
    /// <param name="connectionString">Connection string of the Service Bus namespace.</param>
    public DeadLetterProcessor(string connectionString)
    {
        // Keep a reference to the client so it can be disposed on shutdown.
        _client = new ServiceBusClient(connectionString);
        _processor = _client.CreateProcessor("orders-topic-deadletter", "all-messages");
    }

    /// <summary>Registers the handlers and starts the processor.</summary>
    /// <param name="stoppingToken">Signalled by the host when the service should stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _processor.ProcessMessageAsync += async args =>
        {
            var message = args.Message;

            // Log or analyze dead letter message
            Console.WriteLine($"Dead letter: {message.Body}");
            // Print the recorded dead-letter reason and description; interpolating
            // the property dictionary itself would only print its type name.
            Console.WriteLine($"Error: {message.DeadLetterReason} - {message.DeadLetterErrorDescription}");

            await args.CompleteMessageAsync(args.Message);
        };

        // The processor refuses to start unless an error handler is registered.
        _processor.ProcessErrorAsync += args =>
        {
            Console.Error.WriteLine($"Dead-letter processor error ({args.ErrorSource}): {args.Exception}");
            return Task.CompletedTask;
        };

        await _processor.StartProcessingAsync(stoppingToken);
    }

    /// <summary>Stops message processing and releases the Service Bus resources.</summary>
    /// <param name="cancellationToken">Limits how long a graceful stop may take.</param>
    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        try
        {
            await _processor.StopProcessingAsync(cancellationToken);
            await base.StopAsync(cancellationToken);
        }
        finally
        {
            await _processor.DisposeAsync();
            await _client.DisposeAsync();
        }
    }
}
/// <summary>
/// Creates the session-enabled "ordered-processing" subscription on
/// "orders-topic", which is what allows ordered processing per session.
/// </summary>
/// <returns>A task that completes once the subscription has been created.</returns>
public async Task SetupSessionAsync()
{
    var admin = new ServiceBusAdministrationClient(connectionString);

    var sessionSubscription = new CreateSubscriptionOptions("orders-topic", "ordered-processing");
    sessionSubscription.RequiresSession = true; // turn sessions on for this subscription

    await admin.CreateSubscriptionAsync(sessionSubscription);
}
/// <summary>
/// Hosted service that consumes the session-enabled "ordered-processing"
/// subscription of "orders-topic", handling the messages of each session in order.
/// </summary>
public class SessionProcessor : BackgroundService
{
    /// <summary>
    /// Starts a session processor and keeps it running until the host requests
    /// shutdown, then stops it and releases the Service Bus resources.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service should stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var options = new ServiceBusSessionProcessorOptions
        {
            MaxConcurrentSessions = 10,
            SessionIdleTimeout = TimeSpan.FromMinutes(1)
        };

        var client = new ServiceBusClient(connectionString);
        var processor = client.CreateSessionProcessor("orders-topic", "ordered-processing", options);

        try
        {
            processor.ProcessMessageAsync += async args =>
            {
                // All messages with same sessionId processed sequentially
                var order = JsonSerializer.Deserialize<Order>(args.Message.Body.ToString());

                if (order is null)
                {
                    // A JSON "null" body can never be processed; dead-letter it
                    // instead of passing a null order down the pipeline.
                    await args.DeadLetterMessageAsync(
                        args.Message,
                        "InvalidPayload",
                        "Message body did not contain an order.");
                    return;
                }

                await ProcessOrderInSequenceAsync(order);
                await args.CompleteMessageAsync(args.Message);
            };

            // The processor refuses to start unless an error handler is registered.
            processor.ProcessErrorAsync += args =>
            {
                Console.Error.WriteLine($"Session processor error ({args.ErrorSource}): {args.Exception}");
                return Task.CompletedTask;
            };

            await processor.StartProcessingAsync(stoppingToken);

            try
            {
                // Park until shutdown; the processor pumps messages on its own.
                await Task.Delay(Timeout.Infinite, stoppingToken);
            }
            catch (OperationCanceledException)
            {
                // Expected on shutdown.
            }

            await processor.StopProcessingAsync();
        }
        finally
        {
            // The processor and client are owned by this method, so release them here.
            await processor.DisposeAsync();
            await client.DisposeAsync();
        }
    }
}
Feature Azure Queue Service Bus Topics
Pattern Point-to-Point Pub/Sub
Multiple Consumers One consumer per message Multiple consumers
Message Size 64 KB 256 KB
Sessions Not supported Supported
Transactions Not supported Supported
Duplicate Detection Not supported Supported
Ordering Not guaranteed FIFO (guaranteed only with sessions)
Scenario Solution
Multiple systems need same data Pub/Sub
Different processing per message type Filters
Fan-out to microservices Multiple subscriptions
Event-driven architecture Topics + Subscriptions
Component Description
Topic Channel for publishing messages
Subscription Recipient that receives messages from topic
Filter Rules to route messages to subscriptions
Action Modify message when filter matches