Aggregations với .NET
Giới thiệu
Aggregations là framework phân tích dữ liệu của Elasticsearch, tương đương với GROUP BY kết hợp các hàm tổng hợp (aggregate functions như COUNT, SUM, AVG) trong SQL.
Loại Aggregations:
- Metric: Tính toán trên tập documents (min, max, avg, sum, stats)
- Bucket: Nhóm documents (terms, range, date_histogram)
- Pipeline: Tính toán dựa trên kết quả của các aggregation khác (ví dụ: derivative, cumulative_sum, avg_bucket)
Metric Aggregations
/// <summary>
/// Computes product price statistics, optionally restricted to one category,
/// using a single aggregation-only search request (no hits are returned).
/// </summary>
/// <param name="category">
/// Category to filter on with an exact term match on <c>Category</c>;
/// <c>null</c> means all products.
/// </param>
/// <returns>
/// Min/max/average price, the summed <c>price * stock_quantity</c>, and the
/// number of distinct brands. A metric with no data is reported as 0.
/// </returns>
/// <exception cref="InvalidOperationException">The search request failed.</exception>
public async Task<ProductStats> GetProductStatsAsync(string? category = null)
{
    var response = await _es.SearchAsync<Product>(s => s
        .Size(0) // Documents are not needed, only aggregations
        .Query(q =>
        {
            if (category is not null)
                q.Term(t => t.Field(f => f.Category).Value(category));
            else
                q.MatchAll();
            return q;
        })
        .Aggregations(a => a
            // A single stats aggregation yields min, max and avg together,
            // so separate min/max/avg aggregations are unnecessary.
            .Stats("price_stats", st => st.Field(f => f.Price))
            .Sum("total_value", sum => sum
                // price * stock_quantity per product. Painless throws when `.value`
                // is read from a field the document has no value for, so documents
                // missing either field contribute 0 instead of failing the request.
                // NOTE(review): field names must match the index mapping — confirm
                // 'stock_quantity' against the serializer's property naming.
                .Script(sc => sc
                    .Source("doc['price'].size() == 0 || doc['stock_quantity'].size() == 0 ? 0.0 : doc['price'].value * doc['stock_quantity'].value")
                )
            )
            // Elasticsearch computes cardinality as an approximate distinct count.
            .Cardinality("unique_brands", c => c.Field(f => f.Brand))
        )
    );

    // A failed request leaves Aggregations null; surface the real cause instead
    // of a NullReferenceException.
    if (!response.IsValidResponse || response.Aggregations is null)
        throw new InvalidOperationException(
            $"Product stats aggregation failed: {response.DebugInformation}");

    var aggs = response.Aggregations;
    var priceStats = aggs.GetStats("price_stats");
    return new ProductStats
    {
        MinPrice = priceStats?.Min ?? 0,
        MaxPrice = priceStats?.Max ?? 0,
        AvgPrice = priceStats?.Avg ?? 0,
        TotalValue = aggs.GetSum("total_value")?.Value ?? 0,
        UniqueBrands = (int)(aggs.GetCardinality("unique_brands")?.Value ?? 0),
    };
}
/// <summary>
/// Immutable summary of product price statistics built by GetProductStatsAsync.
/// Every member defaults to 0 when not initialized.
/// </summary>
public record class ProductStats
{
    /// <summary>Lowest product price.</summary>
    public double MinPrice { get; init; }

    /// <summary>Highest product price.</summary>
    public double MaxPrice { get; init; }

    /// <summary>Average product price.</summary>
    public double AvgPrice { get; init; }

    /// <summary>Sum of price multiplied by stock quantity.</summary>
    public double TotalValue { get; init; }

