Keyboard shortcuts

Press ← or → to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Indexing Documents với .NET

Model & DI Setup

// Models/Product.cs
/// <summary>
/// Catalog item as it is stored in the "products" index.
/// </summary>
public class Product
{
    /// <summary>Numeric identifier; its string form is used as the document ID when indexing.</summary>
    public int Id { get; set; }

    /// <summary>Display name. Defaults to an empty string.</summary>
    public string Name { get; set; } = "";

    /// <summary>Optional free-text description; <c>null</c> when absent.</summary>
    public string? Description { get; set; }

    /// <summary>Category label. Defaults to an empty string.</summary>
    public string Category { get; set; } = "";

    /// <summary>Brand label. Defaults to an empty string.</summary>
    public string Brand { get; set; } = "";

    /// <summary>Unit price.</summary>
    public decimal Price { get; set; }

    /// <summary>Whether the product is currently in stock.</summary>
    public bool InStock { get; set; }

    /// <summary>Number of units in stock.</summary>
    public int StockQuantity { get; set; }

    /// <summary>Tags attached to the product. Each instance starts with its own empty list.</summary>
    public List<string> Tags { get; set; } = new();

    /// <summary>Creation timestamp; initialized to the current UTC time when the object is constructed.</summary>
    public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
}

// Program.cs - DI registration
// Endpoint and credentials are read from the "Elasticsearch" configuration section
// (keys "Uri", "Username", "Password") so deployments can supply them through
// appsettings, environment variables or a secret store instead of editing source.
// The literals below are fallbacks for a local development cluster only.
var esSection = builder.Configuration.GetSection("Elasticsearch");
var esUri = new Uri(esSection["Uri"] ?? "https://localhost:9200");
var esUsername = esSection["Username"] ?? "elastic";
var esPassword = esSection["Password"] ?? "changeme";

// Registers one shared ElasticsearchClient for the whole application.
builder.Services.AddSingleton<ElasticsearchClient>(_ =>
{
    var settings = new ElasticsearchClientSettings(esUri)
        .Authentication(new BasicAuthentication(esUsername, esPassword))
        // Default index per document type, so calls typed on Product/Order
        // do not have to name the index explicitly.
        .DefaultMappingFor<Product>(m => m.IndexName("products"))
        .DefaultMappingFor<Order>(m => m.IndexName("orders"));

    return new ElasticsearchClient(settings);
});

Tạo Index với Mapping

/// <summary>
/// Creates the "products" index with an explicit, strict mapping for <c>Product</c>.
/// Does nothing when the index already exists.
/// </summary>
public async Task CreateIndexAsync()
{
    var exists = await _es.Indices.ExistsAsync("products");
    if (exists.Exists) return;

    await _es.Indices.CreateAsync<Product>("products", c => c
        .Settings(s => s
            .NumberOfShards(1)
            .NumberOfReplicas(1)
        )
        .Mappings(m => m
            // Strict mode rejects documents that contain any field not declared below,
            // so every serialized Product property (including Id and Brand) must be mapped.
            .Dynamic(DynamicMapping.Strict)
            .Properties(p => p
                .Integer(i => i.Name(n => n.Id))
                // Full-text field with a "keyword" sub-field for exact matching.
                .Text(t => t.Name(n => n.Name)
                    .Analyzer("standard")
                    .Fields(f => f.Keyword(k => k.Name("keyword")))
                )
                .Text(t => t.Name(n => n.Description).Analyzer("english"))
                .Keyword(k => k.Name(n => n.Category))
                .Keyword(k => k.Name(n => n.Brand))
                // Stored as a scaled integer with factor 100, i.e. two decimal places.
                .ScaledFloat(sf => sf.Name(n => n.Price).ScalingFactor(100))
                .Boolean(b => b.Name(n => n.InStock))
                .Integer(i => i.Name(n => n.StockQuantity))
                .Keyword(k => k.Name(n => n.Tags))
                .Date(d => d.Name(n => n.CreatedAt))
            )
        )
    );
}

CRUD Operations

Create / Index

/// <summary>
/// Indexes a single product into "products", using the product's Id as the document ID.
/// </summary>
/// <param name="product">The product to store.</param>
/// <returns><c>true</c> when the client reports the index request as successful.</returns>
public async Task<bool> IndexProductAsync(Product product)
{
    var documentId = product.Id.ToString();

    var result = await _es.IndexAsync(
        product,
        request => request.Index("products").Id(documentId));

    return result.IsSuccess();
}

