Cluster Management với .NET
Cluster Health
public class ClusterManagementService
{
private readonly ElasticsearchClient _es;
public ClusterManagementService(ElasticsearchClient es) => _es = es;
// Kiểm tra health
public async Task<ClusterHealthSummary> GetHealthAsync()
{
var health = await _es.Cluster.HealthAsync(h => h
.WaitForStatus(WaitForStatus.Green) // Chờ đến khi green (optional)
.Timeout(new Duration("10s"))
);
return new ClusterHealthSummary
{
Status = health.Status.ToString(),
NumberOfNodes = (int)health.NumberOfNodes,
ActiveShards = (int)health.ActiveShards,
UnassignedShards = (int)health.UnassignedShards,
IsHealthy = health.Status == HealthStatus.Green,
};
}
// Xem stats của index
public async Task PrintIndexStatsAsync(string indexName)
{
var response = await _es.Indices.StatsAsync(indexName);
var stats = response.Indices[indexName].Total;
Console.WriteLine($"Documents: {stats?.Docs?.Count:N0}");
Console.WriteLine($"Store size: {stats?.Store?.SizeInBytes / 1024 / 1024:N0} MB");
Console.WriteLine($"Search rate: {stats?.Search?.QueryTotal:N0} queries total");
}
}
public record ClusterHealthSummary
{
public string Status { get; init; } = string.Empty;
public int NumberOfNodes { get; init; }
public int ActiveShards { get; init; }
public int UnassignedShards { get; init; }
public bool IsHealthy { get; init; }
}
Index Lifecycle Management (ILM)
ILM tự động quản lý vòng đời của time-series indices (logs, events).
public async Task SetupIlmPolicyAsync()
{
// 1. Tạo ILM Policy
await _es.IndexLifecycleManagement.PutLifecycleAsync("logs-policy", p => p
.Policy(pol => pol
.Phases(ph => ph
// Hot phase: index đang active, write nhiều
.Hot(hot => hot
.MinAge(new Duration("0ms"))
.Actions(a => a
.Rollover(ro => ro
.MaxSize("50gb") // Rollover khi > 50GB
.MaxAge(new Duration("7d")) // Hoặc sau 7 ngày
.MaxDocs(10_000_000) // Hoặc 10M docs
)
.SetPriority(sp => sp.Priority(100))
)
)
// Warm phase: không write, search thỉnh thoảng
.Warm(warm => warm
.MinAge(new Duration("7d"))
.Actions(a => a
.Shrink(sh => sh.NumberOfShards(1)) // Giảm shard count
.ForceMerge(fm => fm.MaxNumSegments(1))
.SetPriority(sp => sp.Priority(50))
)
)
// Cold phase: search hiếm, long-term storage
.Cold(cold => cold
.MinAge(new Duration("30d"))
.Actions(a => a
.Searchable_snapshot(ss => ss
.SnapshotRepository("my-repo")
)
.SetPriority(sp => sp.Priority(0))
)
)
// Delete phase
.Delete(del => del
.MinAge(new Duration("90d"))
.Actions(a => a.Delete())
)
)
)
);
// 2. Tạo Index Template gắn với ILM
await _es.Indices.PutIndexTemplateAsync("logs-template", t => t
.IndexPatterns(new[] { "logs-*" })
.Template(tmpl => tmpl
.Settings(s => s
.NumberOfShards(1)
.NumberOfReplicas(1)
.Add("index.lifecycle.name", "logs-policy")
.Add("index.lifecycle.rollover_alias", "logs")
)
.Mappings(m => m
.Properties(p => p
.Date(d => d.Name("@timestamp"))
.Keyword(k => k.Name("level"))
.Text(txt => txt.Name("message"))
.Keyword(k => k.Name("service"))
)
)
)
);
// 3. Tạo index đầu tiên với alias
var bootstrapIndex = $"logs-{DateTime.UtcNow:yyyy.MM.dd}-000001";
await _es.Indices.CreateAsync(bootstrapIndex, c => c
.Aliases(a => a
.Add("logs", al => al.IsWriteIndex(true))
)
);
}
// Ghi log - luôn dùng alias, không dùng index name trực tiếp
public async Task WriteLogAsync(LogEntry entry)
{
await _es.IndexAsync(entry, i => i.Index("logs")); // alias
}
Alias Management
// Alias = tên thay thế cho index (hoặc nhiều indices)
// Dùng cho: blue-green deployment, zero-downtime reindex
public async Task SwapAliasAsync(string alias, string oldIndex, string newIndex)
{
await _es.Indices.UpdateAliasesAsync(a => a
.Actions(
new RemoveIndexAction { Index = oldIndex, Alias = alias },
new AddAction { Index = newIndex, Alias = alias }
)
);
}
// Đọc alias hiện tại
public async Task<string?> GetIndexForAliasAsync(string alias)
{
var response = await _es.Indices.GetAliasAsync(alias);
return response.Indices.Keys.FirstOrDefault()?.ToString();
}
// Filtered alias - alias với filter query
await _es.Indices.PutAliasAsync("products_in_stock", "products", a => a
.Filter(f => f.Term(t => t.Field("in_stock").Value(true)))
.IsWriteIndex(false)
);
// Query qua filtered alias tự động áp dụng filter
var response = await _es.SearchAsync<Product>(s => s
.Index("products_in_stock") // Chỉ products có in_stock=true
.Query(q => q.MatchAll())
);
Snapshot & Restore
// Backup và restore
public class SnapshotService
{
private readonly ElasticsearchClient _es;
private const string RepoName = "my-backups";
// Đăng ký repository (thực hiện một lần)
public async Task RegisterRepositoryAsync(string storagePath)
{
await _es.Snapshot.CreateRepositoryAsync(RepoName, r => r
.Repository(sr => sr
.Fs(fs => fs.Settings(set => set.Location(storagePath)))
)
);
}
// Tạo snapshot
public async Task CreateSnapshotAsync(string snapshotName, string[]? indices = null)
{
await _es.Snapshot.CreateAsync(RepoName, snapshotName, s => s
.Indices(indices?.Select(i => (IndexName)i).ToArray() ?? Array.Empty<IndexName>())
.IncludeGlobalState(false)
.WaitForCompletion(true)
);
}
// Restore snapshot
public async Task RestoreSnapshotAsync(string snapshotName, string[]? indices = null)
{
await _es.Snapshot.RestoreAsync(RepoName, snapshotName, r => r
.Indices(indices?.Select(i => (IndexName)i).ToArray() ?? Array.Empty<IndexName>())
.WaitForCompletion(true)
);
}
// Liệt kê snapshots
public async Task<List<string>> ListSnapshotsAsync()
{
var response = await _es.Snapshot.GetAsync(RepoName, "_all");
return response.Snapshots?
.Select(s => $"{s.Snapshot} - {s.State} - {s.StartTime:yyyy-MM-dd}")
.ToList() ?? [];
}
}
ASP.NET Core Integration Pattern (Full)
// ElasticsearchOptions.cs
public class ElasticsearchOptions
{
public const string SectionName = "Elasticsearch";
public string Uri { get; set; } = "http://localhost:9200";
public string? Username { get; set; }
public string? Password { get; set; }
public string DefaultIndex { get; set; } = "default";
public bool EnableDebugMode { get; set; }
public int RequestTimeoutSeconds { get; set; } = 30;
}
// Extensions/ElasticsearchExtensions.cs
public static class ElasticsearchExtensions
{
public static IServiceCollection AddElasticsearch(
this IServiceCollection services,
IConfiguration configuration)
{
var options = configuration
.GetSection(ElasticsearchOptions.SectionName)
.Get<ElasticsearchOptions>()
?? throw new InvalidOperationException("Elasticsearch options not configured");
services.Configure<ElasticsearchOptions>(
configuration.GetSection(ElasticsearchOptions.SectionName)
);
services.AddSingleton<ElasticsearchClient>(_ =>
{
var settings = new ElasticsearchClientSettings(new Uri(options.Uri))
.DefaultIndex(options.DefaultIndex)
.RequestTimeout(TimeSpan.FromSeconds(options.RequestTimeoutSeconds))
.DefaultMappingFor<Product>(m => m.IndexName("products"))
.DefaultMappingFor<Order>(m => m.IndexName("orders"))
.DefaultMappingFor<LogEntry>(m => m.IndexName("logs"));
if (!string.IsNullOrEmpty(options.Username))
settings = settings.Authentication(
new BasicAuthentication(options.Username, options.Password!)
);
if (options.EnableDebugMode)
settings = settings
.EnableDebugMode()
.DisableDirectStreaming();
return new ElasticsearchClient(settings);
});
// Repositories
services.AddScoped<IProductSearchRepository, ProductSearchRepository>();
services.AddScoped<IProductIndexRepository, ProductIndexRepository>();
// Health check
services.AddHealthChecks()
.AddCheck<ElasticsearchHealthCheck>("elasticsearch",
tags: new[] { "search", "ready" });
return services;
}
}
// Program.cs
builder.Services.AddElasticsearch(builder.Configuration);