Keyboard shortcuts

Press ← or → to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Aggregations với .NET

Giới thiệu

Aggregations là framework phân tích dữ liệu của Elasticsearch, tương đương GROUP BY + aggregation functions trong SQL.

Loại Aggregations:
- Metric:   Tính toán trên tập documents (min, max, avg, sum, stats)
- Bucket:   Nhóm documents (terms, range, date_histogram)
- Pipeline: Tính toán trên kết quả aggregation khác

Metric Aggregations

/// <summary>
/// Computes price and inventory metrics over products, optionally restricted to a single category.
/// </summary>
/// <param name="category">
/// Category to match with a term query; <c>null</c> aggregates over every product.
/// </param>
/// <returns>
/// Minimum, maximum and average price, the summed stock value and the distinct brand count.
/// A metric that is missing from the response is reported as 0.
/// </returns>
/// <exception cref="InvalidOperationException">The search response contained no aggregations.</exception>
public async Task<ProductStats> GetProductStatsAsync(string? category = null)
{
    var response = await _es.SearchAsync<Product>(s => s
        .Size(0) // Hits are not needed, only the aggregation results
        .Query(q =>
        {
            if (category is not null)
            {
                q.Term(t => t.Field(f => f.Category).Value(category));
            }
            else
            {
                q.MatchAll();
            }

            return q;
        })
        // Only aggregations that are read below are requested.
        .Aggregations(a => a
            .Min("min_price", m => m.Field(f => f.Price))
            .Max("max_price", m => m.Field(f => f.Price))
            .Avg("avg_price", av => av.Field(f => f.Price))
            .Sum("total_value", sum => sum
                // price * stock_quantity per product
                .Script(sc => sc
                    .Source("doc['price'].value * doc['stock_quantity'].value")
                )
            )
            .Cardinality("unique_brands", c => c.Field(f => f.Brand))
        )
    );

    // Aggregations is nullable; fail with a descriptive error instead of dereferencing null.
    var aggs = response.Aggregations
        ?? throw new InvalidOperationException("Product stats search returned no aggregations.");

    return new ProductStats
    {
        MinPrice     = aggs.GetMin("min_price")?.Value ?? 0,
        MaxPrice     = aggs.GetMax("max_price")?.Value ?? 0,
        AvgPrice     = aggs.GetAvg("avg_price")?.Value ?? 0,
        TotalValue   = aggs.GetSum("total_value")?.Value ?? 0,
        UniqueBrands = (int)(aggs.GetCardinality("unique_brands")?.Value ?? 0),
    };
}

/// <summary>
/// Price and inventory metrics returned by <c>GetProductStatsAsync</c>.
/// </summary>
public record ProductStats
{
    /// <summary>Value of the "min_price" aggregation; 0 when it is absent.</summary>
    public double MinPrice { get; init; }
    /// <summary>Value of the "max_price" aggregation; 0 when it is absent.</summary>
    public double MaxPrice { get; init; }
    /// <summary>Value of the "avg_price" aggregation; 0 when it is absent.</summary>
    public double AvgPrice { get; init; }
    /// <summary>Value of the "total_value" aggregation (sum of price * stock quantity); 0 when it is absent.</summary>
    public double TotalValue { get; init; }
    /// <summary>Value of the "unique_brands" cardinality aggregation, narrowed to <c>int</c>; 0 when it is absent.</summary>
    public int UniqueBrands { get; init; }
}

Terms Aggregation - Nhóm theo giá trị

Tương đương GROUP BY category ORDER BY COUNT(*) DESC.

