Keyboard shortcuts

Press ← or → to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Azure Service Bus Topics

Overview

Azure Service Bus Topics cung cấp mô hình nhắn tin Pub/Sub (publish-subscribe), cho phép một message được gửi đến nhiều subscriber. Đây là lựa chọn phù hợp hơn Azure Queue khi bạn cần giao tiếp một-nhiều (one-to-many).

┌─────────────────────────────────────────────────────────────────┐
│                  SERVICE BUS TOPICS ARCHITECTURE               │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│   Publisher                                                     │
│       │                                                         │
│       ▼                                                         │
│   ┌───────────┐                                                 │
│   │   Topic   │  (OrdersTopic)                                  │
│   └─────┬─────┘                                                 │
│         │                                                       │
│      ┌──┴────────┬───────────┬───────────┐                      │
│      │           │           │           │                      │
│      ▼           ▼           ▼           ▼                      │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐                 │
│ │  Sub1   │ │  Sub2   │ │  Sub3   │ │  Sub4   │                 │
│ │  Email  │ │   SMS   │ │ Webhook │ │Analytics│                 │
│ └─────────┘ └─────────┘ └─────────┘ └─────────┘                 │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

Setup

Installation

dotnet add package Azure.Messaging.ServiceBus

Configuration

// Connection string from Azure Portal (Shared access policies -> Primary Connection String).
// The value below is a placeholder: substitute your own key, and load the real string from
// configuration (user secrets, environment variable, Key Vault) instead of committing it.
// Prefer a policy scoped to Send/Listen over RootManageSharedAccessKey for application code.
var connectionString = "Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=<your-shared-access-key>";

Creating Topics and Subscriptions

/// <summary>
/// Provisions the <c>orders-topic</c> topic and its subscriptions in a Service Bus namespace.
/// </summary>
public class ServiceBusSetup
{
    private const string TopicName = "orders-topic";

    // Entity management (topics, subscriptions, rules) goes through the administration client.
    // It is built from the caller's connection string rather than a hard-coded literal, and it
    // holds no AMQP connection, so there is nothing to dispose.
    private readonly ServiceBusAdministrationClient _adminClient;

    /// <summary>Creates the setup helper for the namespace identified by <paramref name="connectionString"/>.</summary>
    /// <param name="connectionString">Service Bus connection string; the policy needs Manage rights.</param>
    public ServiceBusSetup(string connectionString)
    {
        _adminClient = new ServiceBusAdministrationClient(connectionString);
    }

    /// <summary>
    /// Creates the topic, then the <c>email-notifications</c>, <c>sms-notifications</c> and
    /// <c>analytics</c> subscriptions. Throws if any of the entities already exists.
    /// </summary>
    public async Task CreateTopicAndSubscriptionsAsync()
    {
        // Create topic
        var topicOptions = new CreateTopicOptions(TopicName)
        {
            MaxSizeInMegabytes = 1024,
            DefaultMessageTimeToLive = TimeSpan.FromDays(7),
            EnableBatchedOperations = true
        };

        await _adminClient.CreateTopicAsync(topicOptions);

        // Filtered subscription. The rule is supplied at creation time so that it REPLACES the
        // implicit "$Default" match-all rule; adding a rule afterwards would leave $Default in
        // place and the subscription would still receive every message.
        // The filter targets the "Priority" application property that the publisher sets
        // (the message Subject / sys.Label carries the event name, not the priority).
        await _adminClient.CreateSubscriptionAsync(
            new CreateSubscriptionOptions(TopicName, "email-notifications"),
            new CreateRuleOptions(
                "high-priority-filter",
                new SqlRuleFilter("Priority = 'high-priority'")));

        // Unfiltered subscriptions: each receives a copy of every message.
        await _adminClient.CreateSubscriptionAsync(
            new CreateSubscriptionOptions(TopicName, "sms-notifications"));

        await _adminClient.CreateSubscriptionAsync(
            new CreateSubscriptionOptions(TopicName, "analytics"));
    }
}

Publishing Messages to Topic

