Keyboard shortcuts

Press ← or → to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Cluster Management với .NET

Cluster Health

/// <summary>
/// Cluster diagnostics built on an <see cref="ElasticsearchClient"/>: a cluster
/// health summary and a console dump of the statistics of a single index.
/// </summary>
public class ClusterManagementService
{
    private readonly ElasticsearchClient _es;

    /// <summary>Creates the service around an existing client.</summary>
    /// <param name="es">Client used for every cluster and index call.</param>
    /// <exception cref="ArgumentNullException"><paramref name="es"/> is <c>null</c>.</exception>
    public ClusterManagementService(ElasticsearchClient es) =>
        _es = es ?? throw new ArgumentNullException(nameof(es));

    /// <summary>
    /// Requests the cluster health and maps it to a <see cref="ClusterHealthSummary"/>.
    /// </summary>
    /// <returns>
    /// A summary whose <see cref="ClusterHealthSummary.IsHealthy"/> flag is <c>true</c>
    /// only when the reported status is <see cref="HealthStatus.Green"/>.
    /// </returns>
    public async Task<ClusterHealthSummary> GetHealthAsync()
    {
        var health = await _es.Cluster.HealthAsync(h => h
            .WaitForStatus(WaitForStatus.Green) // Optional: wait until the cluster is green
            .Timeout(new Duration("10s"))
        );

        return new ClusterHealthSummary
        {
            Status           = health.Status.ToString(),
            NumberOfNodes    = (int)health.NumberOfNodes,
            ActiveShards     = (int)health.ActiveShards,
            UnassignedShards = (int)health.UnassignedShards,
            IsHealthy        = health.Status == HealthStatus.Green,
        };
    }

    /// <summary>
    /// Writes the document count, store size (MB) and total query count of an index
    /// to the console.
    /// </summary>
    /// <param name="indexName">Name of the index whose stats are requested.</param>
    public async Task PrintIndexStatsAsync(string indexName)
    {
        var response = await _es.Indices.StatsAsync(indexName);

        // Indexing the dictionary directly throws when the response carries no entry
        // for the requested index, so look the entry up defensively instead.
        if (response.Indices is null || !response.Indices.TryGetValue(indexName, out var indexStats))
        {
            Console.WriteLine($"No stats returned for index '{indexName}'.");
            return;
        }

        var stats = indexStats.Total;

        Console.WriteLine($"Documents: {stats?.Docs?.Count:N0}");
        Console.WriteLine($"Store size: {stats?.Store?.SizeInBytes / 1024 / 1024:N0} MB");
        Console.WriteLine($"Search rate: {stats?.Search?.QueryTotal:N0} queries total");
    }
}

/// <summary>
/// Immutable snapshot of the cluster health values produced by
/// <c>ClusterManagementService.GetHealthAsync</c>.
/// </summary>
public record ClusterHealthSummary
{
    /// <summary>Cluster status rendered as text; empty when not set.</summary>
    public string Status { get; init; } = string.Empty;
    /// <summary>Number of nodes reported by the health response.</summary>
    public int NumberOfNodes { get; init; }
    /// <summary>Number of active shards reported by the health response.</summary>
    public int ActiveShards { get; init; }
    /// <summary>Number of unassigned shards reported by the health response.</summary>
    public int UnassignedShards { get; init; }
    /// <summary><c>true</c> when the reported status was green.</summary>
    public bool IsHealthy { get; init; }
}

Index Lifecycle Management (ILM)

ILM tự động quản lý vòng đời của time-series indices (logs, events).

