Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Event Sourcing

Overview

Event Sourcing là một pattern lưu trữ trạng thái của một application như một chuỗi các events thay vì chỉ lưu trữ current state. Thay vì lưu “what” (current state), chúng ta lưu trữ “what happened” (tất cả các thay đổi).

Core Concept

Traditional Approach:
┌──────────────────────────────────┐
│     Current State Only           │
│  ┌────────────────────────────┐  │
│  │   Account Balance: $1000  │  │
│  └────────────────────────────┘  │
└──────────────────────────────────┘

Event Sourcing:
┌──────────────────────────────────┐
│     Event Store                  │
│  ┌────────────────────────────┐  │
│  │ AccountCreated             │  │
│  │ Deposited: $500             │  │
│  │ Deposited: $300             │  │
│  │ Withdrew: $100              │  │
│  │ Withdrew: $200              │  │
│  │ Deposited: $500            │  │
│  └────────────────────────────┘  │
└──────────────────────────────────┘
   Current Balance: $1000

Key Terms

  • Event: Something that happened in the system
  • Event Store: Database lưu trữ events
  • Aggregate: Entity whose state is derived from events
  • Projection: Way to materialize state from events
  • Snapshot: Cached state để avoid replaying all events

Implementation

1. Events Definition

// Base event class
public abstract class Event
{
    public Guid Id { get; } = Guid.NewGuid();
    public DateTime Timestamp { get; } = DateTime.UtcNow;
    public int Version { get; set; }
}

// Domain events
public class AccountCreatedEvent : Event
{
    public Guid AccountId { get; }
    public string OwnerName { get; }
    public string Email { get; }
    
    public AccountCreatedEvent(Guid accountId, string ownerName, string email)
    {
        AccountId = accountId;
        OwnerName = ownerName;
        Email = email;
    }
}

public class MoneyDepositedEvent : Event
{
    public Guid AccountId { get; }
    public decimal Amount { get; }
    public string Description { get; }
    
    public MoneyDepositedEvent(Guid accountId, decimal amount, string description)
    {
        AccountId = accountId;
        Amount = amount;
        Description = description;
    }
}

public class MoneyWithdrawnEvent : Event
{
    public Guid AccountId { get; }
    public decimal Amount { get; }
    public string Description { get; }
    
    public MoneyWithdrawnEvent(Guid accountId, decimal amount, string description)
    {
        AccountId = accountId;
        Amount = amount;
        Description = description;
    }
}

2. Aggregate

public class BankAccount : AggregateRoot
{
    public Guid Id { get; private set; }
    public string OwnerName { get; private set; }
    public string Email { get; private set; }
    public decimal Balance { get; private set; }
    
    private readonly List<Event> _pendingEvents = new();
    
    // For reconstruction from event store
    public BankAccount() { }
    
    // Factory method
    public static BankAccount Create(string ownerName, string email)
    {
        var account = new BankAccount
        {
            Id = Guid.NewGuid(),
            OwnerName = ownerName,
            Email = email,
            Balance = 0
        };
        
        account._pendingEvents.Add(
            new AccountCreatedEvent(account.Id, ownerName, email));
        
        return account;
    }
    
    // Apply events (for replay)
    public void Apply(Event evt)
    {
        switch (evt)
        {
            case AccountCreatedEvent created:
                Id = created.AccountId;
                OwnerName = created.OwnerName;
                Email = created.Email;
                break;
                
            case MoneyDepositedEvent deposited:
                Balance += deposited.Amount;
                break;
                
            case MoneyWithdrawnEvent withdrawn:
                Balance -= withdrawn.Amount;
                break;
        }
    }
    
    public void Deposit(decimal amount, string description)
    {
        if (amount <= 0)
            throw new InvalidOperationException("Amount must be positive");
            
        var evt = new MoneyWithdrawnEvent(Id, amount, description);
        Apply(evt);
        _pendingEvents.Add(evt);
    }
    
    public void Withdraw(decimal amount, string description)
    {
        if (amount <= 0)
            throw new InvalidOperationException("Amount must be positive");
            
        if (amount > Balance)
            throw new InvalidOperationException("Insufficient balance");
            
        var evt = new MoneyWithdrawnEvent(Id, amount, description);
        Apply(evt);
        _pendingEvents.Add(evt);
    }
    
    public IReadOnlyList<Event> GetPendingEvents() => _pendingEvents.AsReadOnly();
}

3. Event Store

public interface IEventStore
{
    Task SaveAsync(Guid aggregateId, IEnumerable<Event> events);
    Task<List<Event>> GetEventsAsync(Guid aggregateId, int fromVersion = 0);
}

public class EventStore : IEventStore
{
    private readonly DbContext _context;
    