/// <summary>
/// Publishes <c>Order</c> events to the <c>orders-topic</c> topic.
/// Create one instance and reuse it; dispose it on shutdown to close the connection.
/// </summary>
public class OrderPublisher : IAsyncDisposable
{
    // Azure.Messaging.ServiceBus sends through ServiceBusSender. The legacy TopicClient type
    // belongs to a different package and cannot send ServiceBusMessage instances.
    private readonly ServiceBusClient _client;
    private readonly ServiceBusSender _sender;

    /// <param name="connectionString">Service Bus connection string with Send rights.</param>
    public OrderPublisher(string connectionString)
    {
        _client = new ServiceBusClient(connectionString);
        _sender = _client.CreateSender("orders-topic");
    }

    /// <summary>Publishes a single order as a JSON message with routing metadata.</summary>
    /// <param name="order">The order to publish; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="order"/> is null.</exception>
    public async Task PublishOrderAsync(Order order)
    {
        if (order is null)
        {
            throw new ArgumentNullException(nameof(order));
        }

        var message = new ServiceBusMessage(BinaryData.FromObjectAsJson(order))
        {
            ContentType = "application/json",
            Subject = "OrderCreated",
            CorrelationId = order.Id.ToString(),
            MessageId = Guid.NewGuid().ToString(),
            TimeToLive = TimeSpan.FromDays(7)
        };

        // Custom (application) properties are what subscription filters evaluate.
        message.ApplicationProperties["CustomerEmail"] = order.CustomerEmail;
        // Stored as a number, not a culture-formatted string, so SQL filters such as
        // "OrderTotal >= 1000" compare numerically.
        message.ApplicationProperties["OrderTotal"] = order.TotalAmount;
        message.ApplicationProperties["Priority"] = order.TotalAmount > 1000 ? "high-priority" : "normal";

        await _sender.SendMessageAsync(message);

        Console.WriteLine($"Order {order.Id} published to topic");
    }

    /// <summary>
    /// Publishes many orders, packing them into as few size-limited batches as possible.
    /// </summary>
    /// <param name="orders">Orders to publish; an empty sequence sends nothing.</param>
    /// <exception cref="ArgumentNullException"><paramref name="orders"/> is null.</exception>
    /// <exception cref="InvalidOperationException">A single order is too large for an empty batch.</exception>
    public async Task PublishBatchAsync(IEnumerable<Order> orders)
    {
        if (orders is null)
        {
            throw new ArgumentNullException(nameof(orders));
        }

        // ServiceBusMessageBatch enforces the maximum payload size, which a plain
        // IEnumerable send cannot do (it fails the whole call when the limit is exceeded).
        ServiceBusMessageBatch batch = await _sender.CreateMessageBatchAsync();
        try
        {
            foreach (var order in orders)
            {
                var message = new ServiceBusMessage(BinaryData.FromObjectAsJson(order))
                {
                    MessageId = Guid.NewGuid().ToString()
                };

                if (batch.TryAddMessage(message))
                {
                    continue;
                }

                // Current batch is full: flush it and start a new one for this message.
                if (batch.Count > 0)
                {
                    await _sender.SendMessagesAsync(batch);
                }

                batch.Dispose();
                batch = await _sender.CreateMessageBatchAsync();

                if (!batch.TryAddMessage(message))
                {
                    throw new InvalidOperationException(
                        $"Message {message.MessageId} is too large to fit in a batch.");
                }
            }

            if (batch.Count > 0)
            {
                await _sender.SendMessagesAsync(batch);
            }
        }
        finally
        {
            batch.Dispose();
        }
    }

    /// <summary>Closes the sender and the underlying connection.</summary>
    public async ValueTask DisposeAsync()
    {
        await _sender.DisposeAsync();
        await _client.DisposeAsync();
    }
}

Subscribing to Messages

Basic Subscription Processor

/// <summary>
/// Hosted service that consumes the <c>email-notifications</c> subscription of
/// <c>orders-topic</c> and sends an order confirmation e-mail for each message.
/// </summary>
public class EmailNotificationProcessor : BackgroundService
{
    // The client is kept so the underlying connection can be closed on shutdown.
    private readonly ServiceBusClient _client;
    private readonly ServiceBusProcessor _processor;
    private readonly ILogger<EmailNotificationProcessor> _logger;

    // NOTE(review): _emailService is used in ProcessMessageHandler but is not declared in this
    // snippet; it is presumably an injected e-mail sender. Add the field and a constructor
    // parameter for it in real code.