/// <summary>
/// Indexes many products into "products" with a single bulk request,
/// using each product's Id as its document ID.
/// </summary>
/// <param name="products">Products to index. An empty sequence is a no-op.</param>
/// <exception cref="ArgumentNullException"><paramref name="products"/> is <c>null</c>.</exception>
/// <exception cref="InvalidOperationException">
/// One or more items were rejected, or the bulk request as a whole failed.
/// </exception>
public async Task BulkIndexAsync(IEnumerable<Product> products)
{
    ArgumentNullException.ThrowIfNull(products);

    // Materialize once: the sequence is counted and then handed to the client,
    // and a lazy source must not be enumerated twice.
    var batch = products as IReadOnlyCollection<Product> ?? products.ToList();

    // A bulk request without operations has an empty body, which the server rejects.
    if (batch.Count == 0)
    {
        return;
    }

    var response = await _es.BulkAsync(b => b
        .Index("products")
        .IndexMany(batch, (d, p) => d.Id(p.Id.ToString()))
    );

    // Item-level failures: the request went through but some documents were rejected.
    if (response.Errors)
    {
        var failed = response.ItemsWithErrors.Select(i => i.Id);
        throw new InvalidOperationException($"Bulk failed for IDs: {string.Join(", ", failed)}");
    }

    // Request-level failures (connection, authentication, ...) do not set Errors,
    // so the overall outcome is checked separately.
    if (!response.IsSuccess())
    {
        throw new InvalidOperationException($"Bulk request failed: {response.DebugInformation}");
    }
}

Get

/// <summary>
/// Fetches one product by its numeric ID.
/// </summary>
/// <param name="id">Product ID; its string form is used as the document ID.</param>
/// <returns>The stored product, or <c>null</c> when the response reports it was not found.</returns>
public async Task<Product?> GetByIdAsync(int id)
{
    var result = await _es.GetAsync<Product>(id.ToString());

    if (!result.Found)
    {
        return null;
    }

    return result.Source;
}

/// <summary>
/// Fetches several products in one multi-get request.
/// </summary>
/// <param name="ids">Product IDs; each is converted to its string form for the lookup.</param>
/// <returns>
/// The products that were found and have a source, in the order the response lists them.
/// Entries that are missing or carry no source are left out.
/// </returns>
public async Task<List<Product>> GetManyAsync(IEnumerable<int> ids)
{
    var documentIds = ids.Select(i => (Id)i.ToString());

    var response = await _es.MgetAsync<Product>(request => request.Ids(documentIds));

    var products = new List<Product>();
    foreach (var item in response.Docs.OfType<GetResponse<Product>>())
    {
        // Keep only hits that exist and actually contain a document body.
        if (item is { Found: true, Source: { } product })
        {
            products.Add(product);
        }
    }

    return products;
}

Update

/// <summary>
/// Partial update: sends only the new price, leaving the other fields of the document untouched.
/// </summary>
/// <param name="id">Product ID; its string form is used as the document ID.</param>
/// <param name="newPrice">The price to store.</param>
public async Task UpdatePriceAsync(int id, decimal newPrice)
{
    // Anonymous object carrying just the field that changes.
    var priceOnly = new { Price = newPrice };

    await _es.UpdateAsync<Product, object>(
        id.ToString(),
        request => request.Doc(priceOnly));
}

/// <summary>
/// Adds <paramref name="amount"/> to the stored stock quantity of one product
/// with a scripted update, so the change is computed on the server.
/// </summary>
/// <param name="id">Product ID; its string form is used as the document ID.</param>
/// <param name="amount">Value added to the current stock quantity (passed as a script parameter).</param>
public async Task IncrementStockAsync(int id, int amount)
{
    await _es.UpdateAsync<Product, object>(id.ToString(), u => u
        .Script(s => s
            // The script must use the field's serialized name. No custom naming policy is
            // configured on the client settings, so Product.StockQuantity is written under
            // the client's default camelCase name "stockQuantity", not "stock_quantity".
            .Source("ctx._source.stockQuantity += params.amount")
            .Params(p => p.Add("amount", amount))
        )
    );
}

/// <summary>
/// Upsert: sends the product as the update document and flags it as the upsert document too
/// (<c>DocAsUpsert(true)</c>), so the same payload is used whether or not the document exists.
/// </summary>
/// <param name="product">The product to write; its Id is used as the document ID.</param>
public async Task UpsertProductAsync(Product product)
{
    var documentId = product.Id.ToString();

    await _es.UpdateAsync<Product, Product>(
        documentId,
        request => request
            .Doc(product)
            .DocAsUpsert(true));
}

Delete

/// <summary>
/// Deletes one product from "products" by its ID.
/// </summary>
/// <param name="id">Product ID; its string form is used as the document ID.</param>
/// <returns><c>true</c> when the client reports the delete request as successful.</returns>
public async Task<bool> DeleteAsync(int id)
{
    var documentId = id.ToString();

    var result = await _es.DeleteAsync(
        documentId,
        request => request.Index("products"));

    return result.IsSuccess();
}