/// <summary>
/// Bootstraps ILM-managed log indices in three steps: creates the "logs-policy"
/// lifecycle policy (hot, warm, cold and delete phases), registers the
/// "logs-template" index template that attaches the policy to "logs-*" indices,
/// and creates the first index with "logs" as its write alias.
/// </summary>
public async Task SetupIlmPolicyAsync()
{
    // 1. Create the ILM policy
    await _es.IndexLifecycleManagement.PutLifecycleAsync("logs-policy", p => p
        .Policy(pol => pol
            .Phases(ph => ph
                // Hot phase: the index is active and write-heavy
                .Hot(hot => hot
                    .MinAge(new Duration("0ms"))
                    .Actions(a => a
                        .Rollover(ro => ro
                            .MaxSize("50gb")       // Roll over when larger than 50 GB
                            .MaxAge(new Duration("7d")) // Or after 7 days
                            .MaxDocs(10_000_000)   // Or at 10M documents
                        )
                        .SetPriority(sp => sp.Priority(100))
                    )
                )
                // Warm phase: no writes, occasional searches
                .Warm(warm => warm
                    .MinAge(new Duration("7d"))
                    .Actions(a => a
                        .Shrink(sh => sh.NumberOfShards(1)) // Reduce the shard count
                        .ForceMerge(fm => fm.MaxNumSegments(1))
                        .SetPriority(sp => sp.Priority(50))
                    )
                )
                // Cold phase: rare searches, long-term storage
                .Cold(cold => cold
                    .MinAge(new Duration("30d"))
                    .Actions(a => a
                        // PascalCase member name; "Searchable_snapshot" is only the JSON key spelling
                        .SearchableSnapshot(ss => ss
                            .SnapshotRepository("my-repo")
                        )
                        .SetPriority(sp => sp.Priority(0))
                    )
                )
                // Delete phase
                .Delete(del => del
                    .MinAge(new Duration("90d"))
                    .Actions(a => a.Delete())
                )
            )
        )
    );

    // 2. Create the index template bound to the ILM policy
    await _es.Indices.PutIndexTemplateAsync("logs-template", t => t
        .IndexPatterns(new[] { "logs-*" })
        .Template(tmpl => tmpl
            .Settings(s => s
                .NumberOfShards(1)
                .NumberOfReplicas(1)
                .Add("index.lifecycle.name", "logs-policy")
                .Add("index.lifecycle.rollover_alias", "logs")
            )
            .Mappings(m => m
                .Properties(p => p
                    .Date(d => d.Name("@timestamp"))
                    .Keyword(k => k.Name("level"))
                    .Text(txt => txt.Name("message"))
                    .Keyword(k => k.Name("service"))
                )
            )
        )
    );

    // 3. Create the first index together with its write alias.
    // The date is formatted with the invariant culture so the index name does not
    // depend on the calendar of the machine's current culture.
    var today = DateTime.UtcNow.ToString(
        "yyyy.MM.dd", System.Globalization.CultureInfo.InvariantCulture);
    var bootstrapIndex = $"logs-{today}-000001";
    await _es.Indices.CreateAsync(bootstrapIndex, c => c
        .Aliases(a => a
            .Add("logs", al => al.IsWriteIndex(true))
        )
    );
}

// Ghi log - luôn dùng alias, không dùng index name trực tiếp
/// <summary>
/// Indexes a log entry through the "logs" alias instead of a concrete index name.
/// </summary>
/// <param name="entry">Log entry to index.</param>
public async Task WriteLogAsync(LogEntry entry) =>
    await _es.IndexAsync(entry, request => request.Index("logs")); // "logs" is the alias

Alias Management

// Alias = tên thay thế cho index (hoặc nhiều indices)
// Dùng cho: blue-green deployment, zero-downtime reindex

/// <summary>
/// Moves <paramref name="alias"/> from <paramref name="oldIndex"/> to
/// <paramref name="newIndex"/> with a single update-aliases request.
/// </summary>
/// <param name="alias">Alias to repoint.</param>
/// <param name="oldIndex">Index the alias is detached from; the index itself is kept.</param>
/// <param name="newIndex">Index the alias is attached to.</param>
public async Task SwapAliasAsync(string alias, string oldIndex, string newIndex)
{
    await _es.Indices.UpdateAliasesAsync(a => a
        .Actions(
            // RemoveAction detaches the alias only. RemoveIndexAction is the
            // "remove_index" action, which deletes the index itself.
            new RemoveAction { Index = oldIndex, Alias = alias },
            new AddAction { Index = newIndex,  Alias = alias }
        )
    );
}