/// <summary>
/// Lists up to 20 product categories with their document counts, ordered by
/// descending count (the equivalent of GROUP BY category ORDER BY COUNT(*) DESC).
/// </summary>
/// <returns>
/// One <see cref="CategoryFacet"/> per bucket of the "by_category" aggregation;
/// an empty list when that aggregation is missing from the response.
/// </returns>
/// <exception cref="InvalidOperationException">The search response contained no aggregations.</exception>
public async Task<List<CategoryFacet>> GetCategoryFacetsAsync()
{
    var response = await _es.SearchAsync<Product>(s => s
        .Size(0)
        .Aggregations(a => a
            .Terms("by_category", t => t
                .Field(f => f.Category) // Must be a keyword field
                .Size(20)               // Keep the top 20 categories
                .Order(new[] { TermsAggregationOrder.CountDescending })
                .MinDocCount(1)         // Only return buckets holding at least 1 document
            )
        )
    );

    // Aggregations is nullable; fail with a descriptive error instead of dereferencing null.
    var aggs = response.Aggregations
        ?? throw new InvalidOperationException("Category facet search returned no aggregations.");

    var buckets = aggs.GetStringTerms("by_category")?.Buckets ?? [];

    // CategoryFacet is a positional record, so it has to be built through its
    // constructor; an object initializer needs a parameterless constructor.
    return buckets
        .Select(b => new CategoryFacet(b.Key.ToString()!, (int)b.DocCount))
        .ToList();
}

/// <summary>A category value and the number of documents counted for it.</summary>
/// <param name="Category">Bucket key rendered as a string.</param>
/// <param name="Count">Document count of the bucket.</param>
public record CategoryFacet(string Category, int Count);

Range Aggregation - Nhóm theo khoảng giá

/// <summary>
/// Counts products in four fixed price bands: under 100, 100-500, 500-1000 and over 1000.
/// </summary>
/// <returns>
/// One <c>PriceBucket</c> per bucket of the "price_ranges" aggregation, labelled with
/// the bucket key; an empty list when that aggregation is missing from the response.
/// </returns>
/// <exception cref="InvalidOperationException">The search response contained no aggregations.</exception>
public async Task<List<PriceBucket>> GetPriceFacetsAsync()
{
    var response = await _es.SearchAsync<Product>(s => s
        .Size(0)
        .Aggregations(a => a
            .Range("price_ranges", r => r
                .Field(f => f.Price)
                // Each range carries an explicit key that is reused as the facet label.
                .Ranges(
                    new AggregationRange { To = 100, Key = "Under $100" },
                    new AggregationRange { From = 100, To = 500, Key = "$100 - $500" },
                    new AggregationRange { From = 500, To = 1000, Key = "$500 - $1000" },
                    new AggregationRange { From = 1000, Key = "Over $1000" }
                )
            )
        )
    );

    // Aggregations is nullable; fail with a descriptive error instead of dereferencing null.
    var aggs = response.Aggregations
        ?? throw new InvalidOperationException("Price facet search returned no aggregations.");

    var buckets = aggs.GetRange("price_ranges")?.Buckets ?? [];

    return buckets.Select(b => new PriceBucket
    {
        Label = b.Key,
        Count = (int)b.DocCount,
    }).ToList();
}

Date Histogram - Nhóm theo thời gian

/// <summary>
/// Counts orders per calendar month for the given year.
/// </summary>
/// <param name="year">Calendar year; orders with OrderedAt in [Jan 1 of year, Jan 1 of year + 1) are included.</param>
/// <returns>
/// One <c>MonthlySales</c> per bucket of the "monthly" date histogram, keyed by the
/// "yyyy-MM" formatted month; an empty list when that aggregation is missing.
/// </returns>
/// <exception cref="InvalidOperationException">The search response contained no aggregations.</exception>
public async Task<List<MonthlySales>> GetMonthlySalesAsync(int year)
{
    // Shared by the range filter and the histogram bounds.
    var yearStart = new DateTime(year, 1, 1);

    var response = await _es.SearchAsync<Order>(s => s
        .Size(0)
        .Query(q => q
            .Range(r => r.DateRange(dr => dr
                .Field(f => f.OrderedAt)
                .Gte(yearStart)
                .Lt(new DateTime(year + 1, 1, 1))
            ))
        )
        .Aggregations(a => a
            .DateHistogram("monthly", dh => dh
                .Field(f => f.OrderedAt)
                .CalendarInterval(CalendarInterval.Month)
                .Format("yyyy-MM")
                .MinDocCount(0)  // Also emit months that have no data
                // Bounds span the whole year; together with MinDocCount(0) this is
                // intended to produce a bucket for every month.
                .ExtendedBounds(new ExtendedBounds<FieldDateMath>(
                    yearStart,
                    new DateTime(year, 12, 31)
                ))
            )
        )
    );

    // Aggregations is nullable; fail with a descriptive error instead of dereferencing null.
    var aggs = response.Aggregations
        ?? throw new InvalidOperationException("Monthly sales search returned no aggregations.");

    var buckets = aggs.GetDateHistogram("monthly")?.Buckets ?? [];

    return buckets.Select(b => new MonthlySales
    {
        Month = b.KeyAsString ?? "",
        Count = (int)b.DocCount,
    }).ToList();
}