    /// <param name="connectionString">Service Bus connection string with Listen rights.</param>
    /// <param name="logger">Logger for processing and error events.</param>
    public EmailNotificationProcessor(
        string connectionString,
        ILogger<EmailNotificationProcessor> logger)
    {
        _logger = logger;

        var options = new ServiceBusProcessorOptions
        {
            MaxConcurrentCalls = 10,
            // Messages are settled explicitly in the handler.
            AutoCompleteMessages = false
        };

        _client = new ServiceBusClient(connectionString);
        _processor = _client.CreateProcessor("orders-topic", "email-notifications", options);
    }

    /// <summary>Registers the handlers, starts the processor and waits for shutdown.</summary>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _processor.ProcessMessageAsync += ProcessMessageHandler;
        _processor.ProcessErrorAsync += ErrorHandler;

        await _processor.StartProcessingAsync(stoppingToken);

        try
        {
            // The processor pumps messages on its own; just park until the host stops.
            await Task.Delay(Timeout.Infinite, stoppingToken);
        }
        catch (OperationCanceledException)
        {
            // Expected on shutdown.
        }
    }

    /// <summary>Deserializes the order, sends the e-mail, then completes the message.</summary>
    private async Task ProcessMessageHandler(ProcessMessageEventArgs args)
    {
        Order order;
        try
        {
            order = JsonSerializer.Deserialize<Order>(args.Message.Body.ToString());
        }
        catch (JsonException ex)
        {
            // A malformed payload will never succeed on retry; dead-letter it immediately.
            _logger.LogError(ex, "Message {MessageId} is not valid JSON", args.Message.MessageId);
            await args.DeadLetterMessageAsync(
                args.Message, "InvalidPayload", ex.Message, args.CancellationToken);
            return;
        }

        if (order is null)
        {
            // A JSON "null" body deserializes to null.
            await args.DeadLetterMessageAsync(
                args.Message, "InvalidPayload", "Message body deserialized to null.", args.CancellationToken);
            return;
        }

        _logger.LogInformation("Sending email for order: {OrderId}", order.Id);

        // Send email logic. If this throws, the message is not completed; the processor
        // abandons it so it is redelivered (up to the subscription's MaxDeliveryCount).
        await _emailService.SendOrderConfirmationAsync(order);

        // Complete the message
        await args.CompleteMessageAsync(args.Message, args.CancellationToken);
    }

    /// <summary>Logs failures raised by the processor or by the message handler.</summary>
    private Task ErrorHandler(ProcessErrorEventArgs args)
    {
        _logger.LogError(
            args.Exception,
            "Error processing message (source: {ErrorSource}, entity: {EntityPath})",
            args.ErrorSource,
            args.EntityPath);
        return Task.CompletedTask;
    }

    /// <summary>Stops message processing, then releases the processor and the connection.</summary>
    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        try
        {
            await _processor.StopProcessingAsync(cancellationToken);
            await base.StopAsync(cancellationToken);
        }
        finally
        {
            await _processor.DisposeAsync();
            await _client.DisposeAsync();
        }
    }
}

Multiple Subscription Processors

/// <summary>
/// Hosted service that consumes the <c>sms-notifications</c> subscription of
/// <c>orders-topic</c> and sends an SMS confirmation for each order.
/// </summary>
public class SmsNotificationProcessor : BackgroundService
{
    private readonly ServiceBusClient _client;
    private readonly ServiceBusProcessor _processor;

    // NOTE(review): _smsService is used below but is not declared in this snippet;
    // presumably an injected SMS sender. Declare and inject it in real code.

    /// <param name="connectionString">Service Bus connection string with Listen rights.</param>
    public SmsNotificationProcessor(string connectionString)
    {
        _client = new ServiceBusClient(connectionString);
        _processor = _client.CreateProcessor(
            "orders-topic",
            "sms-notifications",
            // The handler settles messages itself.
            new ServiceBusProcessorOptions { AutoCompleteMessages = false });
    }

