Indexing Documents với .NET
Model & DI Setup
// Models/Product.cs
/// <summary>
/// Product document model used by the indexing examples.
/// </summary>
public class Product
{
    /// <summary>Numeric identifier of the product.</summary>
    public int Id { get; set; }

    /// <summary>Display name; defaults to an empty string.</summary>
    public string Name { get; set; } = "";

    /// <summary>Optional free-text description.</summary>
    public string? Description { get; set; }

    /// <summary>Category label; defaults to an empty string.</summary>
    public string Category { get; set; } = "";

    /// <summary>Brand label; defaults to an empty string.</summary>
    public string Brand { get; set; } = "";

    /// <summary>Product price.</summary>
    public decimal Price { get; set; }

    /// <summary>Whether the product is flagged as in stock.</summary>
    public bool InStock { get; set; }

    /// <summary>Number of units in stock.</summary>
    public int StockQuantity { get; set; }

    /// <summary>Tags attached to the product; starts as an empty list.</summary>
    public List<string> Tags { get; set; } = new();

    /// <summary>Creation timestamp; initialized to the current UTC time.</summary>
    public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
}
// Program.cs - DI registration
// Registers a single shared ElasticsearchClient for the whole application.
builder.Services.AddSingleton<ElasticsearchClient>(_ =>
{
    // Connection details are read from configuration (appsettings, environment
    // variables, user secrets, ...) so credentials are never committed to source.
    var section = builder.Configuration.GetSection("Elasticsearch");

    // The node address is not a secret, so a local default is acceptable.
    var nodeUri = section["Uri"] ?? "https://localhost:9200";
    var username = section["Username"]
        ?? throw new InvalidOperationException("Missing configuration value 'Elasticsearch:Username'.");
    var password = section["Password"]
        ?? throw new InvalidOperationException("Missing configuration value 'Elasticsearch:Password'.");

    var settings = new ElasticsearchClientSettings(new Uri(nodeUri))
        .Authentication(new BasicAuthentication(username, password))
        // Bind each document type to its index so calls may omit the index name.
        .DefaultMappingFor<Product>(m => m.IndexName("products"))
        .DefaultMappingFor<Order>(m => m.IndexName("orders"));
    return new ElasticsearchClient(settings);
});
Tạo Index với Mapping
/// <summary>
/// Creates the <c>products</c> index with an explicit, strict mapping.
/// Does nothing when the index already exists.
/// </summary>
public async Task CreateIndexAsync()
{
    var exists = await _es.Indices.ExistsAsync("products");
    if (exists.Exists) return;

    await _es.Indices.CreateAsync<Product>("products", c => c
        .Settings(s => s
            .NumberOfShards(1)
            .NumberOfReplicas(1)
        )
        .Mappings(m => m
            // Strict: a document containing any field not listed below is rejected,
            // so every serialized Product property must be mapped here.
            .Dynamic(DynamicMapping.Strict)
            .Properties(p => p
                // Id is part of the serialized document body, so it needs a mapping too.
                .Integer(i => i.Name(n => n.Id))
                // Full-text field with a "keyword" sub-field.
                .Text(t => t.Name(n => n.Name)
                    .Analyzer("standard")
                    .Fields(f => f.Keyword(k => k.Name("keyword")))
                )
                .Text(t => t.Name(n => n.Description).Analyzer("english"))
                .Keyword(k => k.Name(n => n.Category))
                // Brand was missing from the mapping although Product always serializes it.
                .Keyword(k => k.Name(n => n.Brand))
                .ScaledFloat(sf => sf.Name(n => n.Price).ScalingFactor(100))
                .Boolean(b => b.Name(n => n.InStock))
                .Integer(i => i.Name(n => n.StockQuantity))
                .Keyword(k => k.Name(n => n.Tags))
                .Date(d => d.Name(n => n.CreatedAt))
            )
        )
    );
}
CRUD Operations
Create / Index
/// <summary>
/// Indexes a single product into the <c>products</c> index, using the
/// product's <c>Id</c> (as a string) as the document id.
/// </summary>
/// <param name="product">The product to index.</param>
/// <returns>The value of <c>IsSuccess()</c> on the index response.</returns>
public async Task<bool> IndexProductAsync(Product product)
{
    var indexResponse = await _es.IndexAsync(product, request => request
        .Index("products")
        .Id(product.Id.ToString()));

    var succeeded = indexResponse.IsSuccess();
    return succeeded;
}
/// <summary>
/// Indexes many products into the <c>products</c> index with one bulk request,
/// using each product's <c>Id</c> (as a string) as the document id.
/// </summary>
/// <param name="products">Products to index. An empty sequence is a no-op.</param>
/// <exception cref="InvalidOperationException">
/// Thrown when at least one item is reported as failed, or when the bulk
/// request as a whole is not successful.
/// </exception>
public async Task BulkIndexAsync(IEnumerable<Product> products)
{
    // Materialize once: avoids enumerating a lazy sequence twice and lets us
    // detect the empty case before sending anything.
    var items = products as IReadOnlyCollection<Product> ?? products.ToList();
    if (items.Count == 0)
    {
        // A bulk request without operations is rejected by the server.
        return;
    }

    var response = await _es.BulkAsync(b => b
        .Index("products")
        .IndexMany(items, (d, p) => d.Id(p.Id.ToString()))
    );

    if (response.Errors)
    {
        var failed = response.ItemsWithErrors.Select(i => i.Id);
        throw new InvalidOperationException($"Bulk failed for IDs: {string.Join(", ", failed)}");
    }

    // Errors only reflects per-item results; a request that failed as a whole
    // must not be reported as success either.
    if (!response.IsSuccess())
    {
        throw new InvalidOperationException("Bulk request to index 'products' failed.");
    }
}
Get
/// <summary>
/// Fetches a single product by id.
/// </summary>
/// <param name="id">Numeric product id; sent to the client as a string.</param>
/// <returns>The document source when the response reports it as found; otherwise <c>null</c>.</returns>
public async Task<Product?> GetByIdAsync(int id)
{
    var lookup = await _es.GetAsync<Product>(id.ToString());
    if (!lookup.Found)
    {
        return null;
    }

    return lookup.Source;
}
/// <summary>
/// Fetches several products in one multi-get request.
/// </summary>
/// <param name="ids">Numeric product ids; each is sent as a string id.</param>
/// <returns>
/// The sources of all returned entries that are reported as found and carry a
/// non-null source. Ids without such an entry are simply absent from the list.
/// </returns>
public async Task<List<Product>> GetManyAsync(IEnumerable<int> ids)
{
    var multiGet = await _es.MgetAsync<Product>(request => request
        .Ids(ids.Select(value => (Id)value.ToString())));

    var products = new List<Product>();
    foreach (var entry in multiGet.Docs.OfType<GetResponse<Product>>())
    {
        if (entry.Found && entry.Source is not null)
        {
            products.Add(entry.Source);
        }
    }

    return products;
}
Update
/// <summary>
/// Partial update: sends a document containing only the new price, leaving
/// the other fields of the stored product untouched.
/// </summary>
/// <param name="id">Numeric product id; sent to the client as a string.</param>
/// <param name="newPrice">The price to store.</param>
public async Task UpdatePriceAsync(int id, decimal newPrice)
{
    var documentId = id.ToString();
    var priceOnly = new { Price = newPrice };

    await _es.UpdateAsync<Product, object>(documentId, update => update.Doc(priceOnly));
}
/// <summary>
/// Scripted update: adds <paramref name="amount"/> to the stored stock
/// quantity of a product on the server side.
/// </summary>
/// <param name="id">Numeric product id; sent to the client as a string.</param>
/// <param name="amount">Value added to the stock quantity, passed as a script parameter.</param>
public async Task IncrementStockAsync(int id, int amount)
{
    await _es.UpdateAsync<Product, object>(id.ToString(), u => u
        .Script(s => s
            // The script addresses the field by its serialized name. The client is
            // configured without a custom naming policy, so Product.StockQuantity
            // is stored (and mapped) as "stockQuantity", not "stock_quantity".
            // NOTE(review): adjust if a snake_case naming policy is configured elsewhere.
            .Source("ctx._source.stockQuantity += params.amount")
            .Params(p => p.Add("amount", amount))
        )
    );
}
/// <summary>
/// Upsert: sends the whole product as the update document with
/// <c>DocAsUpsert(true)</c>, so the same document is used for insertion when
/// no document with that id exists yet.
/// </summary>
/// <param name="product">The product to update or insert; its <c>Id</c> (as a string) is the document id.</param>
public async Task UpsertProductAsync(Product product)
{
    var documentId = product.Id.ToString();

    await _es.UpdateAsync<Product, Product>(documentId, update => update
        .Doc(product)
        .DocAsUpsert(true));
}
Delete
/// <summary>
/// Deletes a single product from the <c>products</c> index by id.
/// </summary>
/// <param name="id">Numeric product id; sent to the client as a string.</param>
/// <returns>The value of <c>IsSuccess()</c> on the delete response.</returns>
public async Task<bool> DeleteAsync(int id)
{
    var documentId = id.ToString();
    var deletion = await _es.DeleteAsync(documentId, request => request.Index("products"));

    return deletion.IsSuccess();
}
/// <summary>
/// Delete by query: removes every product whose price is strictly below
/// <paramref name="maxPrice"/>. The query filters on price only; stock status
/// is not part of the condition.
/// </summary>
/// <param name="maxPrice">Exclusive upper price bound; converted to <c>double</c> for the range query.</param>
public async Task DeleteByQueryAsync(decimal maxPrice)
{
    await _es.DeleteByQueryAsync<Product>(request => request
        .Query(query => query
            .Range(range => range
                .NumberRange(bounds => bounds
                    .Field(product => product.Price)
                    .LessThan((double)maxPrice)))));
}
Bulk Indexing - Pattern Hiệu suất cao
/// <summary>
/// High-throughput reindex: disables index refresh on <c>products</c>, bulk
/// indexes the stream in batches of 500, then restores the refresh interval
/// and triggers a refresh.
/// </summary>
/// <param name="products">Stream of products to index; each product's <c>Id</c> (as a string) is the document id.</param>
/// <exception cref="InvalidOperationException">
/// Thrown when a bulk batch reports failed items. The refresh interval is
/// still restored in that case.
/// </exception>
public async Task FullReindexAsync(IAsyncEnumerable<Product> products)
{
    const int batchSize = 500;

    // 1. Disable refresh while bulk loading.
    await _es.Indices.PutSettingsAsync("products", s =>
        s.RefreshInterval(new Duration("-1"))
    );
    try
    {
        var batch = new List<Product>(batchSize);
        await foreach (var product in products)
        {
            batch.Add(product);
            if (batch.Count >= batchSize)
            {
                await FlushAsync(batch);
                batch.Clear();
            }
        }

        // Send the final, partially filled batch.
        if (batch.Count > 0)
        {
            await FlushAsync(batch);
        }
    }
    finally
    {
        // 2. Re-enable refresh (also on failure) and make the indexed documents searchable.
        await _es.Indices.PutSettingsAsync("products", s =>
            s.RefreshInterval(new Duration("1s"))
        );
        await _es.Indices.RefreshAsync("products");
    }

    // Sends one batch and surfaces rejected items instead of dropping them silently.
    async Task FlushAsync(List<Product> items)
    {
        var response = await _es.BulkAsync(b => b
            .Index("products")
            .IndexMany(items, (d, p) => d.Id(p.Id.ToString()))
        );

        if (response.Errors)
        {
            var failed = response.ItemsWithErrors.Select(i => i.Id);
            throw new InvalidOperationException($"Bulk failed for IDs: {string.Join(", ", failed)}");
        }
    }
}
Equivalent JSON (tham khảo)
// PUT /products/_doc/1 (tương đương IndexAsync)
{
"name": "iPhone 15 Pro",
"price": 999.99,
"category": "smartphones",
"in_stock": true
}
// POST /products/_update/1 (tương đương UpdateAsync partial)
{
"doc": { "price": 899.99 }
}
// POST /_bulk
{ "index": { "_index": "products", "_id": "1" } }
{ "name": "iPhone 15 Pro", "price": 999.99 }
{ "index": { "_index": "products", "_id": "2" } }
{ "name": "Galaxy S24", "price": 899.99 }
Lưu ý: Trong .NET client,
DefaultMappingFor<T> đã gắn index name vào type, nên không cần chỉ định .Index(...) mỗi lần nếu đã config trong Program.cs.
CRUD Operations - Raw API
Create / Index
# PUT với ID cụ thể - tạo mới hoặc replace toàn bộ document (tham khảo thêm)
PUT /products/_doc/1
{
"name": "iPhone 15 Pro",
"price": 999.99,
"category": "smartphones"
}
# POST không có ID - ES tự generate ID
POST /products/_doc
{
"name": "Galaxy S24",
"price": 899.99
}
# Response: "_id": "abcdef123456..."
# PUT _create - chỉ tạo mới, fail nếu ID đã tồn tại
PUT /products/_create/1
{
"name": "iPhone 15 Pro"
}
# 409 Conflict nếu ID=1 đã tồn tại
Read
# Get by ID
GET /products/_doc/1
# Response
{
"_index": "products",
"_id": "1",
"_version": 1,
"found": true,
"_source": {
"name": "iPhone 15 Pro",
"price": 999.99
}
}
# Multi-get
GET /products/_mget
{
"ids": ["1", "2", "3"]
}
# Chỉ lấy fields cần thiết
GET /products/_doc/1?_source_includes=name,price
GET /products/_doc/1?_source=false # Không lấy _source
Update
# Update một phần (partial update) - chỉ thay đổi fields chỉ định
POST /products/_update/1
{
"doc": {
"price": 899.99,
"in_stock": true
}
}
# Update với script
POST /products/_update/1
{
"script": {
"source": "ctx._source.price *= params.discount",
"params": {
"discount": 0.9
}
}
}
# Upsert - update nếu tồn tại, insert nếu không
POST /products/_update/99
{
"doc": { "name": "New Product", "price": 199.99 },
"doc_as_upsert": true
}
# Update by query
POST /products/_update_by_query
{
"query": {
"term": { "category": "smartphones" }
},
"script": {
"source": "ctx._source.discount_eligible = true"
}
}
Delete
# Delete by ID
DELETE /products/_doc/1
# Delete by query
POST /products/_delete_by_query
{
"query": {
"range": {
"price": { "lt": 10 } # Xóa products giá dưới $10
}
}
}
Bulk API
Thực hiện nhiều operations trong một request - hiệu suất cao hơn nhiều.
POST /_bulk
{ "index": { "_index": "products", "_id": "1" } }
{ "name": "iPhone 15 Pro", "price": 999.99, "category": "smartphones" }
{ "index": { "_index": "products", "_id": "2" } }
{ "name": "Galaxy S24", "price": 899.99, "category": "smartphones" }
{ "create": { "_index": "products", "_id": "3" } }
{ "name": "Pixel 8", "price": 699.99, "category": "smartphones" }
{ "update": { "_index": "products", "_id": "1" } }
{ "doc": { "price": 949.99 } }
{ "delete": { "_index": "products", "_id": "99" } }
Bulk response:
{
"took": 30,
"errors": false,
"items": [
{ "index": { "_id": "1", "result": "created", "status": 201 } },
{ "index": { "_id": "2", "result": "created", "status": 201 } },
{ "create": { "_id": "3", "result": "created", "status": 201 } },
{ "update": { "_id": "1", "result": "updated", "status": 200 } },
{ "delete": { "_id": "99", "result": "not_found", "status": 404 } }
]
}
Best Practices cho Bulk Indexing
# 1. Batch size: 5-15MB per bulk request
# 2. Tắt refresh trong quá trình bulk import lớn
PUT /products/_settings
{ "refresh_interval": "-1" }
# 3. Thực hiện bulk indexing
# ... nhiều bulk requests ...
# 4. Bật lại refresh
PUT /products/_settings
{ "refresh_interval": "1s" }
# 5. Force merge sau khi xong (tùy chọn)
POST /products/_forcemerge?max_num_segments=1
Pipeline Processing
Xử lý documents trước khi index với Ingest Pipelines.
# Tạo pipeline
PUT /_ingest/pipeline/product-pipeline
{
"description": "Process products before indexing",
"processors": [
{
"lowercase": {
"field": "category"
}
},
{
"trim": {
"field": "name"
}
},
{
"set": {
"field": "indexed_at",
"value": "{{{_ingest.timestamp}}}"
}
},
{
"convert": {
"field": "price",
"type": "float"
}
},
{
"remove": {
"field": ["internal_id", "raw_data"],
"ignore_missing": true
}
}
],
"on_failure": [
{
"set": {
"field": "error.message",
"value": "{{ _ingest.on_failure_message }}"
}
}
]
}
# Sử dụng pipeline khi index
POST /products/_doc?pipeline=product-pipeline
{
"name": " iPhone 15 Pro ",
"price": "999.99",
"category": "Smartphones"
}
# Đặt default pipeline cho index
PUT /products/_settings
{
"default_pipeline": "product-pipeline"
}
Routing
# Mặc định: routing = document ID → hash → shard
# Custom routing: điều hướng related documents tới cùng shard
# Index với custom routing (ví dụ: theo user_id)
PUT /orders/_doc/order123?routing=user456
{
"order_id": "order123",
"user_id": "user456",
"total": 150.00
}
# Search phải dùng cùng routing để chỉ query shard chứa data
GET /orders/_search?routing=user456
{
"query": {
"term": { "user_id": "user456" }
}
}
# Lợi ích: Chỉ query 1 shard thay vì tất cả
# Chú ý: Nếu routing bị lệch (skewed) → phát sinh vấn đề "hot shard"