Event Sourcing là một pattern lưu trữ trạng thái của một application như một chuỗi các events thay vì chỉ lưu trữ current state. Thay vì lưu “what” (current state), chúng ta lưu trữ “what happened” (tất cả các thay đổi).
Traditional Approach:
┌──────────────────────────────────┐
│ Current State Only │
│ ┌────────────────────────────┐ │
│ │ Account Balance: $1000 │ │
│ └────────────────────────────┘ │
└──────────────────────────────────┘
Event Sourcing:
┌──────────────────────────────────┐
│ Event Store │
│ ┌────────────────────────────┐ │
│ │ AccountCreated │ │
│ │ Deposited: $500 │ │
│ │ Deposited: $300 │ │
│ │ Withdrew: $100 │ │
│ │ Withdrew: $200 │ │
│ │ Deposited: $500 │ │
│ └────────────────────────────┘ │
└──────────────────────────────────┘
Current Balance: $1000
Event : Something that happened in the system
Event Store : Database lưu trữ events
Aggregate : Entity whose state is derived from events
Projection : Way to materialize state from events
Snapshot : Cached state để avoid replaying all events
// Base event class
public abstract class Event
{
public Guid Id { get; } = Guid.NewGuid();
public DateTime Timestamp { get; } = DateTime.UtcNow;
public int Version { get; set; }
}
// Domain events
public class AccountCreatedEvent : Event
{
public Guid AccountId { get; }
public string OwnerName { get; }
public string Email { get; }
public AccountCreatedEvent(Guid accountId, string ownerName, string email)
{
AccountId = accountId;
OwnerName = ownerName;
Email = email;
}
}
public class MoneyDepositedEvent : Event
{
public Guid AccountId { get; }
public decimal Amount { get; }
public string Description { get; }
public MoneyDepositedEvent(Guid accountId, decimal amount, string description)
{
AccountId = accountId;
Amount = amount;
Description = description;
}
}
public class MoneyWithdrawnEvent : Event
{
public Guid AccountId { get; }
public decimal Amount { get; }
public string Description { get; }
public MoneyWithdrawnEvent(Guid accountId, decimal amount, string description)
{
AccountId = accountId;
Amount = amount;
Description = description;
}
}
public class BankAccount : AggregateRoot
{
public Guid Id { get; private set; }
public string OwnerName { get; private set; }
public string Email { get; private set; }
public decimal Balance { get; private set; }
private readonly List<Event> _pendingEvents = new();
// For reconstruction from event store
public BankAccount() { }
// Factory method
public static BankAccount Create(string ownerName, string email)
{
var account = new BankAccount
{
Id = Guid.NewGuid(),
OwnerName = ownerName,
Email = email,
Balance = 0
};
account._pendingEvents.Add(
new AccountCreatedEvent(account.Id, ownerName, email));
return account;
}
// Apply events (for replay)
public void Apply(Event evt)
{
switch (evt)
{
case AccountCreatedEvent created:
Id = created.AccountId;
OwnerName = created.OwnerName;
Email = created.Email;
break;
case MoneyDepositedEvent deposited:
Balance += deposited.Amount;
break;
case MoneyWithdrawnEvent withdrawn:
Balance -= withdrawn.Amount;
break;
}
}
public void Deposit(decimal amount, string description)
{
if (amount <= 0)
throw new InvalidOperationException("Amount must be positive");
var evt = new MoneyWithdrawnEvent(Id, amount, description);
Apply(evt);
_pendingEvents.Add(evt);
}
public void Withdraw(decimal amount, string description)
{
if (amount <= 0)
throw new InvalidOperationException("Amount must be positive");
if (amount > Balance)
throw new InvalidOperationException("Insufficient balance");
var evt = new MoneyWithdrawnEvent(Id, amount, description);
Apply(evt);
_pendingEvents.Add(evt);
}
public IReadOnlyList<Event> GetPendingEvents() => _pendingEvents.AsReadOnly();
}
public interface IEventStore
{
Task SaveAsync(Guid aggregateId, IEnumerable<Event> events);
Task<List<Event>> GetEventsAsync(Guid aggregateId, int fromVersion = 0);
}
public class EventStore : IEventStore
{
private readonly DbContext _context;
public async Task SaveAsync(Guid aggregateId, IEnumerable<Event> events)
{
foreach (var evt in events)
{
var storedEvent = new StoredEvent
{
Id = evt.Id,
AggregateId = aggregateId,
EventType = evt.GetType().Name,
Data = JsonSerializer.Serialize(evt),
Timestamp = evt.Timestamp,
Version = evt.Version
};
await _context.Events.AddAsync(storedEvent);
}
await _context.SaveChangesAsync();
}
public async Task<List<Event>> GetEventsAsync(Guid aggregateId, int fromVersion = 0)
{
var storedEvents = await _context.Events
.Where(e => e.AggregateId == aggregateId && e.Version > fromVersion)
.OrderBy(e => e.Version)
.ToListAsync();
return storedEvents.Select(Deserialize).ToList();
}
private Event Deserialize(StoredEvent stored)
{
var type = Type.GetType(stored.EventType);
return JsonSerializer.Deserialize(stored.Data, type) as Event;
}
}
public interface IRepository<T> where T : AggregateRoot
{
Task<T> GetByIdAsync(Guid id);
Task SaveAsync(T aggregate);
}
public class BankAccountRepository : IRepository<BankAccount>
{
private readonly IEventStore _eventStore;
public BankAccountRepository(IEventStore eventStore)
{
_eventStore = eventStore;
}
public async Task<BankAccount> GetByIdAsync(Guid id)
{
var events = await _eventStore.GetEventsAsync(id);
var account = new BankAccount();
foreach (var evt in events)
{
account.Apply(evt);
}
return account;
}
public async Task SaveAsync(BankAccount account)
{
var events = account.GetPendingEvents();
if (events.Any())
{
await _eventStore.SaveAsync(account.Id, events);
account.ClearPendingEvents();
}
}
}
Projections tạo ra views từ events cho việc đọc.
// Simple projection - build current state
public class AccountProjection
{
public static BankAccount Project(IEnumerable<Event> events)
{
var account = new BankAccount();
foreach (var evt in events)
{
account.Apply(evt);
}
return account;
}
}
// Multiple projections for different purposes
public class AccountSummaryProjection
{
private readonly Dictionary<Guid, AccountSummary> _summaries = new();
public void Project(Event evt)
{
switch (evt)
{
case AccountCreatedEvent created:
_summaries[created.AccountId] = new AccountSummary
{
AccountId = created.AccountId,
OwnerName = created.OwnerName,
Balance = 0
};
break;
case MoneyDepositedEvent deposited:
_summaries[deposited.AccountId].Balance += deposited.Amount;
break;
case MoneyWithdrawnEvent withdrawn:
_summaries[withdrawn.AccountId].Balance -= withdrawn.Amount;
break;
}
}
public AccountSummary GetSummary(Guid accountId)
=> _summaries.GetValueOrDefault(accountId);
}
public interface ISnapshotStore
{
Task SaveAsync<T>(Guid aggregateId, int version, T snapshot);
Task<T> GetAsync<T>(Guid aggregateId);
}
public class SnapshottingRepository<T> : IRepository<T> where T : AggregateRoot
{
private readonly IEventStore _eventStore;
private readonly ISnapshotStore _snapshotStore;
private const int SnapshotInterval = 100;
public async Task<T> GetByIdAsync(Guid id)
{
// Try to get snapshot first
var snapshot = await _snapshotStore.GetAsync<T>(id);
var fromVersion = 0;
if (snapshot != null)
{
fromVersion = snapshot.Version;
}
// Get events after snapshot
var events = await _eventStore.GetEventsAsync(id, fromVersion);
// Replay
var aggregate = snapshot?.Data ?? CreateAggregate();
foreach (var evt in events)
{
aggregate.Apply(evt);
}
return aggregate;
}
}
Benefit Description
Complete Audit Full history of all changes
Temporal Queries Query state at any point in time
Event Replay Recreate state by replaying events
Debugging Replay events to understand bugs
Scalability Append-only event store
Flexibility Easy to add new projections
Challenge Description
Complexity More complex than traditional CRUD
Learning Curve Harder to understand
Event Schema Changes require migration strategies
Storage Can grow large (need snapshots)
Consistency Eventual consistency
Use Case Description
Banking Complete transaction history
Audit Full audit trail requirements
Collaboration Activity feeds
E-commerce Order processing
Gaming Game state management
IoT Event logging
Aspect Traditional Event Sourcing
Storage Current state History of changes
Audit Add audit tables Built-in
Queries Query current state Project from events
Complexity Lower Higher
Debugging Harder Replay events