// Đọc alias hiện tại
/// <summary>
/// Looks up the alias and returns the name of the first index in the response.
/// </summary>
/// <param name="alias">Alias to resolve.</param>
/// <returns>The first index key as text, or <c>null</c> when the response lists none.</returns>
public async Task<string?> GetIndexForAliasAsync(string alias)
{
    var aliasResponse = await _es.Indices.GetAliasAsync(alias);
    var firstIndex = aliasResponse.Indices.Keys.FirstOrDefault();
    return firstIndex?.ToString();
}

// Filtered alias: an alias that carries a filter query
await _es.Indices.PutAliasAsync("products_in_stock", "products", aliasCfg => aliasCfg
    .Filter(flt => flt.Term(termQuery => termQuery.Field("in_stock").Value(true)))
    .IsWriteIndex(false)
);

// Searching through the filtered alias applies the filter automatically
var response = await _es.SearchAsync<Product>(search => search
    .Index("products_in_stock") // Only products with in_stock=true
    .Query(qry => qry.MatchAll())
);

Snapshot & Restore

// Backup và restore
/// <summary>
/// Snapshot (backup) and restore operations against the "my-backups" repository.
/// </summary>
public class SnapshotService
{
    private readonly ElasticsearchClient _es;
    private const string RepoName = "my-backups";

    /// <summary>Creates the service around an existing client.</summary>
    /// <param name="es">Client used for every snapshot call.</param>
    /// <exception cref="ArgumentNullException"><paramref name="es"/> is <c>null</c>.</exception>
    public SnapshotService(ElasticsearchClient es) =>
        _es = es ?? throw new ArgumentNullException(nameof(es));

    /// <summary>
    /// Registers a file-system snapshot repository (needs to be done once).
    /// </summary>
    /// <param name="storagePath">Location passed to the file-system repository settings.</param>
    public async Task RegisterRepositoryAsync(string storagePath)
    {
        await _es.Snapshot.CreateRepositoryAsync(RepoName, r => r
            .Repository(sr => sr
                .Fs(fs => fs.Settings(set => set.Location(storagePath)))
            )
        );
    }

    /// <summary>
    /// Creates a snapshot without global state and waits for it to complete.
    /// </summary>
    /// <param name="snapshotName">Name of the snapshot to create.</param>
    /// <param name="indices">Indices to include; <c>null</c> sends an empty index list.</param>
    public async Task CreateSnapshotAsync(string snapshotName, string[]? indices = null)
    {
        await _es.Snapshot.CreateAsync(RepoName, snapshotName, s => s
            .Indices(indices?.Select(i => (IndexName)i).ToArray() ?? Array.Empty<IndexName>())
            .IncludeGlobalState(false)
            .WaitForCompletion(true)
        );
    }

    /// <summary>
    /// Restores a snapshot and waits for the restore to complete.
    /// </summary>
    /// <param name="snapshotName">Name of the snapshot to restore.</param>
    /// <param name="indices">Indices to restore; <c>null</c> sends an empty index list.</param>
    public async Task RestoreSnapshotAsync(string snapshotName, string[]? indices = null)
    {
        await _es.Snapshot.RestoreAsync(RepoName, snapshotName, r => r
            .Indices(indices?.Select(i => (IndexName)i).ToArray() ?? Array.Empty<IndexName>())
            .WaitForCompletion(true)
        );
    }

    /// <summary>
    /// Lists every snapshot in the repository as "name - state - start date".
    /// </summary>
    /// <returns>One formatted line per snapshot; empty when the response has none.</returns>
    public async Task<List<string>> ListSnapshotsAsync()
    {
        var response = await _es.Snapshot.GetAsync(RepoName, "_all");
        return response.Snapshots?
            .Select(s => $"{s.Snapshot} - {s.State} - {s.StartTime:yyyy-MM-dd}")
            .ToList() ?? [];
    }
}

ASP.NET Core Integration Pattern (Full)