Nested Aggregations - Kết hợp nhiều tầng

/// <summary>
/// Summarises up to 10 product categories: document count, average and maximum price,
/// total stock and the top 3 brands of each category.
/// </summary>
/// <returns>
/// One <see cref="CategorySummary"/> per bucket of the "by_category" aggregation;
/// an empty list when that aggregation is missing from the response.
/// </returns>
/// <exception cref="InvalidOperationException">The search response contained no aggregations.</exception>
public async Task<List<CategorySummary>> GetCategorySummaryAsync()
{
    var response = await _es.SearchAsync<Product>(s => s
        .Size(0)
        .Aggregations(a => a
            .Terms("by_category", t => t
                .Field(f => f.Category)
                .Size(10)
                // Sub-aggregations evaluated inside every category bucket
                .Aggregations(sa => sa
                    .Avg("avg_price", avg => avg.Field(f => f.Price))
                    .Max("max_price", max => max.Field(f => f.Price))
                    .Sum("total_stock", sum => sum.Field(f => f.StockQuantity))
                    .Terms("top_brands", tb => tb
                        .Field(f => f.Brand)
                        .Size(3) // Top 3 brands within each category
                    )
                )
            )
        )
    );

    // Aggregations is nullable; fail with a descriptive error instead of dereferencing null.
    var aggs = response.Aggregations
        ?? throw new InvalidOperationException("Category summary search returned no aggregations.");

    var categoryBuckets = aggs.GetStringTerms("by_category")?.Buckets ?? [];

    return categoryBuckets.Select(bucket =>
    {
        var subAggs = bucket.Aggregations;
        var brandBuckets = subAggs.GetStringTerms("top_brands")?.Buckets ?? [];

        return new CategorySummary
        {
            Category   = bucket.Key.ToString()!,
            Count      = (int)bucket.DocCount,
            AvgPrice   = subAggs.GetAvg("avg_price")?.Value ?? 0,
            MaxPrice   = subAggs.GetMax("max_price")?.Value ?? 0,
            // The "total_stock" sum was requested but previously discarded.
            TotalStock = subAggs.GetSum("total_stock")?.Value ?? 0,
            TopBrands  = brandBuckets
                .Select(b => b.Key.ToString()!)
                .ToList(),
        };
    }).ToList();
}

/// <summary>
/// Per-category figures returned by <c>GetCategorySummaryAsync</c>.
/// </summary>
public record CategorySummary
{
    /// <summary>Category bucket key rendered as a string.</summary>
    public string Category { get; init; } = string.Empty;

    /// <summary>Number of documents in the category bucket.</summary>
    public int Count { get; init; }

    /// <summary>Value of the "avg_price" sub-aggregation; 0 when it is absent.</summary>
    public double AvgPrice { get; init; }

    /// <summary>Value of the "max_price" sub-aggregation; 0 when it is absent.</summary>
    public double MaxPrice { get; init; }

    /// <summary>Value of the "total_stock" sub-aggregation (sum of stock quantity); 0 when it is absent.</summary>
    public double TotalStock { get; init; }