    /// <summary>Number of distinct brands (from a cardinality aggregation).</summary>
    public int UniqueBrands { get; init; }
}
Terms Aggregation - Nhóm theo giá trị
Tương đương GROUP BY category ORDER BY COUNT(*) DESC LIMIT 20 trong SQL (ví dụ bên dưới lấy top 20 categories).
/// <summary>
/// Returns the top 20 product categories ordered by document count, the
/// equivalent of <c>GROUP BY category ORDER BY COUNT(*) DESC LIMIT 20</c>.
/// </summary>
/// <returns>One facet per category bucket with its document count.</returns>
/// <exception cref="InvalidOperationException">The search request failed.</exception>
public async Task<List<CategoryFacet>> GetCategoryFacetsAsync()
{
    var response = await _es.SearchAsync<Product>(s => s
        .Size(0)
        .Aggregations(a => a
            .Terms("by_category", t => t
                .Field(f => f.Category)  // Must be a keyword field
                .Size(20)                 // Top 20 categories
                .Order(new[] { TermsAggregationOrder.CountDescending })
                .MinDocCount(1)           // Only buckets with at least one document
            )
        )
    );

    // A failed request leaves Aggregations null; report the actual error.
    if (!response.IsValidResponse || response.Aggregations is null)
        throw new InvalidOperationException(
            $"Category facet aggregation failed: {response.DebugInformation}");

    var buckets = response.Aggregations
        .GetStringTerms("by_category")?
        .Buckets ?? [];

    // CategoryFacet is a positional record with no parameterless constructor,
    // so it must be created through its constructor, not an object initializer.
    return buckets
        .Select(b => new CategoryFacet(b.Key.ToString()!, (int)b.DocCount))
        .ToList();
}
/// <summary>A single category facet: the category value and its document count.</summary>
/// <param name="Category">Category term returned by the terms aggregation.</param>
/// <param name="Count">Number of documents in that category.</param>
public record class CategoryFacet(string Category, int Count);
Range Aggregation - Nhóm theo khoảng giá
/// <summary>
/// Counts products in fixed, labelled price bands.
/// </summary>
/// <remarks>
/// Elasticsearch range buckets include <c>From</c> and exclude <c>To</c>, so the
/// bands below are contiguous and non-overlapping.
/// </remarks>
/// <returns>One entry per price band, labelled with the band's key.</returns>
/// <exception cref="InvalidOperationException">The search request failed.</exception>
public async Task<List<PriceBucket>> GetPriceFacetsAsync()
{
    var response = await _es.SearchAsync<Product>(s => s
        .Size(0)
        .Aggregations(a => a
            .Range("price_ranges", r => r
                .Field(f => f.Price)
                .Ranges(
                    new AggregationRange { To = 100, Key = "Under $100" },
                    new AggregationRange { From = 100, To = 500, Key = "$100 - $500" },
                    new AggregationRange { From = 500, To = 1000, Key = "$500 - $1000" },
                    new AggregationRange { From = 1000, Key = "Over $1000" }
                )
            )
        )
    );

    // A failed request leaves Aggregations null; report the actual error.
    if (!response.IsValidResponse || response.Aggregations is null)
        throw new InvalidOperationException(
            $"Price facet aggregation failed: {response.DebugInformation}");

    var buckets = response.Aggregations
        .GetRange("price_ranges")?
        .Buckets ?? [];

    return buckets.Select(b => new PriceBucket
    {
        Label = b.Key,
        Count = (int)b.DocCount,
    }).ToList();
}
Date Histogram - Nhóm theo thời gian
/// <summary>
/// Counts orders per calendar month of <paramref name="year"/>, including
/// months that have no orders.
/// </summary>
/// <param name="year">Calendar year to report on; must be between 1 and 9998.</param>
/// <returns>
/// One entry per month bucket, keyed "yyyy-MM". The combination of
/// <c>min_doc_count: 0</c> and <c>extended_bounds</c> is intended to yield all 12 months.
/// </returns>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="year"/> is outside 1..9998.</exception>
/// <exception cref="InvalidOperationException">The search request failed.</exception>
public async Task<List<MonthlySales>> GetMonthlySalesAsync(int year)
{
    // year + 1 is the exclusive upper bound, and DateTime tops out at year 9999.
    if (year is < 1 or > 9998)
        throw new ArgumentOutOfRangeException(nameof(year), year, "Year must be between 1 and 9998.");

    var yearStart = new DateTime(year, 1, 1);
    var nextYearStart = new DateTime(year + 1, 1, 1);

    var response = await _es.SearchAsync<Order>(s => s
        .Size(0)
        .Query(q => q
            .Range(r => r.DateRange(dr => dr
                .Field(f => f.OrderedAt)
                .Gte(yearStart)
                .Lt(nextYearStart) // Half-open interval [Jan 1, next Jan 1)
            ))
        )
        .Aggregations(a => a
            .DateHistogram("monthly", dh => dh
                .Field(f => f.OrderedAt)
                .CalendarInterval(CalendarInterval.Month)
                .Format("yyyy-MM")
                .MinDocCount(0) // Keep months that have no orders
                // Force buckets to span the whole year even when the first or last
                // months have no data; this only takes effect with min_doc_count 0.
                .ExtendedBounds(new ExtendedBounds<FieldDateMath>(
                    yearStart,
                    new DateTime(year, 12, 31)
                ))
            )
        )
    );

    // A failed request leaves Aggregations null; report the actual error.
    if (!response.IsValidResponse || response.Aggregations is null)
        throw new InvalidOperationException(
            $"Monthly sales aggregation failed: {response.DebugInformation}");

    var buckets = response.Aggregations
        .GetDateHistogram("monthly")?
        .Buckets ?? [];

    return buckets.Select(b => new MonthlySales
    {
        Month = b.KeyAsString ?? "",
        Count = (int)b.DocCount,
    }).ToList();
}
Sub-aggregations - Kết hợp nhiều tầng (không nhầm với `nested` aggregation dành cho field kiểu nested)
/// <summary>
/// Summarizes the top 10 categories by document count. For each one it returns
/// the average price, the maximum price and the top 3 brands, all computed
/// with sub-aggregations inside the category bucket.
/// </summary>
/// <returns>One summary per category bucket.</returns>
/// <exception cref="InvalidOperationException">The search request failed.</exception>
public async Task<List<CategorySummary>> GetCategorySummaryAsync()
{
    var response = await _es.SearchAsync<Product>(s => s
        .Size(0)
        .Aggregations(a => a
            .Terms("by_category", t => t
                .Field(f => f.Category)
                .Size(10)
                // Sub-aggregations evaluated within each category bucket.
                // Only aggregations that are read back below are requested.
                .Aggregations(sa => sa
                    .Avg("avg_price", avg => avg.Field(f => f.Price))
                    .Max("max_price", max => max.Field(f => f.Price))
                    .Terms("top_brands", tb => tb
                        .Field(f => f.Brand)
                        .Size(3) // Top 3 brands per category
                    )
                )
            )
        )
    );

    // A failed request leaves Aggregations null; report the actual error.
    if (!response.IsValidResponse || response.Aggregations is null)
        throw new InvalidOperationException(
            $"Category summary aggregation failed: {response.DebugInformation}");

    var categoryBuckets = response.Aggregations
        .GetStringTerms("by_category")?
        .Buckets ?? [];

    return categoryBuckets.Select(bucket =>
    {
        var subAggs = bucket.Aggregations;
        var brandBuckets = subAggs.GetStringTerms("top_brands")?.Buckets ?? [];
        return new CategorySummary
        {
            Category = bucket.Key.ToString()!,
            Count = (int)bucket.DocCount,
            AvgPrice = subAggs.GetAvg("avg_price")?.Value ?? 0,
            MaxPrice = subAggs.GetMax("max_price")?.Value ?? 0,
            TopBrands = brandBuckets
                .Select(b => b.Key.ToString()!)
                .ToList(),
        };
    }).ToList();
}
/// <summary>
/// Per-category summary built by GetCategorySummaryAsync. An instance created
/// without initializers has an empty <see cref="Category"/>, zero numbers and its
/// own new, empty <see cref="TopBrands"/> list.
/// </summary>
public record class CategorySummary
{
    /// <summary>Category term of the bucket.</summary>
    public string Category { get; init; } = "";

