Event-Driven Architecture
Overview
Event-Driven Architecture (EDA) là một architectural style trong đó các components giao tiếp với nhau bằng cách gửi và nhận events. Thay vì components gọi trực tiếp nhau, chúng tạo và phản hồi events, cho phép loose coupling và async processing.
Core Concepts
┌─────────────────────────────────────────────────────────────────┐
│ Event-Driven System │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │Producer A│ │Producer B│ │Producer C│ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ │ │ │
│ └─────────────────┼─────────────────┘ │
│ ▼ │
│ ┌──────────┐ │
│ │ Event │ │
│ │ Bus/ │ │
│ │ Broker │ │
│ └────┬─────┘ │
│ ┌────────────────┼────────────────┐ │
│ ▼ ▼ ▼ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │Consumer 1│ │Consumer 2│ │Consumer 3│ │
│ └──────────┘ └──────────┘ └──────────┘ │
└─────────────────────────────────────────────────────────────────┘
Components
- Event Producer: Tạo và publish events
- Event Consumer: Lắng nghe và xử lý events
- Event Channel/Broker: Truyền events từ producers đến consumers
- Event Router: Định tuyến events đến đúng consumers
Types of Events
1. Simple Events
Một event đơn lẻ, không có batch hay state change history.
public class OrderPlacedEvent
{
public Guid OrderId { get; set; }
public Guid CustomerId { get; set; }
public decimal TotalAmount { get; set; }
public DateTime PlacedAt { get; set; }
}
2. Event Carried State Transfer
Event chứa dữ liệu cần thiết, consumer không cần gọi API khác.
public class OrderPlacedEvent
{
public Guid OrderId { get; set; }
public CustomerInfo Customer { get; set; } // Full details
public List<OrderItemDto> Items { get; set; } // All items
public decimal TotalAmount { get; set; }
public string ShippingAddress { get; set; }
}
3. Notification Events
Chỉ notify, consumer tự fetch thêm data nếu cần.
public class OrderPlacedNotification
{
public Guid OrderId { get; set; }
public DateTime Timestamp { get; set; }
// Consumer calls GET /api/orders/{id} to get full details
}
Implementation in .NET
1. Using In-Memory Events (Simple)
// Event definitions
public interface IEvent
{
DateTime OccurredAt { get; }
}
public class OrderPlacedEvent : IEvent
{
public DateTime OccurredAt { get; } = DateTime.UtcNow;
public Guid OrderId { get; }
public string CustomerEmail { get; }
public OrderPlacedEvent(Guid orderId, string customerEmail)
{
OrderId = orderId;
CustomerEmail = customerEmail;
}
}
// Event bus (simple in-memory)
public interface IEventBus
{
void Publish<T>(T @event) where T : IEvent;
void Subscribe<T>(Action<T> handler) where T : IEvent;
}
public class InMemoryEventBus : IEventBus
{
private readonly Dictionary<Type, List<Delegate>> _handlers = new();
public void Publish<T>(T @event) where T : IEvent
{
if (_handlers.TryGetValue(typeof(T), out var handlers))
{
foreach (var handler in handlers)
{
((Action<T>)handler)(@event);
}
}
}
public void Subscribe<T>(Action<T> handler) where T : IEvent
{
if (!_handlers.ContainsKey(typeof(T)))
{
_handlers[typeof(T)] = new List<Delegate>();
}
_handlers[typeof(T)].Add(handler);
}
}
2. Using Message Queue (Production)
// Using RabbitMQ
public class RabbitMqEventBus : IEventBus
{
private readonly IConnection _connection;
private readonly IModel _channel;
public RabbitMqEventBus(IConnection connection)
{
_connection = connection;
_channel = _connection.CreateModel();
}
public void Publish<T>(T @event) where T : IEvent
{
var exchange = "events";
var routingKey = typeof(T).Name;
_channel.ExchangeDeclare(exchange, ExchangeType.Fanout);
var message = JsonSerializer.Serialize(@event);
var body = Encoding.UTF8.GetBytes(message);
var properties = _channel.CreateBasicProperties();
properties.Persistent = true;
_channel.BasicPublish(exchange, routingKey, properties, body);
}
}
// Consumer
public class OrderEventConsumer : BackgroundService
{
private readonly IModel _channel;
public OrderEventConsumer(IModel channel)
{
_channel = channel;
}
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
_channel.ExchangeDeclare("events", ExchangeType.Fanout);
var queue = _channel.QueueDeclare().QueueName;
_channel.QueueBind(queue, "events", "");
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
var @event = JsonSerializer.Deserialize<OrderPlacedEvent>(message);
ProcessOrderPlaced(@event);
};
_channel.BasicConsume(queue, autoAck: true, consumer: consumer);
return Task.CompletedTask;
}
}
3. Using Azure Service Bus
public class AzureServiceBusEventBus : IEventBus
{
private readonly ServiceBusClient _client;
private readonly ServiceBusSender _sender;
public AzureServiceBusEventBus(string connectionString)
{
_client = new ServiceBusClient(connectionString);
_sender = _client.CreateSender("events");
}
public async Task PublishAsync<T>(T @event) where T : IEvent
{
var message = new ServiceBusMessage
{
Body = new BinaryData(JsonSerializer.Serialize(@event)),
ContentType = "application/json"
};
await _sender.SendMessageAsync(message);
}
}
// Azure Function with Service Bus trigger
public class OrderFunctions
{
[FunctionName("ProcessOrderPlaced")]
public async Task Run(
[ServiceBusTrigger("events", "OrderPlacedEvent")]
ServiceBusMessage message)
{
var orderEvent = JsonSerializer.Deserialize<OrderPlacedEvent>(
message.Body.ToString());
await _orderService.ProcessOrderAsync(orderEvent.OrderId);
}
}
Event Patterns
1. Pub/Sub Pattern
// Multiple consumers can subscribe to same event
public class NotificationService
{
public void Handle(OrderPlacedEvent evt)
{
// Send email notification
_emailService.Send(evt.CustomerEmail, "Order placed!");
}
}
public class InventoryService
{
public void Handle(OrderPlacedEvent evt)
{
// Reserve inventory
_inventoryService.ReserveItems(evt.OrderId);
}
}
public class AnalyticsService
{
public void Handle(OrderPlacedEvent evt)
{
// Track analytics
_analytics.TrackOrder(evt.OrderId, evt.TotalAmount);
}
}
2. Event Sourcing
// Events are stored as the source of truth
public class OrderService
{
public async Task PlaceOrder(PlaceOrderCommand command)
{
var order = new Order();
// Apply domain events
foreach (var item in command.Items)
{
order.AddItem(item.ProductId, item.Quantity);
}
// Save to event store
await _eventStore.AppendAsync(order.DomainEvents);
// Publish events
foreach (var evt in order.DomainEvents)
{
await _eventBus.PublishAsync(evt);
}
}
}
3. Choreography (Distributed)
// Services communicate directly via events
// No central orchestrator
public class OrderService
{
public async Task PlaceOrder(Order order)
{
await _eventBus.PublishAsync(new OrderCreatedEvent(order));
}
}
public class InventoryService
{
public async Task Handle(OrderCreatedEvent evt)
{
// Reserve inventory
await _inventoryService.ReserveAsync(evt.OrderId);
// Publish next event
await _eventBus.PublishAsync(new InventoryReservedEvent(evt.OrderId));
}
}
public class PaymentService
{
public async Task Handle(InventoryReservedEvent evt)
{
// Process payment
await _paymentService.ProcessAsync(evt.OrderId);
await _eventBus.PublishAsync(new PaymentProcessedEvent(evt.OrderId));
}
}
4. Orchestration (Centralized)
// Orchestrator coordinates the flow
public class OrderOrchestrator
{
public async Task PlaceOrder(Order order)
{
// Step 1: Validate order
await _orderService.ValidateAsync(order);
// Step 2: Reserve inventory
await _inventoryService.ReserveAsync(order.Id);
// Step 3: Process payment
await _paymentService.ProcessAsync(order.Id);
// Step 4: Confirm order
await _orderService.ConfirmAsync(order.Id);
}
}
Error Handling
1. Retry with Backoff
public async Task Handle(OrderPlacedEvent evt)
{
var retryPolicy = Policy
.Handle<Exception>()
.WaitAndRetryAsync(3, retryAttempt =>
TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)));
await retryPolicy.ExecuteAsync(async () =>
{
await _inventoryService.ReserveAsync(evt.OrderId);
});
}
2. Dead Letter Queue
public async Task Handle(OrderPlacedEvent evt)
{
try
{
await _inventoryService.ReserveAsync(evt.OrderId);
}
catch (Exception ex)
{
// Send to dead letter queue
await _deadLetterQueue.SendAsync(new FailedEvent
{
OriginalEvent = evt,
Error = ex.Message,
FailedAt = DateTime.UtcNow
});
}
}
3. Idempotency
public async Task Handle(OrderPlacedEvent evt)
{
// Check if already processed
var isProcessed = await _redis.SetAddAsync(
$"processed:{evt.OrderId}",
"OrderPlaced");
if (!isProcessed)
{
return; // Already processed
}
await _inventoryService.ReserveAsync(evt.OrderId);
}
Benefits
| Benefit | Description |
|---|---|
| Loose Coupling | Components don’t know about each other |
| Scalability | Scale consumers independently |
| Flexibility | Easy to add new consumers |
| Resilience | Failed events can be retried |
| Audit Trail | Events provide natural audit trail |
| Async Processing | Non-blocking operations |
Challenges
| Challenge | Description |
|---|---|
| Complexity | Harder to debug và trace |
| Eventual Consistency | Data may not be immediately consistent |
| Duplication | Same event processed by multiple consumers |
| Ordering | Events may arrive out of order |
| Testing | Harder to test distributed systems |
Use Cases
| Use Case | Description |
|---|---|
| Microservices | Service-to-service communication |
| Real-time Processing | Stream processing |
| Notifications | Push notifications |
| Audit | Audit logging |
| Workflow | Multi-step processes |
| IoT | Device event processing |
Best Practices
- Idempotency: Design consumers to handle duplicate events
- Event Size: Keep events small, reference data by ID
- Versioning: Plan for event schema changes
- Error Handling: Use retry policies và dead letter queues
- Monitoring: Track event processing metrics