    /// <summary>Keys of the "top_brands" sub-aggregation buckets (at most 3 are requested).</summary>
    public List<string> TopBrands { get; init; } = [];
}

Faceted search là pattern phổ biến trong e-commerce: kết hợp search + filter + facets trong cùng một request.

/// <summary>
/// Runs a faceted product search: an optional keyword match on name and description,
/// optional category and maximum-price filters, pagination, and facet aggregations
/// (categories, brands, price ranges, price statistics) in a single request.
/// </summary>
/// <param name="p">
/// Search parameters: keyword, categories, maximum price, page number (page 1 maps to
/// offset 0) and page size.
/// </param>
/// <returns>
/// The page of matching products, the total hit count and the facet data. A facet whose
/// aggregation is missing from the response is returned empty (or 0 for price statistics).
/// </returns>
/// <exception cref="InvalidOperationException">The search response contained no aggregations.</exception>
public async Task<FacetedSearchResult> FacetedSearchAsync(FacetedSearchParams p)
{
    var response = await _es.SearchAsync<Product>(s => s
        // Main query
        .Query(q => q.Bool(b =>
        {
            if (!string.IsNullOrEmpty(p.Keyword))
            {
                b.Must(m => m.MultiMatch(mm => mm
                    .Fields(new[] { "name^3", "description" })
                    .Query(p.Keyword)
                ));
            }

            // Filter clauses are collected first and attached only when at least one applies.
            var filters = new List<Action<QueryDescriptor<Product>>>();
            if (p.Categories?.Any() == true)
            {
                filters.Add(f => f.Terms(t => t
                    .Field(fld => fld.Category)
                    .Terms(new TermsQueryField(p.Categories.Select(FieldValue.String).ToArray()))
                ));
            }

            if (p.MaxPrice.HasValue)
            {
                filters.Add(f => f.Range(r => r.NumberRange(nr => nr
                    .Field(fld => fld.Price).Lte((double)p.MaxPrice.Value)
                )));
            }

            if (filters.Count > 0)
            {
                b.Filter(filters.ToArray());
            }

            return b;
        }))
        // Pagination
        .From((p.Page - 1) * p.PageSize)
        .Size(p.PageSize)
        // Aggregations that back the facets
        .Aggregations(a => a
            .Terms("categories", t => t.Field(f => f.Category).Size(20))
            .Terms("brands", t => t.Field(f => f.Brand).Size(20))
            .Range("price_ranges", r => r
                .Field(f => f.Price)
                .Ranges(
                    new AggregationRange { To = 100 },
                    new AggregationRange { From = 100, To = 500 },
                    new AggregationRange { From = 500, To = 1000 },
                    new AggregationRange { From = 1000 }
                )
            )
            .Stats("price_stats", st => st.Field(f => f.Price))
        )
    );

    // Aggregations is nullable; fail with a descriptive error instead of dereferencing null.
    var aggs = response.Aggregations
        ?? throw new InvalidOperationException("Faceted search returned no aggregations.");

    // Maps the buckets of a string terms aggregation to facet items (empty when absent).
    List<FacetItem> TermFacets(string name) =>
        aggs.GetStringTerms(name)?
            .Buckets.Select(b => new FacetItem(b.Key.ToString()!, (int)b.DocCount))
            .ToList() ?? [];

    // Looked up once and reused for Min/Max/Avg.
    var priceStats = aggs.GetStats("price_stats");

    return new FacetedSearchResult
    {
        Products = response.Documents.ToList(),
        Total    = response.Total,
        Facets = new FacetResults
        {
            Categories  = TermFacets("categories"),
            Brands      = TermFacets("brands"),
            PriceRanges = aggs.GetRange("price_ranges")?
                .Buckets.Select(b => new FacetItem(b.Key, (int)b.DocCount))
                .ToList() ?? [],
            PriceStats = new PriceStatsResult
            {
                Min = priceStats?.Min ?? 0,
                Max = priceStats?.Max ?? 0,
                Avg = priceStats?.Avg ?? 0,
            },
        },
    };
}