    /// <summary>Registers the handlers and starts the processor.</summary>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _processor.ProcessMessageAsync += async args =>
        {
            var order = JsonSerializer.Deserialize<Order>(args.Message.Body.ToString());
            if (order is null)
            {
                // A JSON "null" body can never be processed; park it in the dead-letter queue.
                await args.DeadLetterMessageAsync(
                    args.Message, "InvalidPayload", "Message body deserialized to null.", args.CancellationToken);
                return;
            }

            // Send SMS
            await _smsService.SendAsync(
                order.CustomerPhone,
                $"Order confirmed: {order.Id}");

            await args.CompleteMessageAsync(args.Message, args.CancellationToken);
        };

        // A ProcessErrorAsync handler is mandatory: StartProcessingAsync throws without one.
        _processor.ProcessErrorAsync += args =>
        {
            Console.Error.WriteLine(
                $"SMS processor error ({args.ErrorSource}, {args.EntityPath}): {args.Exception}");
            return Task.CompletedTask;
        };

        await _processor.StartProcessingAsync(stoppingToken);
    }

    /// <summary>Stops message processing, then releases the processor and the connection.</summary>
    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        try
        {
            await _processor.StopProcessingAsync(cancellationToken);
            await base.StopAsync(cancellationToken);
        }
        finally
        {
            await _processor.DisposeAsync();
            await _client.DisposeAsync();
        }
    }
}

/// <summary>
/// Hosted service that consumes the <c>analytics</c> subscription of
/// <c>orders-topic</c> and records each order for analytics.
/// </summary>
public class AnalyticsProcessor : BackgroundService
{
    private readonly ServiceBusClient _client;
    private readonly ServiceBusProcessor _processor;

    // NOTE(review): _analyticsService is used below but is not declared in this snippet;
    // presumably an injected analytics recorder. Declare and inject it in real code.

    /// <param name="connectionString">Service Bus connection string with Listen rights.</param>
    public AnalyticsProcessor(string connectionString)
    {
        _client = new ServiceBusClient(connectionString);
        _processor = _client.CreateProcessor(
            "orders-topic",
            "analytics",
            // The handler settles messages itself.
            new ServiceBusProcessorOptions { AutoCompleteMessages = false });
    }

    /// <summary>Registers the handlers and starts the processor.</summary>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _processor.ProcessMessageAsync += async args =>
        {
            var order = JsonSerializer.Deserialize<Order>(args.Message.Body.ToString());
            if (order is null)
            {
                // A JSON "null" body can never be processed; park it in the dead-letter queue.
                await args.DeadLetterMessageAsync(
                    args.Message, "InvalidPayload", "Message body deserialized to null.", args.CancellationToken);
                return;
            }

            // Record analytics
            await _analyticsService.RecordOrderAsync(order);

            await args.CompleteMessageAsync(args.Message, args.CancellationToken);
        };

        // A ProcessErrorAsync handler is mandatory: StartProcessingAsync throws without one.
        _processor.ProcessErrorAsync += args =>
        {
            Console.Error.WriteLine(
                $"Analytics processor error ({args.ErrorSource}, {args.EntityPath}): {args.Exception}");
            return Task.CompletedTask;
        };

        await _processor.StartProcessingAsync(stoppingToken);
    }

    /// <summary>Stops message processing, then releases the processor and the connection.</summary>
    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        try
        {
            await _processor.StopProcessingAsync(cancellationToken);
            await base.StopAsync(cancellationToken);
        }
        finally
        {
            await _processor.DisposeAsync();
            await _client.DisposeAsync();
        }
    }
}

Filters and Actions

SQL Filter

// Filter by message properties.
// "sys." addresses system properties (sys.Label is the message Subject); application
// properties are addressed by bare name or with the "user." scope. No other scope exists,
// so the order total is referenced through the OrderTotal application property.
// NOTE: the numeric comparison only matches when OrderTotal is published as a number.
var filter = new SqlRuleFilter("sys.Label = 'high-priority' AND OrderTotal > 1000");

// Composite filter over application properties
var compositeFilter = new SqlRuleFilter(
    "priority = 'high' OR (priority = 'medium' AND region = 'US')");

// Correlation filter (more efficient than SQL): matches on exact property equality.
// In Azure.Messaging.ServiceBus the label is exposed as Subject.
var correlationFilter = new CorrelationRuleFilter
{
    CorrelationId = order.Id.ToString(),
    Subject = "high-priority"
};