/// <summary>
/// Delete by query: removes every product whose price is strictly below
/// <paramref name="maxPrice"/>. The query filters on price only; stock status is not considered.
/// </summary>
/// <param name="maxPrice">Exclusive upper price bound (e.g. 10 to delete products priced under 10).</param>
public async Task DeleteByQueryAsync(decimal maxPrice)
{
    // The numeric range query takes a double, so the decimal bound is converted once up front.
    var exclusiveUpperBound = (double)maxPrice;

    await _es.DeleteByQueryAsync<Product>(request => request
        .Query(query => query
            .Range(range => range
                .NumberRange(numbers => numbers
                    .Field(product => product.Price)
                    .LessThan(exclusiveUpperBound)))));
}

Bulk Indexing - Pattern Hiệu suất cao

/// <summary>
/// High-throughput reindex: turns index refresh off, bulk-indexes the stream in batches,
/// then turns refresh back on and forces one refresh.
/// </summary>
/// <param name="products">Stream of products to index into "products".</param>
/// <exception cref="InvalidOperationException">
/// A bulk batch was rejected, in part or as a whole. Refresh is still re-enabled before the
/// exception propagates.
/// </exception>
public async Task FullReindexAsync(IAsyncEnumerable<Product> products)
{
    const int BatchSize = 500;

    // 1. Disable refresh ("-1") while the bulk load runs.
    await _es.Indices.PutSettingsAsync("products", s =>
        s.RefreshInterval(new Duration("-1"))
    );

    try
    {
        var batch = new List<Product>(BatchSize);

        await foreach (var product in products)
        {
            batch.Add(product);
            if (batch.Count >= BatchSize)
            {
                await FlushAsync(batch);
            }
        }

        // Send whatever is left over after the last full batch.
        if (batch.Count > 0)
        {
            await FlushAsync(batch);
        }
    }
    finally
    {
        // 2. Re-enable refresh at a 1s interval and refresh once so the new documents become searchable.
        await _es.Indices.PutSettingsAsync("products", s =>
            s.RefreshInterval(new Duration("1s"))
        );
        await _es.Indices.RefreshAsync("products");
    }

    // Sends one batch, fails loudly if anything in it was rejected, then empties the buffer for reuse.
    async Task FlushAsync(List<Product> pending)
    {
        var response = await _es.BulkAsync(b => b
            .Index("products")
            .IndexMany(pending, (d, p) => d.Id(p.Id.ToString()))
        );

        // Item-level failures: some documents in the batch were rejected.
        if (response.Errors)
        {
            var failed = response.ItemsWithErrors.Select(i => i.Id);
            throw new InvalidOperationException($"Bulk failed for IDs: {string.Join(", ", failed)}");
        }

        // Request-level failures do not set Errors, so the overall outcome is checked as well.
        if (!response.IsSuccess())
        {
            throw new InvalidOperationException($"Bulk request failed: {response.DebugInformation}");
        }

        pending.Clear();
    }
}

Equivalent JSON (tham khảo)

// PUT /products/_doc/1 (tương đương IndexAsync)
{
  "name": "iPhone 15 Pro",
  "price": 999.99,
  "category": "smartphones",
  "in_stock": true
}

// POST /products/_update/1 (tương đương UpdateAsync partial)
{
  "doc": { "price": 899.99 }
}

// POST /_bulk
{ "index": { "_index": "products", "_id": "1" } }
{ "name": "iPhone 15 Pro", "price": 999.99 }
{ "index": { "_index": "products", "_id": "2" } }
{ "name": "Galaxy S24", "price": 899.99 }

Lưu ý: Trong .NET client, DefaultMappingFor<T> đã gắn index name vào type, nên không cần chỉ định .Index(...) mỗi lần nếu đã config trong Program.cs.

CRUD Operations - Raw API

Create / Index

# PUT với ID cụ thể - tạo mới hoặc replace toàn bộ document (tham khảo thêm)
PUT /products/_doc/1
{
  "name": "iPhone 15 Pro",
  "price": 999.99,
  "category": "smartphones"
}

# POST không có ID - ES tự generate ID
POST /products/_doc
{
  "name": "Galaxy S24",
  "price": 899.99
}
# Response: "_id": "abcdef123456..."

# PUT _create - chỉ tạo mới, fail nếu ID đã tồn tại
PUT /products/_create/1
{
  "name": "iPhone 15 Pro"
}
# 409 Conflict nếu ID=1 đã tồn tại

Read

# Get by ID
GET /products/_doc/1

# Response
{
  "_index": "products",
  "_id": "1",
  "_version": 1,
  "found": true,
  "_source": {
    "name": "iPhone 15 Pro",
    "price": 999.99
  }
}