    /// <summary>Number of documents in the category.</summary>
    public int Count { get; init; }

    /// <summary>Average price within the category.</summary>
    public double AvgPrice { get; init; }

    /// <summary>Highest price within the category.</summary>
    public double MaxPrice { get; init; }

    /// <summary>Most frequent brands within the category.</summary>
    public List<string> TopBrands { get; init; } = new();
}
Kết hợp Search + Aggregations (Faceted Search)
Pattern phổ biến trong e-commerce: search + filter + facets.
/// <summary>
/// Faceted product search. It combines an optional keyword query, category and
/// max-price filters, and one page of hits. It also returns facet aggregations
/// (categories, brands, price ranges and price stats) computed over the same
/// filtered result set.
/// </summary>
/// <param name="p">Search parameters. <c>Page</c> is 1-based; a <c>PageSize</c> of 0 returns facets only.</param>
/// <returns>The page of products, the total hit count and the facet results.</returns>
/// <exception cref="ArgumentNullException"><paramref name="p"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException">
/// <c>p.Page</c> is less than 1 or <c>p.PageSize</c> is negative.
/// </exception>
/// <exception cref="InvalidOperationException">The search request failed.</exception>
public async Task<FacetedSearchResult> FacetedSearchAsync(FacetedSearchParams p)
{
    ArgumentNullException.ThrowIfNull(p);
    // Page < 1 would produce a negative `from`, which Elasticsearch rejects.
    if (p.Page < 1)
        throw new ArgumentOutOfRangeException(nameof(p), p.Page, "Page is 1-based and must be at least 1.");
    if (p.PageSize < 0)
        throw new ArgumentOutOfRangeException(nameof(p), p.PageSize, "PageSize must not be negative.");

    var response = await _es.SearchAsync<Product>(s => s
        // Main query: optional full-text keyword plus non-scoring filters.
        .Query(q => q.Bool(b =>
        {
            // A whitespace-only keyword analyzes to no terms, and multi_match then
            // matches nothing, so it is treated the same as "no keyword".
            if (!string.IsNullOrWhiteSpace(p.Keyword))
                b.Must(m => m.MultiMatch(mm => mm
                    .Fields(new[] { "name^3", "description" }) // name boosted x3
                    .Query(p.Keyword)
                ));

            var filters = new List<Action<QueryDescriptor<Product>>>();
            if (p.Categories?.Any() == true)
                filters.Add(f => f.Terms(t => t
                    .Field(fld => fld.Category)
                    .Terms(new TermsQueryField(p.Categories.Select(FieldValue.String).ToArray()))
                ));
            if (p.MaxPrice.HasValue)
                filters.Add(f => f.Range(r => r.NumberRange(nr => nr
                    .Field(fld => fld.Price).Lte((double)p.MaxPrice.Value)
                )));
            if (filters.Any()) b.Filter(filters.ToArray());
            return b;
        }))
        // Pagination
        .From((p.Page - 1) * p.PageSize)
        .Size(p.PageSize)
        // Facet aggregations. They run over the filtered result set.
        // NOTE(review): for multi-select facets (counts that ignore the category
        // filter), move that filter to post_filter — this would change facet counts.
        .Aggregations(a => a
            .Terms("categories", t => t.Field(f => f.Category).Size(20))
            .Terms("brands", t => t.Field(f => f.Brand).Size(20))
            .Range("price_ranges", r => r
                .Field(f => f.Price)
                .Ranges(
                    new AggregationRange { To = 100 },
                    new AggregationRange { From = 100, To = 500 },
                    new AggregationRange { From = 500, To = 1000 },
                    new AggregationRange { From = 1000 }
                )
            )
            .Stats("price_stats", st => st.Field(f => f.Price))
        )
    );

    // A failed request leaves Aggregations null; report the actual error.
    if (!response.IsValidResponse || response.Aggregations is null)
        throw new InvalidOperationException(
            $"Faceted search failed: {response.DebugInformation}");

    var aggs = response.Aggregations;
    var priceStats = aggs.GetStats("price_stats"); // Looked up once, used three times

    return new FacetedSearchResult
    {
        Products = response.Documents.ToList(),
        Total = response.Total,
        Facets = new FacetResults
        {
            Categories = aggs.GetStringTerms("categories")?
                .Buckets.Select(b => new FacetItem(b.Key.ToString()!, (int)b.DocCount))
                .ToList() ?? [],
            Brands = aggs.GetStringTerms("brands")?
                .Buckets.Select(b => new FacetItem(b.Key.ToString()!, (int)b.DocCount))
                .ToList() ?? [],
            PriceRanges = aggs.GetRange("price_ranges")?
                .Buckets.Select(b => new FacetItem(b.Key, (int)b.DocCount))
                .ToList() ?? [],
            PriceStats = new PriceStatsResult
            {
                Min = priceStats?.Min ?? 0,
                Max = priceStats?.Max ?? 0,
                Avg = priceStats?.Avg ?? 0,
            },
        },
    };
}