Actions

// Rule with an action: when the filter matches, the action rewrites properties on the
// copy of the message delivered to this subscription.
var urgentFilter = new SqlRuleFilter("sys.Label = 'urgent'");
var ruleOptions = new CreateRuleOptions("add-priority-action", urgentFilter)
{
    Action = new SqlRuleAction("SET sys.Label = 'high-priority'; SET sys.TimeToLive = '3600'")
};

Example: Different Subscription for Priority

/// <summary>
/// Creates three subscriptions on <c>orders-topic</c>: one receiving every order, one for
/// high-value orders and one for low-value orders.
/// </summary>
public async Task SetupPrioritySubscriptionsAsync()
{
    var adminClient = new ServiceBusAdministrationClient(connectionString);

    // Normal priority subscription - gets all messages (implicit match-all default rule)
    await adminClient.CreateSubscriptionAsync(
        new CreateSubscriptionOptions("orders-topic", "all-orders"));

    // A rule can only be attached to a subscription that exists, so each filtered
    // subscription is created together with its rule. Supplying the rule at creation time
    // also replaces the implicit match-all rule, which would otherwise defeat the filter.
    // Filters reference the OrderTotal application property by bare name.
    // NOTE: the numeric comparison only matches when OrderTotal is published as a number.

    // High priority subscription - gets only high value orders
    await adminClient.CreateSubscriptionAsync(
        new CreateSubscriptionOptions("orders-topic", "high-value-orders"),
        new CreateRuleOptions(
            "high-value-filter",
            new SqlRuleFilter("OrderTotal >= 1000")));

    // Low value subscription - gets only small orders
    await adminClient.CreateSubscriptionAsync(
        new CreateSubscriptionOptions("orders-topic", "low-value-orders"),
        new CreateRuleOptions(
            "low-value-filter",
            new SqlRuleFilter("OrderTotal < 100")));
}

Dead Letter and Retry

// Configure dead letter for subscription
/// <summary>
/// Creates the <c>email-notifications</c> subscription so that a message is dead-lettered
/// after three failed deliveries and dead-lettered messages are forwarded to a separate topic.
/// </summary>
public async Task ConfigureDeadLetterAsync()
{
    var adminClient = new ServiceBusAdministrationClient(connectionString);

    var subscriptionOptions = new CreateSubscriptionOptions(
        "orders-topic",
        "email-notifications")
    {
        // Every subscription has a built-in dead-letter sub-queue. This setting additionally
        // forwards dead-lettered messages to another entity, which must already exist.
        ForwardDeadLetteredMessagesTo = "orders-topic-deadletter",
        MaxDeliveryCount = 3,
        LockDuration = TimeSpan.FromMinutes(1)
    };

    await adminClient.CreateSubscriptionAsync(subscriptionOptions);
}

// Handle dead letter messages
/// <summary>
/// Hosted service that drains the <c>all-messages</c> subscription of the
/// <c>orders-topic-deadletter</c> topic and logs why each message was dead-lettered.
/// </summary>
public class DeadLetterProcessor : BackgroundService
{
    private readonly ServiceBusClient _client;
    private readonly ServiceBusProcessor _processor;

    /// <param name="connectionString">Service Bus connection string with Listen rights.</param>
    public DeadLetterProcessor(string connectionString)
    {
        _client = new ServiceBusClient(connectionString);
        _processor = _client.CreateProcessor("orders-topic-deadletter", "all-messages");
    }

    /// <summary>Registers the handlers and starts the processor.</summary>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _processor.ProcessMessageAsync += async args =>
        {
            var message = args.Message;

            // Log or analyze dead letter message. The reason and description are recorded
            // on the message when it is dead-lettered.
            Console.WriteLine($"Dead letter: {message.Body}");
            Console.WriteLine($"Error: {message.DeadLetterReason} - {message.DeadLetterErrorDescription}");

            await args.CompleteMessageAsync(args.Message, args.CancellationToken);
        };

        // A ProcessErrorAsync handler is mandatory: StartProcessingAsync throws without one.
        _processor.ProcessErrorAsync += args =>
        {
            Console.Error.WriteLine(
                $"Dead letter processor error ({args.ErrorSource}, {args.EntityPath}): {args.Exception}");
            return Task.CompletedTask;
        };