// ElasticsearchOptions.cs
/// <summary>
/// Settings bound from the "Elasticsearch" configuration section and used to
/// build the Elasticsearch client.
/// </summary>
public class ElasticsearchOptions
{
    /// <summary>Name of the configuration section these options are read from.</summary>
    public const string SectionName = "Elasticsearch";

    /// <summary>Address of the Elasticsearch endpoint; defaults to http://localhost:9200.</summary>
    public string Uri { get; set; } = "http://localhost:9200";
    /// <summary>Basic-authentication user name; authentication is skipped when null or empty.</summary>
    public string? Username { get; set; }
    /// <summary>Basic-authentication password paired with <see cref="Username"/>.</summary>
    public string? Password { get; set; }
    /// <summary>Default index configured on the client; defaults to "default".</summary>
    public string DefaultIndex { get; set; } = "default";
    /// <summary>When true, the client is created with debug mode enabled.</summary>
    public bool EnableDebugMode { get; set; }
    /// <summary>Client request timeout in seconds; defaults to 30.</summary>
    public int RequestTimeoutSeconds { get; set; } = 30;
}

// Extensions/ElasticsearchExtensions.cs
/// <summary>
/// Dependency-injection registration for the Elasticsearch client and the
/// services built on top of it.
/// </summary>
public static class ElasticsearchExtensions
{
    /// <summary>
    /// Binds <see cref="ElasticsearchOptions"/> from configuration and registers a
    /// singleton <see cref="ElasticsearchClient"/>, the product repositories and the
    /// "elasticsearch" health check.
    /// </summary>
    /// <param name="services">Service collection to add the registrations to.</param>
    /// <param name="configuration">Configuration holding the "Elasticsearch" section.</param>
    /// <returns>The same <paramref name="services"/> instance, for chaining.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="services"/> or <paramref name="configuration"/> is <c>null</c>.
    /// </exception>
    /// <exception cref="InvalidOperationException">
    /// The options section cannot be bound, or a username is configured without a password.
    /// </exception>
    public static IServiceCollection AddElasticsearch(
        this IServiceCollection services,
        IConfiguration configuration)
    {
        ArgumentNullException.ThrowIfNull(services);
        ArgumentNullException.ThrowIfNull(configuration);

        var section = configuration.GetSection(ElasticsearchOptions.SectionName);
        var options = section.Get<ElasticsearchOptions>()
            ?? throw new InvalidOperationException("Elasticsearch options not configured");

        // Resolve credentials at registration time so a username without a password
        // fails at startup instead of being passed on as a null password.
        BasicAuthentication? credentials = null;
        if (!string.IsNullOrEmpty(options.Username))
        {
            var password = options.Password
                ?? throw new InvalidOperationException(
                    "Elasticsearch password must be configured when a username is set");
            credentials = new BasicAuthentication(options.Username, password);
        }

        services.Configure<ElasticsearchOptions>(section);

        services.AddSingleton<ElasticsearchClient>(_ =>
        {
            var settings = new ElasticsearchClientSettings(new Uri(options.Uri))
                .DefaultIndex(options.DefaultIndex)
                .RequestTimeout(TimeSpan.FromSeconds(options.RequestTimeoutSeconds))
                .DefaultMappingFor<Product>(m => m.IndexName("products"))
                .DefaultMappingFor<Order>(m => m.IndexName("orders"))
                .DefaultMappingFor<LogEntry>(m => m.IndexName("logs"));

            if (credentials is not null)
            {
                settings = settings.Authentication(credentials);
            }

            if (options.EnableDebugMode)
            {
                settings = settings
                    .EnableDebugMode()
                    .DisableDirectStreaming();
            }

            return new ElasticsearchClient(settings);
        });

        // Repositories
        services.AddScoped<IProductSearchRepository, ProductSearchRepository>();
        services.AddScoped<IProductIndexRepository, ProductIndexRepository>();

        // Health check
        services.AddHealthChecks()
            .AddCheck<ElasticsearchHealthCheck>("elasticsearch",
                tags: new[] { "search", "ready" });

        return services;
    }
}

// Program.cs
// Registers the Elasticsearch client, the product repositories and the health check
builder.Services.AddElasticsearch(builder.Configuration);