    public async Task SaveAsync(Guid aggregateId, IEnumerable<Event> events)
    {
        foreach (var evt in events)
        {
            var storedEvent = new StoredEvent
            {
                Id = evt.Id,
                AggregateId = aggregateId,
                EventType = evt.GetType().Name,
                Data = JsonSerializer.Serialize(evt),
                Timestamp = evt.Timestamp,
                Version = evt.Version
            };
            
            await _context.Events.AddAsync(storedEvent);
        }
        
        await _context.SaveChangesAsync();
    }
    
    public async Task<List<Event>> GetEventsAsync(Guid aggregateId, int fromVersion = 0)
    {
        var storedEvents = await _context.Events
            .Where(e => e.AggregateId == aggregateId && e.Version > fromVersion)
            .OrderBy(e => e.Version)
            .ToListAsync();
            
        return storedEvents.Select(Deserialize).ToList();
    }
    
    private Event Deserialize(StoredEvent stored)
    {
        var type = Type.GetType(stored.EventType);
        return JsonSerializer.Deserialize(stored.Data, type) as Event;
    }
}

4. Repository

public interface IRepository<T> where T : AggregateRoot
{
    Task<T> GetByIdAsync(Guid id);
    Task SaveAsync(T aggregate);
}

public class BankAccountRepository : IRepository<BankAccount>
{
    private readonly IEventStore _eventStore;
    
    public BankAccountRepository(IEventStore eventStore)
    {
        _eventStore = eventStore;
    }
    
    public async Task<BankAccount> GetByIdAsync(Guid id)
    {
        var events = await _eventStore.GetEventsAsync(id);
        
        var account = new BankAccount();
        foreach (var evt in events)
        {
            account.Apply(evt);
        }
        
        return account;
    }
    
    public async Task SaveAsync(BankAccount account)
    {
        var events = account.GetPendingEvents();
        
        if (events.Any())
        {
            await _eventStore.SaveAsync(account.Id, events);
            account.ClearPendingEvents();
        }
    }
}

Projections

Projections tạo ra views từ events cho việc đọc.

// Simple projection - build current state
public class AccountProjection
{
    public static BankAccount Project(IEnumerable<Event> events)
    {
        var account = new BankAccount();
        foreach (var evt in events)
        {
            account.Apply(evt);
        }
        return account;
    }
}

// Multiple projections for different purposes
public class AccountSummaryProjection
{
    private readonly Dictionary<Guid, AccountSummary> _summaries = new();
    
    public void Project(Event evt)
    {
        switch (evt)
        {
            case AccountCreatedEvent created:
                _summaries[created.AccountId] = new AccountSummary
                {
                    AccountId = created.AccountId,
                    OwnerName = created.OwnerName,
                    Balance = 0
                };
                break;
                
            case MoneyDepositedEvent deposited:
                _summaries[deposited.AccountId].Balance += deposited.Amount;
                break;
                
            case MoneyWithdrawnEvent withdrawn:
                _summaries[withdrawn.AccountId].Balance -= withdrawn.Amount;
                break;
        }
    }
    
    public AccountSummary GetSummary(Guid accountId)
        => _summaries.GetValueOrDefault(accountId);
}

Snapshots

public interface ISnapshotStore
{
    Task SaveAsync<T>(Guid aggregateId, int version, T snapshot);
    Task<T> GetAsync<T>(Guid aggregateId);
}

public class SnapshottingRepository<T> : IRepository<T> where T : AggregateRoot
{
    private readonly IEventStore _eventStore;
    private readonly ISnapshotStore _snapshotStore;
    private const int SnapshotInterval = 100;
    
    public async Task<T> GetByIdAsync(Guid id)
    {
        // Try to get snapshot first
        var snapshot = await _snapshotStore.GetAsync<T>(id);
        var fromVersion = 0;
        
        if (snapshot != null)
        {
            fromVersion = snapshot.Version;
        }
        
        // Get events after snapshot
        var events = await _eventStore.GetEventsAsync(id, fromVersion);
        
        // Replay
        var aggregate = snapshot?.Data ?? CreateAggregate();
        
        foreach (var evt in events)
        {
            aggregate.Apply(evt);
        }
        
        return aggregate;
    }
}

Benefits

BenefitDescription
Complete AuditFull history of all changes
Temporal QueriesQuery state at any point in time
Event ReplayRecreate state by replaying events
DebuggingReplay events to understand bugs
ScalabilityAppend-only event store
FlexibilityEasy to add new projections

Challenges

ChallengeDescription
ComplexityMore complex than traditional CRUD
Learning CurveHarder to understand
Event SchemaChanges require migration strategies
StorageCan grow large (need snapshots)
ConsistencyEventual consistency

Use Cases

Use CaseDescription
BankingComplete transaction history
AuditFull audit trail requirements
CollaborationActivity feeds
E-commerceOrder processing
GamingGame state management
IoTEvent logging

Comparison

AspectTraditionalEvent Sourcing
StorageCurrent stateHistory of changes
AuditAdd audit tablesBuilt-in
QueriesQuery current stateProject from events
ComplexityLowerHigher
DebuggingHarderReplay events

References