Event-Driven Architecture

Overview

Event-Driven Architecture (EDA) is an architectural style in which components communicate by sending and receiving events. Instead of calling each other directly, components emit events and react to them, which enables loose coupling and asynchronous processing.

Core Concepts

┌─────────────────────────────────────────────────────────────────┐
│                     Event-Driven System                         │
│                                                                 │
│   ┌──────────┐      ┌──────────┐      ┌──────────┐              │
│   │Producer A│      │Producer B│      │Producer C│              │
│   └────┬─────┘      └────┬─────┘      └────┬─────┘              │
│        │                 │                 │                    │
│        └─────────────────┼─────────────────┘                    │
│                          ▼                                      │
│                     ┌──────────┐                                │
│                     │ Event    │                                │
│                     │ Bus/     │                                │
│                     │ Broker   │                                │
│                     └────┬─────┘                                │
│        ┌─────────────────┼─────────────────┐                    │
│        ▼                 ▼                 ▼                    │
│   ┌──────────┐      ┌──────────┐      ┌──────────┐              │
│   │Consumer 1│      │Consumer 2│      │Consumer 3│              │
│   └──────────┘      └──────────┘      └──────────┘              │
└─────────────────────────────────────────────────────────────────┘

Components

  1. Event Producer: Creates and publishes events
  2. Event Consumer: Listens for and handles events
  3. Event Channel/Broker: Carries events from producers to consumers
  4. Event Router: Routes events to the right consumers

Types of Events

1. Simple Events

A single, standalone event, with no batching and no history of state changes.

public class OrderPlacedEvent
{
    public Guid OrderId { get; set; }
    public Guid CustomerId { get; set; }
    public decimal TotalAmount { get; set; }
    public DateTime PlacedAt { get; set; }
}

2. Event Carried State Transfer

The event carries all the data the consumer needs, so the consumer does not have to call another API.

public class OrderPlacedEvent
{
    public Guid OrderId { get; set; }
    public CustomerInfo Customer { get; set; }  // Full details
    public List<OrderItemDto> Items { get; set; }  // All items
    public decimal TotalAmount { get; set; }
    public string ShippingAddress { get; set; }
}

3. Notification Events

The event only announces that something happened; the consumer fetches additional data itself if it needs it.

public class OrderPlacedNotification
{
    public Guid OrderId { get; set; }
    public DateTime Timestamp { get; set; }
    // Consumer calls GET /api/orders/{id} to get full details
}

Implementation in .NET

1. Using In-Memory Events (Simple)

// Event definitions
public interface IEvent
{
    DateTime OccurredAt { get; }
}

public class OrderPlacedEvent : IEvent
{
    public DateTime OccurredAt { get; } = DateTime.UtcNow;
    public Guid OrderId { get; }
    public string CustomerEmail { get; }
    
    public OrderPlacedEvent(Guid orderId, string customerEmail)
    {
        OrderId = orderId;
        CustomerEmail = customerEmail;
    }
}

// Event bus (simple in-memory)
public interface IEventBus
{
    void Publish<T>(T @event) where T : IEvent;
    void Subscribe<T>(Action<T> handler) where T : IEvent;
}

public class InMemoryEventBus : IEventBus
{
    private readonly Dictionary<Type, List<Delegate>> _handlers = new();
    
    public void Publish<T>(T @event) where T : IEvent
    {
        if (_handlers.TryGetValue(typeof(T), out var handlers))
        {
            foreach (var handler in handlers)
            {
                ((Action<T>)handler)(@event);
            }
        }
    }
    
    public void Subscribe<T>(Action<T> handler) where T : IEvent
    {
        if (!_handlers.ContainsKey(typeof(T)))
        {
            _handlers[typeof(T)] = new List<Delegate>();
        }
        _handlers[typeof(T)].Add(handler);
    }
}
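The in-memory bus above mutates a plain Dictionary, so concurrent Subscribe and Publish calls can corrupt it or fail during enumeration. A minimal sketch of a thread-safe variant follows. LockingEventBus and BusDemo are hypothetical names, and the event types are repeated only so the block stands on its own.

```csharp
using System;
using System.Collections.Generic;

public interface IEvent
{
    DateTime OccurredAt { get; }
}

public sealed class OrderPlacedEvent : IEvent
{
    public DateTime OccurredAt { get; } = DateTime.UtcNow;
    public Guid OrderId { get; }
    public string CustomerEmail { get; }

    public OrderPlacedEvent(Guid orderId, string customerEmail)
    {
        OrderId = orderId;
        CustomerEmail = customerEmail;
    }
}

// Hypothetical thread-safe variant of the in-memory bus (not from the original text).
public sealed class LockingEventBus
{
    private readonly object _gate = new();
    private readonly Dictionary<Type, List<Delegate>> _handlers = new();

    public void Subscribe<T>(Action<T> handler) where T : IEvent
    {
        lock (_gate)
        {
            if (!_handlers.TryGetValue(typeof(T), out var list))
            {
                list = new List<Delegate>();
                _handlers[typeof(T)] = list;
            }
            list.Add(handler);
        }
    }

    public void Publish<T>(T @event) where T : IEvent
    {
        Delegate[] snapshot;
        lock (_gate)
        {
            // Copy under the lock, then invoke outside it: a slow handler does
            // not block other publishers, and a handler may subscribe without
            // invalidating the enumeration.
            snapshot = _handlers.TryGetValue(typeof(T), out var list)
                ? list.ToArray()
                : Array.Empty<Delegate>();
        }

        foreach (var handler in snapshot)
        {
            ((Action<T>)handler)(@event);
        }
    }
}

public static class BusDemo
{
    // Two independent subscribers receive the same published event.
    public static List<string> Run()
    {
        var bus = new LockingEventBus();
        var log = new List<string>();
        bus.Subscribe<OrderPlacedEvent>(e => log.Add($"email:{e.CustomerEmail}"));
        bus.Subscribe<OrderPlacedEvent>(e => log.Add($"inventory:{e.OrderId}"));
        bus.Publish(new OrderPlacedEvent(Guid.Empty, "a@example.com"));
        return log;
    }
}
```

Handlers still run synchronously on the publisher's thread, in subscription order, so an exception in one handler prevents the later ones from running.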

2. Using Message Queue (Production)

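// Using RabbitMQ (RabbitMQ.Client 6.x API: IConnection.CreateModel, IModel)
public class RabbitMqEventBus : IEventBus
{
    private const string Exchange = "events";
    private readonly IModel _channel;
    
    public RabbitMqEventBus(IConnection connection)
    {
        _channel = connection.CreateModel();
        
        // Direct exchange: the routing key (event type name) decides which
        // queues get the message. A fanout exchange ignores the routing key,
        // so every consumer would receive every event type.
        _channel.ExchangeDeclare(Exchange, ExchangeType.Direct, durable: true);
    }
    
    public void Publish<T>(T @event) where T : IEvent
    {
        // Use the runtime type so derived events keep their name and properties
        var eventType = @event.GetType();
        var body = JsonSerializer.SerializeToUtf8Bytes(@event, eventType);
        
        var properties = _channel.CreateBasicProperties();
        properties.Persistent = true; // only helps if the target queue is durable
        
        _channel.BasicPublish(Exchange, eventType.Name, properties, body);
    }
    
    // Consuming is done by hosted services such as OrderEventConsumer
    public void Subscribe<T>(Action<T> handler) where T : IEvent =>
        throw new NotSupportedException("Subscribe through a BackgroundService consumer.");
}

// Consumer
public class OrderEventConsumer : BackgroundService
{
    private readonly IModel _channel;
    
    public OrderEventConsumer(IModel channel)
    {
        _channel = channel;
    }
    
    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _channel.ExchangeDeclare("events", ExchangeType.Direct, durable: true);
        
        // Server-named, exclusive queue: it disappears with the connection.
        // Declare a named durable queue instead if messages must survive restarts.
        var queue = _channel.QueueDeclare().QueueName;
        _channel.QueueBind(queue, "events", nameof(OrderPlacedEvent));
        
        var consumer = new EventingBasicConsumer(_channel);
        consumer.Received += (model, ea) =>
        {
            try
            {
                var @event = JsonSerializer.Deserialize<OrderPlacedEvent>(ea.Body.Span);
                ProcessOrderPlaced(@event);
                
                // Acknowledge only after processing succeeded
                _channel.BasicAck(ea.DeliveryTag, multiple: false);
            }
            catch (Exception)
            {
                // Reject without requeue to avoid a redelivery loop; the broker
                // dead-letters the message if the queue has a dead-letter exchange
                _channel.BasicNack(ea.DeliveryTag, multiple: false, requeue: false);
            }
        };
        
        _channel.BasicConsume(queue, autoAck: false, consumer: consumer);
        
        return Task.CompletedTask;
    }
    
    private void ProcessOrderPlaced(OrderPlacedEvent @event)
    {
        // Domain logic goes here
    }
}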

3. Using Azure Service Bus

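// Publisher. It exposes an async API, so it does not implement the
// synchronous IEventBus interface defined earlier.
public class AzureServiceBusEventBus : IAsyncDisposable
{
    private readonly ServiceBusClient _client;
    private readonly ServiceBusSender _sender;
    
    public AzureServiceBusEventBus(string connectionString)
    {
        _client = new ServiceBusClient(connectionString);
        _sender = _client.CreateSender("events"); // topic name
    }
    
    public async Task PublishAsync<T>(T @event) where T : IEvent
    {
        // Use the runtime type so derived events keep their name and properties
        var eventType = @event.GetType();
        var message = new ServiceBusMessage(JsonSerializer.Serialize(@event, eventType))
        {
            ContentType = "application/json",
            Subject = eventType.Name // lets a subscription filter by event type
        };
        
        await _sender.SendMessageAsync(message);
    }
    
    public async ValueTask DisposeAsync()
    {
        await _sender.DisposeAsync();
        await _client.DisposeAsync();
    }
}

// Azure Function (in-process model) with a Service Bus topic trigger.
// Assumes the "OrderPlacedEvent" subscription only receives that event type
// and that IOrderService (a hypothetical interface) is registered for injection.
public class OrderFunctions
{
    private readonly IOrderService _orderService;
    
    public OrderFunctions(IOrderService orderService)
    {
        _orderService = orderService;
    }
    
    [FunctionName("ProcessOrderPlaced")]
    public async Task Run(
        [ServiceBusTrigger("events", "OrderPlacedEvent")] 
        ServiceBusReceivedMessage message) // received messages use this type, not ServiceBusMessage
    {
        var orderEvent = JsonSerializer.Deserialize<OrderPlacedEvent>(
            message.Body.ToString());
            
        await _orderService.ProcessOrderAsync(orderEvent.OrderId);
    }
}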

Event Patterns

1. Pub/Sub Pattern

// Multiple consumers can subscribe to same event
public class NotificationService
{
    public void Handle(OrderPlacedEvent evt)
    {
        // Send email notification
        _emailService.Send(evt.CustomerEmail, "Order placed!");
    }
}

public class InventoryService
{
    public void Handle(OrderPlacedEvent evt)
    {
        // Reserve inventory
        _inventoryService.ReserveItems(evt.OrderId);
    }
}

public class AnalyticsService
{
    public void Handle(OrderPlacedEvent evt)
    {
        // Track analytics
        _analytics.TrackOrder(evt.OrderId, evt.TotalAmount);
    }
}

2. Event Sourcing

// Events are stored as the source of truth
public class OrderService
{
    public async Task PlaceOrder(PlaceOrderCommand command)
    {
        var order = new Order();
        
        // Apply domain events
        foreach (var item in command.Items)
        {
            order.AddItem(item.ProductId, item.Quantity);
        }
        
        // Save to event store
        await _eventStore.AppendAsync(order.DomainEvents);
        
        // Publish events
        foreach (var evt in order.DomainEvents)
        {
            await _eventBus.PublishAsync(evt);
        }
    }
}
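With events as the source of truth, current state is not stored directly but rebuilt by replaying the stream. A minimal sketch of that replay step, assuming hypothetical event records (OrderCreated, ItemAdded, ItemRemoved) and a hypothetical OrderState class, none of which come from the original text:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical event types for one order stream.
public abstract record OrderEvent(Guid OrderId);
public sealed record OrderCreated(Guid OrderId) : OrderEvent(OrderId);
public sealed record ItemAdded(Guid OrderId, Guid ProductId, int Quantity) : OrderEvent(OrderId);
public sealed record ItemRemoved(Guid OrderId, Guid ProductId) : OrderEvent(OrderId);

public sealed class OrderState
{
    private readonly Dictionary<Guid, int> _items = new();

    public Guid Id { get; private set; }
    public IReadOnlyDictionary<Guid, int> Items => _items;

    // Rebuilds the state by applying every stored event in order.
    public static OrderState Replay(IEnumerable<OrderEvent> history)
    {
        var state = new OrderState();
        foreach (var e in history)
        {
            state.Apply(e);
        }
        return state;
    }

    private void Apply(OrderEvent e)
    {
        switch (e)
        {
            case OrderCreated created:
                Id = created.OrderId;
                break;
            case ItemAdded added:
                // Missing keys read as 0, so repeated adds accumulate.
                _items.TryGetValue(added.ProductId, out var current);
                _items[added.ProductId] = current + added.Quantity;
                break;
            case ItemRemoved removed:
                _items.Remove(removed.ProductId);
                break;
            default:
                throw new InvalidOperationException(
                    $"Unknown event type {e.GetType().Name}");
        }
    }
}
```

Because Apply only mutates in-memory state and has no side effects, the same history always produces the same OrderState, which is what makes replay safe to repeat.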

3. Choreography (Distributed)

// Services communicate directly via events
// No central orchestrator
public class OrderService
{
    public async Task PlaceOrder(Order order)
    {
        await _eventBus.PublishAsync(new OrderCreatedEvent(order));
    }
}

public class InventoryService
{
    public async Task Handle(OrderCreatedEvent evt)
    {
        // Reserve inventory
        await _inventoryService.ReserveAsync(evt.OrderId);
        
        // Publish next event
        await _eventBus.PublishAsync(new InventoryReservedEvent(evt.OrderId));
    }
}

public class PaymentService
{
    public async Task Handle(InventoryReservedEvent evt)
    {
        // Process payment
        await _paymentService.ProcessAsync(evt.OrderId);
        
        await _eventBus.PublishAsync(new PaymentProcessedEvent(evt.OrderId));
    }
}

4. Orchestration (Centralized)

// Orchestrator coordinates the flow
public class OrderOrchestrator
{
    public async Task PlaceOrder(Order order)
    {
        // Step 1: Validate order
        await _orderService.ValidateAsync(order);
        
        // Step 2: Reserve inventory
        await _inventoryService.ReserveAsync(order.Id);
        
        // Step 3: Process payment
        await _paymentService.ProcessAsync(order.Id);
        
        // Step 4: Confirm order
        await _orderService.ConfirmAsync(order.Id);
    }
}
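The orchestrator above does not say what happens when a later step fails, for example when payment is declined after inventory was reserved. One common answer is to pair each step with a compensating action and undo the completed steps in reverse order. The sketch below assumes hypothetical SagaStep and SagaRunner types; it is an illustration, not the orchestrator's actual design.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical step abstraction: an action plus the action that undoes it.
public sealed record SagaStep(string Name, Func<Task> Execute, Func<Task> Compensate);

public static class SagaRunner
{
    // Runs the steps in order. If one throws, the steps that already completed
    // are compensated in reverse order and the method returns false.
    public static async Task<bool> RunAsync(IEnumerable<SagaStep> steps)
    {
        var completed = new Stack<SagaStep>();
        try
        {
            foreach (var step in steps)
            {
                await step.Execute();
                completed.Push(step);
            }
            return true;
        }
        catch (Exception)
        {
            // The failing step never completed, so only earlier steps are undone.
            while (completed.Count > 0)
            {
                await completed.Pop().Compensate();
            }
            return false;
        }
    }
}
```

A real orchestrator would also persist its progress so that compensation can resume after a crash; this sketch keeps everything in memory.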

Error Handling

1. Retry with Backoff

public async Task Handle(OrderPlacedEvent evt)
{
    var retryPolicy = Policy
        .Handle<Exception>()
        .WaitAndRetryAsync(3, retryAttempt => 
            TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)));
            
    await retryPolicy.ExecuteAsync(async () =>
    {
        await _inventoryService.ReserveAsync(evt.OrderId);
    });
}
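The policy above waits 2, 4, and 8 seconds before its three retries, because Polly passes a 1-based retryAttempt to the delay function. The sketch below isolates that calculation and adds two things the original does not have, both assumptions for illustration: a cap on the delay and optional random jitter, so that many consumers failing together do not retry in lockstep. Backoff is a hypothetical helper, not part of Polly.

```csharp
using System;

public static class Backoff
{
    // Delay before retry number retryAttempt (1-based): 2^attempt seconds,
    // capped at maxDelay so late retries do not wait unboundedly long.
    public static TimeSpan Exponential(int retryAttempt, TimeSpan maxDelay)
    {
        if (retryAttempt < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(retryAttempt));
        }

        var seconds = Math.Pow(2, retryAttempt);
        return seconds >= maxDelay.TotalSeconds
            ? maxDelay
            : TimeSpan.FromSeconds(seconds);
    }

    // "Full jitter": a random delay between zero and the exponential delay.
    public static TimeSpan WithFullJitter(int retryAttempt, TimeSpan maxDelay, Random random)
    {
        var ceiling = Exponential(retryAttempt, maxDelay);
        return TimeSpan.FromMilliseconds(random.NextDouble() * ceiling.TotalMilliseconds);
    }
}
```

Either function can be passed as the sleep duration provider, for example `retryAttempt => Backoff.Exponential(retryAttempt, TimeSpan.FromSeconds(30))`.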

2. Dead Letter Queue

public async Task Handle(OrderPlacedEvent evt)
{
    try
    {
        await _inventoryService.ReserveAsync(evt.OrderId);
    }
    catch (Exception ex)
    {
        // Send to dead letter queue
        await _deadLetterQueue.SendAsync(new FailedEvent
        {
            OriginalEvent = evt,
            Error = ex.Message,
            FailedAt = DateTime.UtcNow
        });
    }
}

3. Idempotency

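public async Task Handle(OrderPlacedEvent evt)
{
    // SetAddAsync returns true only when the member was not already in the set,
    // so the first delivery claims the event and later deliveries get false
    var isFirstDelivery = await _redis.SetAddAsync(
        $"processed:{evt.OrderId}",
        "OrderPlaced");
        
    if (!isFirstDelivery)
    {
        return; // Already processed
    }
    
    try
    {
        await _inventoryService.ReserveAsync(evt.OrderId);
    }
    catch
    {
        // Release the claim so a redelivery can retry the work
        await _redis.SetRemoveAsync($"processed:{evt.OrderId}", "OrderPlaced");
        throw;
    }
}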
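The same claim-then-process idea can be tried without Redis. The sketch below is an in-memory stand-in, suitable for a single process or for tests only; IdempotentHandler and its key selector are hypothetical names introduced for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Wraps a handler so that each event key is processed at most once
// per process lifetime.
public sealed class IdempotentHandler<TEvent>
{
    private readonly ConcurrentDictionary<string, byte> _processed = new();
    private readonly Func<TEvent, string> _keyOf;
    private readonly Func<TEvent, Task> _inner;

    public IdempotentHandler(Func<TEvent, string> keyOf, Func<TEvent, Task> inner)
    {
        _keyOf = keyOf;
        _inner = inner;
    }

    // Returns true if the event was processed, false if it was a duplicate.
    public async Task<bool> HandleAsync(TEvent evt)
    {
        var key = _keyOf(evt);

        // TryAdd is atomic: only one caller can claim a given key.
        if (!_processed.TryAdd(key, 0))
        {
            return false;
        }

        try
        {
            await _inner(evt);
            return true;
        }
        catch
        {
            // Release the claim so a redelivery can retry the work.
            _processed.TryRemove(key, out _);
            throw;
        }
    }
}
```

Unlike the Redis version, the processed set is lost on restart and is not shared between instances, so this only illustrates the control flow.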

Benefits

Benefit           Description
----------------  --------------------------------------
Loose Coupling    Components don’t know about each other
Scalability       Scale consumers independently
Flexibility       Easy to add new consumers
Resilience        Failed events can be retried
Audit Trail       Events provide natural audit trail
Async Processing  Non-blocking operations

Challenges

Challenge             Description
--------------------  ----------------------------------------------------------
Complexity            Harder to debug and trace
Eventual Consistency  Data may not be immediately consistent
Duplication           The same event may be delivered to a consumer more than once
Ordering              Events may arrive out of order
Testing               Harder to test distributed systems
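One way to cope with out-of-order arrival is to park early events until the gap before them closes. The sketch below assumes the producer stamps each event with a per-aggregate sequence number starting at 1, which the events shown in this chapter do not carry; SequencedEvent and InOrderDispatcher are hypothetical names.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical event shape: Sequence is assigned by the producer, per aggregate.
public sealed record SequencedEvent(Guid AggregateId, long Sequence, string Payload);

// Delivers events to the handler in sequence order per aggregate.
// Not thread-safe: call Receive from a single consumer loop.
public sealed class InOrderDispatcher
{
    private readonly Dictionary<Guid, long> _nextExpected = new();
    private readonly Dictionary<(Guid, long), SequencedEvent> _parked = new();
    private readonly Action<SequencedEvent> _handler;

    public InOrderDispatcher(Action<SequencedEvent> handler) => _handler = handler;

    public void Receive(SequencedEvent evt)
    {
        // An aggregate seen for the first time expects sequence 1.
        if (!_nextExpected.TryGetValue(evt.AggregateId, out var expected))
        {
            expected = 1;
        }

        if (evt.Sequence < expected)
        {
            return; // already handled: a stale duplicate
        }

        _parked[(evt.AggregateId, evt.Sequence)] = evt;

        // Drain every consecutive event that is now available.
        while (_parked.Remove((evt.AggregateId, expected), out var ready))
        {
            _handler(ready);
            expected++;
        }

        _nextExpected[evt.AggregateId] = expected;
    }
}
```

The parked set grows without bound if a sequence number never arrives, so a production version would need a timeout or a way to request the missing event.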

Use Cases

Use Case              Description
--------------------  --------------------------------
Microservices         Service-to-service communication
Real-time Processing  Stream processing
Notifications         Push notifications
Audit                 Audit logging
Workflow              Multi-step processes
IoT                   Device event processing

Best Practices

  1. Idempotency: Design consumers to handle duplicate events
  2. Event Size: Keep events small, reference data by ID
  3. Versioning: Plan for event schema changes
  4. Error Handling: Use retry policies and dead letter queues
  5. Monitoring: Track event processing metrics
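For the versioning practice, one option is to wrap each event in an envelope with a version number and upgrade old payloads when they are read ("upcasting"). The sketch below is an assumption-heavy illustration: the envelope shape, the OrderPlacedV2 record, and the rule that version 1 events default to "USD" are all invented for the example. It relies on System.Text.Json.Nodes, which is available from .NET 6.

```csharp
using System;
using System.Text.Json;
using System.Text.Json.Nodes;

// Hypothetical current schema: version 2 added Currency.
public sealed record OrderPlacedV2(Guid OrderId, decimal TotalAmount, string Currency);

public static class OrderPlacedUpcaster
{
    // Hypothetical envelope: { "version": 1, "data": { ... } }
    public static OrderPlacedV2 Read(string json)
    {
        var envelope = JsonNode.Parse(json)!.AsObject();
        var version = (int)envelope["version"]!;
        var data = envelope["data"]!.AsObject();

        switch (version)
        {
            case 1:
                // Version 1 had no currency; fill in the assumed default.
                data["Currency"] = "USD";
                break;
            case 2:
                break;
            default:
                throw new NotSupportedException($"Unknown event version {version}");
        }

        return data.Deserialize<OrderPlacedV2>()!;
    }
}
```

Consumers then only deal with the latest shape, and each new version adds one more upgrade step instead of touching every handler.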