        await _processor.StartProcessingAsync(stoppingToken);
    }

    /// <summary>Stops message processing, then releases the processor and the connection.</summary>
    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        try
        {
            await _processor.StopProcessingAsync(cancellationToken);
            await base.StopAsync(cancellationToken);
        }
        finally
        {
            await _processor.DisposeAsync();
            await _client.DisposeAsync();
        }
    }
}

Session Handling

// Create a session-aware subscription so related messages can be processed in order
public async Task SetupSessionAsync()
{
    var administration = new ServiceBusAdministrationClient(connectionString);

    var sessionSubscription = new CreateSubscriptionOptions("orders-topic", "ordered-processing");

    // Receivers must accept a session to read from this subscription.
    sessionSubscription.RequiresSession = true;

    await administration.CreateSubscriptionAsync(sessionSubscription);
}

// Process with session
/// <summary>
/// Hosted service that consumes the session-enabled <c>ordered-processing</c> subscription;
/// messages sharing a SessionId are handled one at a time, in order.
/// </summary>
public class SessionProcessor : BackgroundService
{
    // NOTE(review): connectionString and ProcessOrderInSequenceAsync are referenced below but
    // are not declared in this snippet; supply them (e.g. via constructor injection and a
    // private method) in real code.

    /// <summary>
    /// Starts the session processor, waits for shutdown, then stops it and releases the connection.
    /// </summary>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var options = new ServiceBusSessionProcessorOptions
        {
            MaxConcurrentSessions = 10,
            SessionIdleTimeout = TimeSpan.FromMinutes(1)
        };

        // Both objects own network resources; "await using" closes them when the service stops.
        await using var client = new ServiceBusClient(connectionString);
        await using var processor = client.CreateSessionProcessor(
            "orders-topic", "ordered-processing", options);

        processor.ProcessMessageAsync += async args =>
        {
            var sessionId = args.Message.SessionId;

            // All messages with same sessionId processed sequentially
            var order = JsonSerializer.Deserialize<Order>(args.Message.Body.ToString());
            if (order is null)
            {
                // A JSON "null" body can never be processed; park it in the dead-letter queue.
                await args.DeadLetterMessageAsync(
                    args.Message,
                    "InvalidPayload",
                    $"Message body deserialized to null (session {sessionId}).",
                    args.CancellationToken);
                return;
            }

            await ProcessOrderInSequenceAsync(order);

            await args.CompleteMessageAsync(args.Message, args.CancellationToken);
        };

        // A ProcessErrorAsync handler is mandatory: StartProcessingAsync throws without one.
        processor.ProcessErrorAsync += args =>
        {
            Console.Error.WriteLine(
                $"Session processor error ({args.ErrorSource}, {args.EntityPath}): {args.Exception}");
            return Task.CompletedTask;
        };

        await processor.StartProcessingAsync(stoppingToken);

        try
        {
            // Keep the processor (a local) alive until the host requests shutdown.
            await Task.Delay(Timeout.Infinite, stoppingToken);
        }
        catch (OperationCanceledException)
        {
            // Expected on shutdown.
        }
        finally
        {
            // stoppingToken is already cancelled here, so stop without a token.
            await processor.StopProcessingAsync();
        }
    }
}

Comparison: Queue vs Topics

| Feature | Azure Queue | Service Bus Topics |
|---|---|---|
| Pattern | Point-to-Point | Pub/Sub |
| Multiple Consumers | One consumer per message | Multiple consumers |
| Message Size | 64 KB | 256 KB |
| Sessions | Not supported | Supported |
| Transactions | Not supported | Supported |
| Duplicate Detection | Not supported | Supported |
| Ordering | FIFO | FIFO per subscription |

When to Use Topics

| Scenario | Solution |
|---|---|
| Multiple systems need same data | Pub/Sub |
| Different processing per message type | Filters |
| Fan-out to microservices | Multiple subscriptions |
| Event-driven architecture | Topics + Subscriptions |

Summary

| Component | Description |
|---|---|
| Topic | Channel for publishing messages |
| Subscription | Recipient that receives messages from topic |
| Filter | Rules to route messages to subscriptions |
| Action | Modify message when filter matches |

Next Steps

References