# Multi-get
GET /products/_mget
{
  "ids": ["1", "2", "3"]
}

# Chỉ lấy fields cần thiết
GET /products/_doc/1?_source_includes=name,price
GET /products/_doc/1?_source=false  # Không lấy _source

Update

# Update một phần (partial update) - chỉ thay đổi fields chỉ định
POST /products/_update/1
{
  "doc": {
    "price": 899.99,
    "in_stock": true
  }
}

# Update với script
POST /products/_update/1
{
  "script": {
    "source": "ctx._source.price *= params.discount",
    "params": {
      "discount": 0.9
    }
  }
}

# Upsert - update nếu tồn tại, insert nếu không
POST /products/_update/99
{
  "doc": { "name": "New Product", "price": 199.99 },
  "doc_as_upsert": true
}

# Update by query
POST /products/_update_by_query
{
  "query": {
    "term": { "category": "smartphones" }
  },
  "script": {
    "source": "ctx._source.discount_eligible = true"
  }
}

Delete

# Delete by ID
DELETE /products/_doc/1

# Delete by query
POST /products/_delete_by_query
{
  "query": {
    "range": {
      "price": { "lt": 10 }   # Xóa products giá dưới $10
    }
  }
}

Bulk API

Thực hiện nhiều operations trong một request - hiệu suất cao hơn nhiều.

POST /_bulk
{ "index": { "_index": "products", "_id": "1" } }
{ "name": "iPhone 15 Pro", "price": 999.99, "category": "smartphones" }
{ "index": { "_index": "products", "_id": "2" } }
{ "name": "Galaxy S24", "price": 899.99, "category": "smartphones" }
{ "create": { "_index": "products", "_id": "3" } }
{ "name": "Pixel 8", "price": 699.99, "category": "smartphones" }
{ "update": { "_index": "products", "_id": "1" } }
{ "doc": { "price": 949.99 } }
{ "delete": { "_index": "products", "_id": "99" } }

Bulk response:

{
  "took": 30,
  "errors": false,
  "items": [
    { "index": { "_id": "1", "result": "created", "status": 201 } },
    { "index": { "_id": "2", "result": "created", "status": 201 } },
    { "create": { "_id": "3", "result": "created", "status": 201 } },
    { "update": { "_id": "1", "result": "updated", "status": 200 } },
    { "delete": { "_id": "99", "result": "not_found", "status": 404 } }
  ]
}

Best Practices cho Bulk Indexing

# 1. Batch size: 5-15MB per bulk request
# 2. Tắt refresh trong quá trình bulk import lớn
PUT /products/_settings
{ "refresh_interval": "-1" }

# 3. Thực hiện bulk indexing
# ... nhiều bulk requests ...

# 4. Bật lại refresh
PUT /products/_settings
{ "refresh_interval": "1s" }

# 5. Force merge sau khi xong (tùy chọn)
POST /products/_forcemerge?max_num_segments=1

Pipeline Processing

Xử lý documents trước khi index với Ingest Pipelines.

# Tạo pipeline
PUT /_ingest/pipeline/product-pipeline
{
  "description": "Process products before indexing",
  "processors": [
    {
      "lowercase": {
        "field": "category"
      }
    },
    {
      "trim": {
        "field": "name"
      }
    },
    {
      "set": {
        "field": "indexed_at",
        "value": "{{{_ingest.timestamp}}}"
      }
    },
    {
      "convert": {
        "field": "price",
        "type": "float"
      }
    },
    {
      "remove": {
        "field": ["internal_id", "raw_data"],
        "ignore_missing": true
      }
    }
  ],
  "on_failure": [
    {
      "set": {
        "field": "error.message",
        "value": "{{ _ingest.on_failure_message }}"
      }
    }
  ]
}

# Sử dụng pipeline khi index
POST /products/_doc?pipeline=product-pipeline
{
  "name": "  iPhone 15 Pro  ",
  "price": "999.99",
  "category": "Smartphones"
}

# Đặt default pipeline cho index
PUT /products/_settings
{
  "default_pipeline": "product-pipeline"
}

Routing

# Mặc định: routing = document ID → hash → shard
# Custom routing: điều hướng related documents tới cùng shard

# Index với custom routing (ví dụ: theo user_id)
PUT /orders/_doc/order123?routing=user456
{
  "order_id": "order123",
  "user_id": "user456",
  "total": 150.00
}

# Search phải dùng cùng routing để chỉ query shard chứa data
GET /orders/_search?routing=user456
{
  "query": {
    "term": { "user_id": "user456" }
  }
}

# Lợi ích: Chỉ query 1 shard thay vì tất cả
# Chú ý: Nếu routing bị lệch (skewed) → gặp vấn đề "hot shard"