
SKCC Project | Industrial Monitoring System

Period: 11/2024 – 04/2025
Role: Backend Developer
Company: FPT Software


Project Overview

Background

The manufacturing plant needed a system for real-time monitoring of its industrial equipment in order to:

  • Track machine state (temperature, pressure, speed, vibration)
  • Detect anomalies and raise early warnings
  • Automate the notification process when incidents occur
  • Integrate with the existing ERP and MES systems

Requirements

Functional Requirements

  1. Real-time Monitoring

    • Display sensor data from thousands of devices
    • Update frequency: 1-5 seconds
    • Historical data visualization (charts, trends)
  2. Alert & Notification

    • Configurable thresholds for each sensor type
    • Multi-channel notifications (Email, SMS, Webhook)
    • Alert escalation rules
  3. Webhook Engine

    • User-defined rules with custom conditions
    • Trigger external APIs when events occur
    • Retry mechanism for failed webhooks
  4. Reporting & Analytics

    • Daily/Weekly/Monthly reports
    • OEE (Overall Equipment Effectiveness) calculations
    • Downtime analysis
  5. Integration APIs

    • REST APIs for third-party systems
    • Data export (CSV, Excel, PDF)
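The OEE calculations in the reporting requirement usually follow the standard definition: OEE = Availability × Performance × Quality, where Availability is run time over planned production time, Performance is ideal cycle time × total count over run time, and Quality is good count over total count. The sketch below assumes that definition; the OeeInput and OeeResult records and their field names are hypothetical, not the project's reporting model.

```csharp
using System;

// Hypothetical input for one machine over one reporting period
public record OeeInput(
    TimeSpan PlannedProductionTime, // scheduled production time (planned stops excluded)
    TimeSpan Downtime,              // unplanned stops during that time
    TimeSpan IdealCycleTime,        // fastest possible time to produce one unit
    int TotalCount,                 // all units produced
    int GoodCount);                 // units that passed quality checks

public record OeeResult(double Availability, double Performance, double Quality)
{
    // OEE is the product of the three factors
    public double Oee => Availability * Performance * Quality;
}

public static class OeeCalculator
{
    public static OeeResult Calculate(OeeInput input)
    {
        TimeSpan runTime = input.PlannedProductionTime - input.Downtime;
        if (runTime <= TimeSpan.Zero || input.TotalCount <= 0)
        {
            return new OeeResult(0, 0, 0); // nothing was produced in this period
        }

        double availability = runTime / input.PlannedProductionTime;           // run time / planned time
        double performance = input.IdealCycleTime * input.TotalCount / runTime; // ideal time / run time
        double quality = (double)input.GoodCount / input.TotalCount;            // good / total
        return new OeeResult(availability, performance, quality);
    }
}
```

For example, 500 planned minutes with 50 minutes of downtime, a 1-minute ideal cycle, 405 units and 324 good units gives 0.9 × 0.9 × 0.8 = 0.648, an OEE of about 64.8%.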

Non-Functional Requirements

  • Latency: end-to-end < 2 seconds for real-time data
  • Throughput: process 10,000+ events/second
  • Availability: 99.9% uptime (critical for factory operations)
  • Scalability: support 50% more devices without a redesign
  • Data Retention: keep raw data for 2 years and aggregated data for 5 years
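These targets translate into concrete budgets. A small sketch of the arithmetic, assuming a 365-day year and a constant event rate (CapacityMath is an illustrative helper, not part of the system):

```csharp
using System;

public static class CapacityMath
{
    // Longest total downtime an availability target allows over a period
    public static TimeSpan DowntimeBudget(double availability, TimeSpan period) =>
        period * (1 - availability);

    // Events produced at a steady rate over a period
    public static long EventCount(long eventsPerSecond, TimeSpan period) =>
        eventsPerSecond * (long)period.TotalSeconds;
}
```

With these inputs, 99.9% availability allows about 8.76 hours of downtime per year (43.2 minutes per 30-day month), and 10,000 events/second amounts to 864 million events per day, or roughly 630 billion raw events over the 2-year retention window.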

Architecture & Technology

Technology Stack

┌────────────────────────────────────────────────────────────────────┐
│                         Client Applications                         │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────────────────┐  │
│  │  WPF Client  │  │ Web Dashboard│  │  Mobile App (Future)   │  │
│  │  (Operators) │  │  (Managers)  │  │                        │  │
│  └──────────────┘  └──────────────┘  └──────────────────────────┘  │
└────────────────────────────────────────────────────────────────────┘
         ↕ SignalR (Real-time)            ↕ REST API
┌────────────────────────────────────────────────────────────────────┐
│                      Backend Services (.NET Core)                   │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────────────────┐  │
│  │  Data        │  │  Alert       │  │  Webhook                 │  │
│  │  Ingestion   │  │  Service     │  │  Engine                  │  │
│  │  Service     │  │              │  │                          │  │
│  └──────────────┘  └──────────────┘  └──────────────────────────┘  │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────────────────┐  │
│  │  Reporting   │  │  Integration │  │  API Gateway             │  │
│  │  Service     │  │  Service     │  │  (Kong)                  │  │
│  └──────────────┘  └──────────────┘  └──────────────────────────┘  │
└────────────────────────────────────────────────────────────────────┘
         ↕                                       ↕
┌─────────────────────┐              ┌──────────────────────────────┐
│   Data Stores       │              │   Message Infrastructure     │
│  ┌──────────────┐   │              │  ┌────────────────────────┐  │
│  │ ElasticSearch│   │              │  │  Azure Service Bus     │  │
│  │ (Time-series)│   │              │  │  (Topics & Queues)     │  │
│  └──────────────┘   │              │  └────────────────────────┘  │
│  ┌──────────────┐   │              │                              │
│  │  SQL Server  │   │              │                              │
│  │  (Metadata)  │   │              │                              │
│  └──────────────┘   │              │                              │
│  ┌──────────────┐   │              │                              │
│  │  Redis       │   │              │                              │
│  │  (Cache)     │   │              │                              │
│  └──────────────┘   │              │                              │
└─────────────────────┘              └──────────────────────────────┘

Component Details

1. Data Ingestion Pipeline

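using Azure.Messaging.ServiceBus;
using Nest;

public interface ISensorDataService
{
    Task ProcessSensorDataAsync(SensorDataDto data);
    Task<IEnumerable<SensorDataDto>> GetHistoricalDataAsync(
        int deviceId,
        DateTime from,
        DateTime to);
    Task<SensorStatusDto> GetCurrentStatusAsync(int deviceId);
}

public class SensorDataIngestionService : ISensorDataService
{
    private readonly IElasticClient _elasticClient;
    private readonly ServiceBusSender _topicSender;   // sender bound to the sensor-events topic
    private readonly ISensorCache _cache;             // Redis-backed current-status cache
    private readonly IThresholdCache _thresholdCache; // thresholds per device and sensor type

    public SensorDataIngestionService(
        IElasticClient elasticClient,
        ServiceBusSender topicSender,
        ISensorCache cache,
        IThresholdCache thresholdCache)
    {
        _elasticClient = elasticClient;
        _topicSender = topicSender;
        _cache = cache;
        _thresholdCache = thresholdCache;
    }

    public async Task ProcessSensorDataAsync(SensorDataDto data)
    {
        // 1. Validate data
        if (!IsValidSensorData(data))
        {
            throw new InvalidSensorDataException($"Invalid data from device {data.DeviceId}");
        }

        // 2. Store in ElasticSearch (one time-series index per month)
        var indexName = $"sensor-data-{data.Timestamp:yyyy-MM}";
        var indexResponse = await _elasticClient.IndexAsync(data, idx => idx.Index(indexName));
        if (!indexResponse.IsValid)
        {
            // NEST reports failures on the response instead of throwing by default
            throw new InvalidOperationException(indexResponse.DebugInformation);
        }

        // 3. Update current status cache
        await _cache.UpdateStatusAsync(data.DeviceId, new SensorStatus
        {
            LastValue = data.Value,
            LastUpdated = data.Timestamp,
            Status = DetermineStatus(data)
        });

        // 4. Check thresholds & publish an alert event
        var alertEvent = await CheckThresholdsAsync(data);
        if (alertEvent != null)
        {
            await PublishAsync(new SensorAlertEvent
            {
                DeviceId = data.DeviceId,
                SensorType = data.SensorType,
                Value = data.Value,
                Threshold = alertEvent.Threshold,
                Severity = alertEvent.Severity,
                Timestamp = data.Timestamp
            });
        }

        // 5. Publish raw data event for other consumers
        await PublishAsync(new SensorDataEvent
        {
            DeviceId = data.DeviceId,
            Data = data
        });
    }

    public async Task<IEnumerable<SensorDataDto>> GetHistoricalDataAsync(
        int deviceId,
        DateTime from,
        DateTime to)
    {
        // Search every monthly index: targeting only sensor-data-{from:yyyy-MM} would miss
        // data when the range crosses a month boundary. The date filter narrows the hits.
        var searchResponse = await _elasticClient.SearchAsync<SensorDataDto>(s => s
            .Index("sensor-data-*")
            .Query(q => q
                .Bool(b => b
                    // Filter context: exact matches, no relevance scoring needed
                    .Filter(
                        f => f.Term(t => t.DeviceId, deviceId),
                        f => f.DateRange(d => d
                            .Field(fld => fld.Timestamp)
                            .GreaterThanOrEquals(from)
                            .LessThanOrEquals(to)
                        )
                    )
                )
            )
            .Sort(sort => sort
                .Descending(d => d.Timestamp)
            )
            .Size(10000) // default index.max_result_window; larger ranges need search_after
        );

        if (!searchResponse.IsValid)
        {
            throw new InvalidOperationException(searchResponse.DebugInformation);
        }

        return searchResponse.Documents;
    }

    public Task<SensorStatusDto> GetCurrentStatusAsync(int deviceId) =>
        _cache.GetStatusAsync(deviceId); // assumed ISensorCache read method

    private SensorState DetermineStatus(SensorDataDto data)
    {
        // Business logic to determine if sensor is Normal, Warning, or Critical
        var thresholds = _thresholdCache.Get(data.DeviceId, data.SensorType);
        if (data.Value >= thresholds.Critical) return SensorState.Critical;
        if (data.Value >= thresholds.Warning) return SensorState.Warning;
        return SensorState.Normal;
    }

    private Task PublishAsync<TEvent>(TEvent evt)
    {
        // JSON body; Subject carries the event type so topic subscriptions can filter on it
        var message = new ServiceBusMessage(BinaryData.FromObjectAsJson(evt))
        {
            ContentType = "application/json",
            Subject = typeof(TEvent).Name
        };
        return _topicSender.SendMessageAsync(message);
    }

    // IsValidSensorData and CheckThresholdsAsync are omitted for brevity
}

// Named SensorState so it does not clash with the cached SensorStatus class
public enum SensorState
{
    Normal,
    Warning,
    Critical
}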

public class SensorDataDto
{
    public int DeviceId { get; set; }
    public string SensorType { get; set; } // Temperature, Pressure, Vibration
    public double Value { get; set; }
    public string Unit { get; set; } // Celsius, PSI, mm/s
    public DateTime Timestamp { get; set; }
    public string Quality { get; set; } // Good, Bad, Uncertain
}
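Because each reading is written to a monthly index (sensor-data-yyyy-MM), a query whose time range crosses a month boundary has to target every monthly index it touches, either with a wildcard such as sensor-data-* or with an explicit list. A minimal sketch of deriving that explicit list; MonthlyIndexNames is a hypothetical helper, and the names could be joined with commas to form a multi-index target:

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;

public static class MonthlyIndexNames
{
    // One index name per calendar month touched by [from, to], oldest first
    public static List<string> ForRange(DateTime from, DateTime to, string prefix = "sensor-data-")
    {
        if (to < from) throw new ArgumentException("'to' must not be earlier than 'from'");

        var names = new List<string>();
        var month = new DateTime(from.Year, from.Month, 1);
        var lastMonth = new DateTime(to.Year, to.Month, 1);
        while (month <= lastMonth)
        {
            names.Add(prefix + month.ToString("yyyy-MM", CultureInfo.InvariantCulture));
            month = month.AddMonths(1);
        }
        return names;
    }
}
```

An explicit list fails if one of the months has no index yet, unless the request ignores unavailable indices (ignore_unavailable), which is one reason the wildcard plus a timestamp filter is often the simpler choice.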

2. Alert Service with Rule Engine

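using RulesEngine.Models;

public interface IAlertService
{
    Task<AlertRuleDto> CreateRuleAsync(AlertRuleCreateRequest request);
    Task UpdateRuleAsync(int ruleId, AlertRuleUpdateRequest request);
    Task DeleteRuleAsync(int ruleId);
    Task<IEnumerable<AlertRuleDto>> GetRulesByDeviceAsync(int deviceId);
    Task ProcessAlertEventAsync(SensorAlertEvent alertEvent);
}

// Rule evaluation uses Microsoft's RulesEngine library (NuGet package "RulesEngine")
public class AlertRuleEngine : IAlertService
{
    // Fully qualified: the class has the same name as its namespace
    private readonly RulesEngine.RulesEngine _rulesEngine;
    private readonly IAlertRepository _alertRepository;
    private readonly INotificationService _notificationService;
    private readonly IWebhookDispatcher _webhookDispatcher;

    public AlertRuleEngine(
        RulesEngine.RulesEngine rulesEngine,
        IAlertRepository alertRepository,
        INotificationService notificationService,
        IWebhookDispatcher webhookDispatcher)
    {
        _rulesEngine = rulesEngine;
        _alertRepository = alertRepository;
        _notificationService = notificationService;
        _webhookDispatcher = webhookDispatcher;
    }

    public async Task<AlertRuleDto> CreateRuleAsync(AlertRuleCreateRequest request)
    {
        // Validate rule expression: register it under a temporary name and run it once.
        // With the default ReSettings, compilation errors are returned in ExceptionMessage
        // rather than thrown.
        var probeName = $"Validate_{Guid.NewGuid():N}";
        _rulesEngine.AddOrUpdateWorkflow(BuildWorkflow(probeName, request.RuleExpression));
        try
        {
            var probe = await _rulesEngine.ExecuteAllRulesAsync(probeName, Inputs(0, 0));
            if (!string.IsNullOrEmpty(probe[0].ExceptionMessage))
            {
                throw new InvalidRuleExpressionException(probe[0].ExceptionMessage);
            }
        }
        finally
        {
            _rulesEngine.RemoveWorkflow(probeName);
        }

        // Save rule
        var rule = new AlertRule
        {
            DeviceId = request.DeviceId,
            SensorType = request.SensorType,
            RuleExpression = request.RuleExpression,
            Severity = request.Severity,
            NotificationChannels = request.NotificationChannels,
            WebhookUrl = request.WebhookUrl,
            IsActive = true,
            CreatedAt = DateTime.UtcNow
        };

        await _alertRepository.AddAsync(rule);

        // Register under a per-rule name (rule.Id is assigned on save), so several
        // rules on the same device do not overwrite each other
        _rulesEngine.AddOrUpdateWorkflow(BuildWorkflow(WorkflowName(rule.Id), rule.RuleExpression));
        return MapToDto(rule);
    }

    public async Task ProcessAlertEventAsync(SensorAlertEvent alertEvent)
    {
        // Get applicable rules
        var rules = await _alertRepository.GetActiveRulesAsync(
            alertEvent.DeviceId,
            alertEvent.SensorType);

        foreach (var rule in rules)
        {
            // Workflows are held in memory per instance; register any this instance has not seen
            var workflowName = WorkflowName(rule.Id);
            if (!_rulesEngine.ContainsWorkflow(workflowName))
            {
                _rulesEngine.AddOrUpdateWorkflow(BuildWorkflow(workflowName, rule.RuleExpression));
            }

            // Evaluate rule; the expression can reference "value" and "threshold"
            var results = await _rulesEngine.ExecuteAllRulesAsync(
                workflowName,
                Inputs(alertEvent.Value, alertEvent.Threshold));

            if (results[0].IsSuccess)
            {
                // Create alert record
                var alert = new Alert
                {
                    RuleId = rule.Id,
                    DeviceId = alertEvent.DeviceId,
                    SensorType = alertEvent.SensorType,
                    Value = alertEvent.Value,
                    Severity = rule.Severity,
                    TriggeredAt = DateTime.UtcNow
                };

                await _alertRepository.CreateAlertAsync(alert);

                // Send notifications (parallel)
                var tasks = new List<Task>();

                if (rule.NotificationChannels.Contains("Email"))
                {
                    tasks.Add(_notificationService.SendEmailAlertAsync(alert));
                }

                if (rule.NotificationChannels.Contains("SMS"))
                {
                    tasks.Add(_notificationService.SendSmsAlertAsync(alert));
                }

                if (!string.IsNullOrEmpty(rule.WebhookUrl))
                {
                    tasks.Add(_webhookDispatcher.DispatchAsync(rule.WebhookUrl, alert));
                }

                await Task.WhenAll(tasks);
            }
        }
    }

    private static string WorkflowName(int ruleId) => $"AlertRule_{ruleId}";

    private static Workflow BuildWorkflow(string name, string expression) => new Workflow
    {
        WorkflowName = name,
        Rules = new List<Rule>
        {
            new Rule
            {
                RuleName = "ThresholdCheck",
                Expression = expression, // e.g., "value > 100"
                RuleExpressionType = RuleExpressionType.LambdaExpression
            }
        }
    };

    // Named inputs, so expressions are written against "value" and "threshold"
    private static RuleParameter[] Inputs(double value, double threshold) => new[]
    {
        new RuleParameter("value", value),
        new RuleParameter("threshold", threshold)
    };

    // UpdateRuleAsync (which should also call AddOrUpdateWorkflow), DeleteRuleAsync,
    // GetRulesByDeviceAsync and MapToDto are omitted for brevity
}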

public class AlertRuleCreateRequest
{
    public int DeviceId { get; set; }
    public string SensorType { get; set; }
    public string RuleExpression { get; set; } // e.g., "value > 100 && value < 200"
    public AlertSeverity Severity { get; set; }
    public List<string> NotificationChannels { get; set; }
    public string WebhookUrl { get; set; }
}

public enum AlertSeverity
{
    Low,
    Medium,
    High,
    Critical
}
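ProcessAlertEventAsync sends notifications in parallel and then awaits Task.WhenAll(tasks). When one channel fails, that await rethrows only the first exception, so the outcome of the other channels is easy to lose, and the loop stops before the remaining rules are processed. A minimal sketch of a fan-out that reports every channel's result; the channel names and the Func<Task> senders are placeholders standing in for INotificationService and IWebhookDispatcher:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class NotificationFanOut
{
    // Runs every sender concurrently; returns channel -> error message, or null on success
    public static async Task<Dictionary<string, string?>> SendAllAsync(
        IReadOnlyDictionary<string, Func<Task>> senders)
    {
        Dictionary<string, Task> running = senders.ToDictionary(s => s.Key, s => Start(s.Value));

        try
        {
            await Task.WhenAll(running.Values);
        }
        catch
        {
            // Task.WhenAll surfaces only the first failure; each task is inspected below instead
        }

        return running.ToDictionary(
            r => r.Key,
            r => r.Value.IsFaulted ? r.Value.Exception!.GetBaseException().Message
               : r.Value.IsCanceled ? "canceled"
               : (string?)null);
    }

    // Turns a synchronous throw inside a sender into a faulted task
    private static Task Start(Func<Task> send)
    {
        try { return send(); }
        catch (Exception ex) { return Task.FromException(ex); }
    }
}
```

The caller can then record failed channels on the alert, or schedule a retry for just those channels, instead of abandoning the remaining rules.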

3. Webhook Engine with Retry & Circuit Breaker

using System.Net;
using System.Text;
using System.Text.Json;
using Hangfire;
using Polly;
using Polly.CircuitBreaker;
using Polly.Retry;

public interface IWebhookDispatcher
{
    Task DispatchAsync(string url, object payload);
    Task RetryAsync(string logId);
    Task<WebhookLogDto> GetWebhookLogAsync(string logId);
}

public class ResilientWebhookDispatcher : IWebhookDispatcher
{
    // Background re-deliveries allowed per webhook before it is left as Failed
    private const int MaxDeliveryAttempts = 5;

    private readonly HttpClient _httpClient;
    private readonly IWebhookLogRepository _logRepository;
    private readonly IBackgroundJobClient _jobClient;
    private readonly ILogger<ResilientWebhookDispatcher> _logger;

    // Circuit breaker policy. Its state only carries over between calls when the same
    // policy instance is reused, so the dispatcher (or the policies) must be a singleton.
    // One breaker is shared by all URLs: a single failing endpoint blocks every webhook,
    // which a breaker per host would avoid.
    private readonly AsyncCircuitBreakerPolicy<HttpResponseMessage> _circuitBreaker;

    // Retry policy
    private readonly AsyncRetryPolicy<HttpResponseMessage> _retryPolicy;

    public ResilientWebhookDispatcher(
        HttpClient httpClient,
        IWebhookLogRepository logRepository,
        IBackgroundJobClient jobClient,
        ILogger<ResilientWebhookDispatcher> logger)
    {
        _httpClient = httpClient;
        _logRepository = logRepository;
        _jobClient = jobClient;
        _logger = logger;

        // Configure retry policy: 3 retries after 2s, 4s and 8s
        _retryPolicy = Policy
            .Handle<HttpRequestException>()
            .Or<TaskCanceledException>() // HttpClient timeouts surface as TaskCanceledException
            .OrResult<HttpResponseMessage>(IsTransient)
            .WaitAndRetryAsync(
                retryCount: 3,
                sleepDurationProvider: retry => TimeSpan.FromSeconds(Math.Pow(2, retry)),
                onRetry: (outcome, timespan, retryNumber, context) =>
                {
                    _logger.LogWarning(
                        "Webhook failed. Retry {RetryNumber} after {Timespan}. Status: {Status}",
                        retryNumber,
                        timespan,
                        outcome.Result?.StatusCode);
                }
            );

        // Configure circuit breaker: open for 5 minutes after 5 consecutive failed deliveries
        _circuitBreaker = Policy
            .Handle<HttpRequestException>()
            .Or<TaskCanceledException>()
            .OrResult<HttpResponseMessage>(IsTransient)
            .CircuitBreakerAsync(
                handledEventsAllowedBeforeBreaking: 5,
                durationOfBreak: TimeSpan.FromMinutes(5),
                onBreak: (outcome, duration) =>
                {
                    _logger.LogError("Circuit breaker opened for {Duration}. Reason: {Reason}",
                        duration, outcome.Exception?.Message ?? outcome.Result?.StatusCode.ToString());
                },
                onReset: () =>
                {
                    _logger.LogInformation("Circuit breaker reset to closed state.");
                }
            );
    }

    // 5xx, 408 and 429 are worth retrying; other 4xx responses will not succeed on retry
    private static bool IsTransient(HttpResponseMessage response) =>
        (int)response.StatusCode >= 500 ||
        response.StatusCode == HttpStatusCode.RequestTimeout ||
        response.StatusCode == HttpStatusCode.TooManyRequests;

    public async Task DispatchAsync(string url, object payload)
    {
        var log = new WebhookLog
        {
            Id = Guid.NewGuid().ToString(),
            Url = url,
            Payload = JsonSerializer.Serialize(payload),
            Status = WebhookStatus.Pending,
            CreatedAt = DateTime.UtcNow
        };

        await _logRepository.CreateAsync(log);
        await DeliverAsync(log);
    }

    // Called by the background job: re-delivers the existing log entry instead of creating a new one
    public async Task RetryAsync(string logId)
    {
        var log = await _logRepository.GetByIdAsync(logId);
        if (log == null || log.Status == WebhookStatus.Success) return;

        await DeliverAsync(log);
    }

    private async Task DeliverAsync(WebhookLog log)
    {
        log.AttemptCount++; // AttemptCount: assumed int property on WebhookLog

        try
        {
            // Circuit breaker wraps retry, so the breaker records one outcome per delivery
            var response = await _circuitBreaker.ExecuteAsync(() =>
                _retryPolicy.ExecuteAsync(() =>
                {
                    // New content per attempt: HttpContent cannot be sent twice
                    var content = new StringContent(log.Payload, Encoding.UTF8, "application/json");
                    return _httpClient.PostAsync(log.Url, content);
                }));

            // Update log
            log.Status = response.IsSuccessStatusCode
                ? WebhookStatus.Success
                : WebhookStatus.Failed;
            log.ResponseCode = (int)response.StatusCode;
            log.ResponseBody = await response.Content.ReadAsStringAsync();
            log.CompletedAt = DateTime.UtcNow;

            await _logRepository.UpdateAsync(log);

            if (!response.IsSuccessStatusCode && IsTransient(response))
            {
                ScheduleRetry(log, TimeSpan.FromMinutes(5));
            }
        }
        catch (BrokenCircuitException)
        {
            _logger.LogError("Circuit breaker open for webhook {Url}. Queuing for later.", log.Url);

            log.Status = WebhookStatus.CircuitOpen;
            await _logRepository.UpdateAsync(log);

            // Retry once the 5-minute break has elapsed
            ScheduleRetry(log, TimeSpan.FromMinutes(10));
        }
        catch (Exception ex) when (ex is HttpRequestException or TaskCanceledException)
        {
            _logger.LogError(ex, "Webhook dispatch failed for {Url}", log.Url);

            log.Status = WebhookStatus.Failed;
            log.ErrorMessage = ex.Message;
            await _logRepository.UpdateAsync(log);

            ScheduleRetry(log, TimeSpan.FromMinutes(5));
        }
    }

    private void ScheduleRetry(WebhookLog log, TimeSpan delay)
    {
        // Without a cap, every failed retry would schedule another one forever
        if (log.AttemptCount >= MaxDeliveryAttempts)
        {
            _logger.LogError("Webhook {LogId} abandoned after {Attempts} attempts", log.Id, log.AttemptCount);
            return;
        }

        // Hangfire's Schedule returns the job id (a string), not a Task, so it is not awaited
        var logId = log.Id;
        _jobClient.Schedule<IRetryWebhookJob>(job => job.RetryAsync(logId), delay);
    }

    // GetWebhookLogAsync (lookup and mapping to WebhookLogDto) omitted for brevity
}

public interface IRetryWebhookJob
{
    Task RetryAsync(string logId);
}

public class RetryWebhookJob : IRetryWebhookJob
{
    private readonly IWebhookDispatcher _dispatcher;

    public RetryWebhookJob(IWebhookDispatcher dispatcher)
    {
        _dispatcher = dispatcher;
    }

    // Re-delivers the logged webhook, so attempts are counted and no duplicate log is created
    public Task RetryAsync(string logId) => _dispatcher.RetryAsync(logId);
}
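The Polly circuit breaker configured above follows a three-state model: closed while calls succeed, open (rejecting calls immediately with BrokenCircuitException) after the configured number of consecutive handled failures, and half-open once the break duration has elapsed, when a trial call decides whether it closes again or re-opens. A stripped-down, single-threaded sketch of that state machine with an injectable clock, for illustration only; SimpleCircuitBreaker is hypothetical, and Polly's own implementation is thread-safe and more complete:

```csharp
using System;

public enum BreakerState { Closed, Open, HalfOpen }

public sealed class SimpleCircuitBreaker
{
    private readonly int _failureThreshold;
    private readonly TimeSpan _breakDuration;
    private readonly Func<DateTime> _now; // injectable clock, so the sketch is testable
    private int _consecutiveFailures;
    private DateTime _openedAt;

    public BreakerState State { get; private set; } = BreakerState.Closed;

    public SimpleCircuitBreaker(int failureThreshold, TimeSpan breakDuration, Func<DateTime> now)
    {
        _failureThreshold = failureThreshold;
        _breakDuration = breakDuration;
        _now = now;
    }

    // False means: reject the call without contacting the endpoint.
    // Single-threaded: the caller reports each result before asking again.
    public bool AllowRequest()
    {
        if (State == BreakerState.Open && _now() - _openedAt >= _breakDuration)
        {
            State = BreakerState.HalfOpen; // let a trial call through
        }
        return State != BreakerState.Open;
    }

    public void RecordSuccess()
    {
        _consecutiveFailures = 0;
        State = BreakerState.Closed;
    }

    public void RecordFailure()
    {
        _consecutiveFailures++;
        // A failed trial re-opens immediately; otherwise open once the threshold is reached
        if (State == BreakerState.HalfOpen || _consecutiveFailures >= _failureThreshold)
        {
            State = BreakerState.Open;
            _openedAt = _now();
        }
    }
}
```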

4. WPF Client with Real-time Updates

using System.Collections.ObjectModel;
using System.ComponentModel;
using System.Runtime.CompilerServices;
using System.Windows;
using Microsoft.AspNetCore.SignalR.Client;
using Microsoft.Extensions.Logging;

// ViewModel with SignalR connection
public class MonitoringViewModel : INotifyPropertyChanged, IDisposable
{
    private readonly HubConnection _hubConnection;
    private readonly ISensorApiClient _sensorApi; // hypothetical REST client exposing GetAllSensorsAsync
    private readonly ILogger<MonitoringViewModel> _logger;
    private ObservableCollection<SensorViewModel> _sensors = new();

    public ObservableCollection<SensorViewModel> Sensors
    {
        get => _sensors;
        private set
        {
            _sensors = value;
            OnPropertyChanged();
        }
    }

    public MonitoringViewModel(
        ISensorApiClient sensorApi,
        ILogger<MonitoringViewModel> logger)
    {
        _sensorApi = sensorApi;
        _logger = logger;

        // Initialize SignalR connection
        _hubConnection = new HubConnectionBuilder()
            .WithUrl("https://api.skcc.com/sensorhub")
            .WithAutomaticReconnect(new SignalRRetryPolicy())
            .Build();

        // Register event handlers (SignalR invokes them on a background thread)
        _hubConnection.On<SensorDataDto>("SensorDataReceived", data =>
        {
            UpdateSensorData(data);
        });

        _hubConnection.On<AlertDto>("AlertReceived", alert =>
        {
            ShowAlert(alert);
        });

        // Start connection (fire-and-forget; errors are logged inside)
        _ = StartConnectionAsync();
    }

    private async Task StartConnectionAsync()
    {
        try
        {
            await _hubConnection.StartAsync();

            // Load initial data
            var sensors = await _sensorApi.GetAllSensorsAsync();
            Sensors = new ObservableCollection<SensorViewModel>(
                sensors.Select(s => new SensorViewModel(s))
            );
        }
        catch (Exception ex)
        {
            // WithAutomaticReconnect does not retry a failed initial StartAsync
            _logger.LogError(ex, "Connection to sensor hub failed");
        }
    }

    private void UpdateSensorData(SensorDataDto data)
    {
        // InvokeAsync queues the UI update without blocking the SignalR receive loop
        Application.Current.Dispatcher.InvokeAsync(() =>
        {
            // A device reports several sensor types, so match on both
            // (SensorType is an assumed SensorViewModel property)
            var sensor = Sensors.FirstOrDefault(s =>
                s.DeviceId == data.DeviceId && s.SensorType == data.SensorType);
            sensor?.UpdateValue(data.Value, data.Timestamp);
        });
    }

    private void ShowAlert(AlertDto alert)
    {
        // InvokeAsync: with Invoke, the modal MessageBox would block the SignalR handler
        // thread, and every queued sensor update, until the operator closes it
        Application.Current.Dispatcher.InvokeAsync(() =>
        {
            // Show alert notification
            MessageBox.Show(
                $"Alert: {alert.SensorType} on Device {alert.DeviceId}\n" +
                $"Value: {alert.Value} {alert.Unit}\n" +
                $"Severity: {alert.Severity}",
                "Sensor Alert",
                MessageBoxButton.OK,
                alert.Severity == AlertSeverity.Critical
                    ? MessageBoxImage.Error
                    : MessageBoxImage.Warning
            );
        });
    }

    public void Dispose()
    {
        _hubConnection?.DisposeAsync().GetAwaiter().GetResult();
    }

    // INotifyPropertyChanged implementation
    public event PropertyChangedEventHandler PropertyChanged;

    private void OnPropertyChanged([CallerMemberName] string propertyName = null) =>
        PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(propertyName));
}

// Custom reconnect delays for SignalR
public class SignalRRetryPolicy : IRetryPolicy
{
    public TimeSpan? NextRetryDelay(RetryContext retryContext)
    {
        // Stepped backoff: 0s, 2s, 10s, 30s, then every 2 minutes indefinitely
        return retryContext.PreviousRetryCount switch
        {
            0 => TimeSpan.Zero,
            1 => TimeSpan.FromSeconds(2),
            2 => TimeSpan.FromSeconds(10),
            3 => TimeSpan.FromSeconds(30),
            _ => TimeSpan.FromMinutes(2)
        };
    }
}
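The view model above hands every SignalR message to the UI dispatcher individually. With thousands of devices reporting every 1-5 seconds, that per-message marshalling can itself load the UI thread. One common alternative, sketched here under the assumption that only the latest reading per sensor needs to be displayed, is to coalesce readings off the UI thread and apply them in batches from a timer such as WPF's DispatcherTimer. LatestValueBuffer is a hypothetical helper:

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;

// Keeps only the newest value per key; Put is safe to call from the SignalR callback thread
public sealed class LatestValueBuffer<TKey, TValue> where TKey : notnull
{
    private readonly ConcurrentDictionary<TKey, TValue> _pending = new();

    // A newer reading for the same key simply overwrites the older one
    public void Put(TKey key, TValue value) => _pending[key] = value;

    // Removes and returns everything collected since the last drain
    public IReadOnlyDictionary<TKey, TValue> Drain()
    {
        var drained = new Dictionary<TKey, TValue>();
        foreach (var key in _pending.Keys)
        {
            // A Put racing with this removal re-adds the key, so it is picked up next time
            if (_pending.TryRemove(key, out var value))
            {
                drained[key] = value;
            }
        }
        return drained;
    }
}
```

The SignalR handler would call Put((data.DeviceId, data.SensorType), data), and each timer tick would Drain the buffer and update the matching SensorViewModel items; the tick interval is a tuning choice to weigh against the 2-second latency target.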

ElasticSearch Optimization

Index Design

// Index template and ILM policy for the monthly time-series indices
public class ElasticSearchIndexSetup
{
    public async Task SetupIndexTemplateAsync(IElasticClient client)
    {
        // Setup ILM (Index Lifecycle Management) first, so indices created from the
        // template can attach to the policy immediately
        var policyName = "sensor-data-policy";
        var policyResponse = await client.IndexLifecycleManagement.PutLifecycleAsync(policyName, p => p
            .Policy(pd => pd
                .Phases(ph => ph
                    // Ingestion writes straight to sensor-data-yyyy-MM rather than through a
                    // rollover alias, so there is no rollover action and min_age counts
                    // from index creation
                    .Warm(w => w
                        .MinimumAge("35d") // after the index's month has ended: shrink blocks writes
                        .Actions(a => a
                            .Shrink(s => s.NumberOfShards(1))
                            .SetPriority(sp => sp.Priority(50))
                        )
                    )
                    .Cold(c => c
                        .MinimumAge("90d")
                        .Actions(a => a
                            .SetPriority(sp => sp.Priority(0))
                        )
                    )
                    .Delete(d => d
                        .MinimumAge("730d") // 2 years
                        .Actions(a => a
                            .Delete(del => del) // indices are only removed by an explicit delete action
                        )
                    )
                )
            )
        );

        if (!policyResponse.IsValid)
        {
            throw new InvalidOperationException(policyResponse.DebugInformation);
        }

        // Index template for time-series data
        var templateName = "sensor-data-template";
        var templateResponse = await client.Indices.PutTemplateAsync(templateName, pt => pt
            .IndexPatterns("sensor-data-*")
            .Settings(s => s
                .NumberOfReplicas(1)
                .NumberOfShards(3)
                .RefreshInterval("5s") // Near real-time
                .Setting("index.lifecycle.name", policyName) // attach the ILM policy
            )
            .Map<SensorDataDto>(m => m
                .AutoMap()
                .Properties(p => p
                    .Keyword(k => k.Name(n => n.DeviceId))
                    .Keyword(k => k.Name(n => n.SensorType))
                    .Date(d => d.Name(n => n.Timestamp))
                    .Number(nb => nb.Name(n => n.Value).Type(NumberType.Double))
                )
            )
        );

        if (!templateResponse.IsValid)
        {
            throw new InvalidOperationException(templateResponse.DebugInformation);
        }
    }
}
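When monthly indices are written to directly (no rollover), ILM measures min_age from index creation. Since an index keeps receiving writes until its month ends, and the shrink action blocks writes on the index, a warm-phase min_age must be longer than the longest month or the current month's index can be shrunk while it is still in use. A small sketch that checks a threshold against every month of a year; it assumes the worst case of an index created at midnight on the 1st, and MonthlyIndexIlmCheck is a hypothetical helper:

```csharp
using System;

public static class MonthlyIndexIlmCheck
{
    // True if an index created at the start of the month reaches warmAfter only after
    // the month has ended, i.e. after it has stopped receiving writes
    public static bool WarmPhaseStartsAfterMonthEnds(int year, int month, TimeSpan warmAfter)
    {
        var created = new DateTime(year, month, 1);
        var monthEnd = created.AddMonths(1);
        return created + warmAfter >= monthEnd;
    }

    // First month (1-12) whose index would be shrunk too early, or null if none
    public static int? FirstUnsafeMonth(int year, TimeSpan warmAfter)
    {
        for (int month = 1; month <= 12; month++)
        {
            if (!WarmPhaseStartsAfterMonthEnds(year, month, warmAfter)) return month;
        }
        return null;
    }
}
```

A 30-day threshold fails for every 31-day month, while anything above 31 days is safe for writes that arrive on time; late-arriving readings for an older month would still hit a shrunk, write-blocked index.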

Query Optimization

// One page of results plus the cursor for the next request
public class SensorDataPage
{
    public List<SensorDataDto> Items { get; set; } = new();

    // Sort values of the last hit; null when there are no further pages
    public IReadOnlyCollection<object>? NextSearchAfter { get; set; }
}

public class OptimizedSensorQueries
{
    private readonly IElasticClient _client;

    public OptimizedSensorQueries(IElasticClient client)
    {
        _client = client;
    }

    // Use search_after for deep pagination: pass the previous page's NextSearchAfter back in
    public async Task<SensorDataPage> GetHistoricalDataWithPaginationAsync(
        int deviceId,
        DateTime from,
        DateTime to,
        int pageSize = 1000,
        IReadOnlyCollection<object>? searchAfter = null)
    {
        var searchResponse = await _client.SearchAsync<SensorDataDto>(s =>
        {
            s.Index("sensor-data-*") // all monthly indices; the date filter selects the range
                .Query(q => DeviceInRange(q, deviceId, from, to))
                // SensorType breaks ties between readings with the same timestamp; without a
                // unique sort order, hits can be skipped or repeated between pages
                .Sort(sort => sort
                    .Descending(d => d.Timestamp)
                    .Ascending(d => d.SensorType))
                .Size(pageSize);

            if (searchAfter != null)
            {
                // Subsequent pages start after the last hit of the previous page
                s.SearchAfter(searchAfter.ToArray());
            }

            return s;
        });

        if (!searchResponse.IsValid)
        {
            throw new InvalidOperationException(searchResponse.DebugInformation);
        }

        var hits = searchResponse.Hits;
        return new SensorDataPage
        {
            Items = searchResponse.Documents.ToList(),
            // A short page is the last one
            NextSearchAfter = hits.Count == pageSize ? hits.Last().Sorts : null
        };
    }

    // Aggregation for dashboard
    public async Task<SensorAggregationsDto> GetAggregationsAsync(
        int deviceId,
        DateTime from,
        DateTime to)
    {
        var response = await _client.SearchAsync<SensorDataDto>(s => s
            .Index("sensor-data-*")
            .Query(q => DeviceInRange(q, deviceId, from, to))
            .Size(0) // No hits needed
            .Aggregations(a => a
                // extended_stats returns avg, min, max and standard deviation in one pass
                .ExtendedStats("value_stats", es => es.Field(f => f.Value))
                .DateHistogram("timeline", dh => dh
                    .Field(f => f.Timestamp)
                    .CalendarInterval(DateInterval.Hour)
                    .Aggregations(aa => aa
                        .Average("hourly_avg", avg => avg.Field(f => f.Value))
                    )
                )
            )
        );

        if (!response.IsValid)
        {
            throw new InvalidOperationException(response.DebugInformation);
        }

        var stats = response.Aggregations.ExtendedStats("value_stats");
        return new SensorAggregationsDto
        {
            AverageValue = stats?.Average,
            MaxValue = stats?.Max,
            MinValue = stats?.Min,
            StdDeviation = stats?.StdDeviation,
            HourlyAverages = response.Aggregations
                .DateHistogram("timeline")
                .Buckets
                .Select(b => new HourlyAverage
                {
                    Timestamp = b.Date, // b.Key is epoch milliseconds; Date is the DateTime
                    AverageValue = b.Average("hourly_avg")?.Value
                })
                .ToList()
        };
    }

    // Shared filter: one device, one time range (filter context, no scoring)
    private static QueryContainer DeviceInRange(
        QueryContainerDescriptor<SensorDataDto> q, int deviceId, DateTime from, DateTime to) =>
        q.Bool(b => b
            .Filter(
                f => f.Term(t => t.DeviceId, deviceId),
                f => f.DateRange(d => d
                    .Field(fld => fld.Timestamp)
                    .GreaterThanOrEquals(from)
                    .LessThanOrEquals(to))));
}
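search_after pages are consumed by passing the sort values of the last hit of one page into the next request, until a page comes back shorter than the page size. The sort order must be unique across hits (for one device, a timestamp plus a tie-breaker such as the sensor type), otherwise documents that share a sort value can be skipped or repeated. An in-memory sketch of that contract, with a hypothetical Reading record standing in for SensorDataDto and a list standing in for the index:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for SensorDataDto (one device's readings)
public record Reading(DateTime Timestamp, string SensorType, double Value);

public static class KeysetPaging
{
    // Emulates one search_after request sorted by Timestamp desc, then SensorType asc:
    // returns up to `size` hits that sort strictly after the cursor
    public static List<Reading> Page(
        IEnumerable<Reading> all, (DateTime Timestamp, string SensorType)? after, int size)
    {
        IEnumerable<Reading> ordered = all
            .OrderByDescending(r => r.Timestamp)
            .ThenBy(r => r.SensorType, StringComparer.Ordinal);

        if (after.HasValue)
        {
            var cursor = after.Value;
            ordered = ordered.Where(r =>
                r.Timestamp < cursor.Timestamp ||
                (r.Timestamp == cursor.Timestamp &&
                 string.CompareOrdinal(r.SensorType, cursor.SensorType) > 0));
        }

        return ordered.Take(size).ToList();
    }

    // Walks every page by feeding the last hit's sort values back as the cursor
    public static List<Reading> ReadAll(IReadOnlyList<Reading> all, int pageSize)
    {
        var result = new List<Reading>();
        (DateTime Timestamp, string SensorType)? cursor = null;

        while (true)
        {
            var page = Page(all, cursor, pageSize);
            result.AddRange(page);
            if (page.Count < pageSize) break; // a short page is the last one

            var last = page[^1];
            cursor = (last.Timestamp, last.SensorType); // sort values of the last hit
        }

        return result;
    }
}
```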

Challenges & Solutions

Challenge 1: Processing 10,000+ events/second

Problem: With thousands of sensors sending data every second, the system became overloaded.

Solution:

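using System.Threading.Channels;
using Microsoft.Extensions.Logging;
using Nest;

// Batch processing with Channel
public class BatchedSensorDataProcessor
{
    private readonly IElasticClient _elasticClient;
    private readonly ILogger<BatchedSensorDataProcessor> _logger;
    private readonly Channel<SensorDataDto> _channel;
    private readonly List<Task> _workers;

    public BatchedSensorDataProcessor(
        IElasticClient elasticClient,
        ILogger<BatchedSensorDataProcessor> logger,
        int workerCount = 10,
        int batchSize = 100)
    {
        _elasticClient = elasticClient;
        _logger = logger;

        // Bounded capacity + Wait: producers are slowed down (backpressure) instead of
        // memory growing without limit when ElasticSearch falls behind
        _channel = Channel.CreateBounded<SensorDataDto>(
            new BoundedChannelOptions(10000)
            {
                FullMode = BoundedChannelFullMode.Wait
            }
        );

        _workers = new List<Task>();
        for (int i = 0; i < workerCount; i++)
        {
            _workers.Add(ProcessBatchAsync(batchSize));
        }
    }

    public async Task QueueDataAsync(SensorDataDto data)
    {
        await _channel.Writer.WriteAsync(data);
    }

    // Stop accepting data and wait until the workers have flushed everything queued
    public async Task CompleteAsync()
    {
        _channel.Writer.Complete();
        await Task.WhenAll(_workers);
    }

    private async Task ProcessBatchAsync(int batchSize)
    {
        var batch = new List<SensorDataDto>(batchSize);

        while (await _channel.Reader.WaitToReadAsync())
        {
            batch.Clear();

            // Collect batch
            while (batch.Count < batchSize &&
                   _channel.Reader.TryRead(out var data))
            {
                batch.Add(data);
            }

            if (batch.Count > 0)
            {
                try
                {
                    // Bulk index to ElasticSearch
                    await BulkIndexToElasticAsync(batch);
                }
                catch (Exception ex)
                {
                    // Keep the worker alive: an unhandled exception would silently end this loop
                    _logger.LogError(ex, "Bulk indexing of {Count} readings failed", batch.Count);
                }
            }
        }
    }

    private async Task BulkIndexToElasticAsync(List<SensorDataDto> batch)
    {
        var bulkDescriptor = new BulkDescriptor();

        foreach (var data in batch)
        {
            var indexName = $"sensor-data-{data.Timestamp:yyyy-MM}";
            bulkDescriptor.Index<SensorDataDto>(idx => idx
                .Index(indexName)
                // Deterministic id: re-sending the same reading overwrites instead of duplicating
                .Id($"{data.DeviceId}_{data.SensorType}_{data.Timestamp:O}")
                .Document(data)
            );
        }

        var response = await _elasticClient.BulkAsync(bulkDescriptor);
        if (response.Errors)
        {
            // A bulk request can partially fail; report the failed items
            foreach (var item in response.ItemsWithErrors)
            {
                _logger.LogWarning("Failed to index {Id}: {Reason}", item.Id, item.Error?.Reason);
            }
        }
    }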
}
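The worker loop above flushes whatever is available as soon as data arrives, so under light load a batch can shrink to a single document. A variant that waits a short, bounded time for a batch to fill is sketched below; ReadBatchAsync, maxBatchSize and maxWait are illustrative names, and only System.Threading.Channels is used:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelBatching
{
    // Waits for at least one item, then keeps collecting until the batch is full or
    // maxWait has passed. Returns an empty list only when the channel is completed and drained.
    public static async Task<List<T>> ReadBatchAsync<T>(
        ChannelReader<T> reader, int maxBatchSize, TimeSpan maxWait, CancellationToken ct = default)
    {
        var batch = new List<T>(maxBatchSize);

        // Phase 1: block until one item is taken (another reader may win the race, so loop)
        while (batch.Count == 0)
        {
            if (!await reader.WaitToReadAsync(ct)) return batch; // completed and empty
            if (reader.TryRead(out var first)) batch.Add(first);
        }

        // Phase 2: top up the batch for at most maxWait
        using var linger = CancellationTokenSource.CreateLinkedTokenSource(ct);
        linger.CancelAfter(maxWait);
        try
        {
            while (batch.Count < maxBatchSize)
            {
                if (reader.TryRead(out var item))
                {
                    batch.Add(item);
                }
                else if (!await reader.WaitToReadAsync(linger.Token))
                {
                    break; // writer completed: flush what we have
                }
            }
        }
        catch (OperationCanceledException)
        {
            // maxWait elapsed (or the caller cancelled): return the items already taken
        }

        return batch;
    }
}
```

A worker would loop while ReadBatchAsync returns a non-empty batch and bulk-index each one; maxWait bounds the extra latency a reading can pick up, which has to stay well inside the 2-second end-to-end target.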

Challenge 2: Real-time visualization with thousands of data points

Problem: The WPF client slowed down when rendering charts with too many points.

Solution:

// Data downsampling for visualization
// Assumed point type: Timestamp is a numeric x value (e.g. OxyPlot's DateTimeAxis.ToDouble)
public readonly record struct SensorDataPoint(double Timestamp, double Value);

public class TimeSeriesDownsampler
{
    // LTTB (Largest-Triangle-Three-Buckets) algorithm
    public List<SensorDataPoint> Downsample(
        List<SensorDataPoint> data,
        int threshold)
    {
        if (threshold < 3 || data.Count <= threshold) return data;

        var downsampled = new List<SensorDataPoint>(threshold);
        downsampled.Add(data[0]); // Always include first point

        // Points between the first and last are split into (threshold - 2) buckets.
        // Integer arithmetic keeps every bucket non-empty and ends the last one at Count - 1.
        int BucketStart(int k) =>
            1 + (int)((long)k * (data.Count - 2) / (threshold - 2));

        int lastSelectedIndex = 0;

        for (int i = 0; i < threshold - 2; i++)
        {
            int bucketStart = BucketStart(i);
            int bucketEnd = BucketStart(i + 1); // exclusive

            // Third vertex C: average of the next bucket (just the last point for the final bucket)
            int nextEnd = Math.Min(BucketStart(i + 2), data.Count);
            double avgX = 0, avgY = 0;
            for (int j = bucketEnd; j < nextEnd; j++)
            {
                avgX += data[j].Timestamp;
                avgY += data[j].Value;
            }
            avgX /= nextEnd - bucketEnd;
            avgY /= nextEnd - bucketEnd;

            // Find point with largest triangle area against A (last selected) and C
            var a = data[lastSelectedIndex];
            double maxArea = -1;
            int selectedIndex = bucketStart;

            for (int j = bucketStart; j < bucketEnd; j++)
            {
                double area = Math.Abs(
                    (a.Timestamp - avgX) * (data[j].Value - a.Value) -
                    (a.Timestamp - data[j].Timestamp) * (avgY - a.Value)) / 2.0;

                if (area > maxArea)
                {
                    maxArea = area;
                    selectedIndex = j;
                }
            }

            downsampled.Add(data[selectedIndex]);
            lastSelectedIndex = selectedIndex;
        }

        downsampled.Add(data[^1]); // Always include last point
        return downsampled;
    }
}

// Usage in WPF ViewModel
private void UpdateChartWithDownsampling(List<SensorDataPoint> rawData)
{
    var downsampler = new TimeSeriesDownsampler();
    var downsampled = downsampler.Downsample(rawData, threshold: 500);
    
    // Render only 500 points instead of 10,000
    ChartSeries.Points = new ObservableCollection<DataPoint>(
        downsampled.Select(p => new DataPoint(p.Timestamp, p.Value))
    );
}
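`FindLargestTrianglePoint` scores each candidate with the shoelace formula for the area of triangle ABC, taking x = `Timestamp` and y = `Value`. A is the last selected point and B is a candidate in the current bucket. In the reference LTTB algorithm, C is the average of the points in the next bucket N:

```latex
\operatorname{area}(A,B,C) = \tfrac{1}{2}\,\bigl|\,x_A (y_B - y_C) + x_B (y_C - y_A) + x_C (y_A - y_B)\,\bigr|,
\qquad
x_C = \frac{1}{|N|}\sum_{j \in N} x_j, \quad y_C = \frac{1}{|N|}\sum_{j \in N} y_j
```

For example, A = (0, 0), B = (1, 2), C = (2, 0) gives 1/2 · |0·(2 - 0) + 1·(0 - 0) + 2·(0 - 2)| = 2, which matches base 2 × height 2 / 2. Swapping the roles of x and y inside the absolute value only flips the sign, so either ordering selects the same point.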

Challenge 3: Webhook reliability with external systems

Problem: External APIs can go down, rate-limit requests, or return errors.

Solution: See the Webhook Engine section above (Circuit Breaker + Retry + Dead Letter Queue).
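As a compact reminder of how the circuit breaker behaves, here is a framework-free, single-threaded sketch of its state machine. It is not the Webhook Engine code itself. After `failureThreshold` consecutive failures the breaker opens and rejects calls without contacting the external API. Once `openDuration` has passed, it lets a trial call through (half-open): a success closes the breaker and a failure reopens it. The settings, the fake clock, and the fake webhook targets are assumptions made for the sketch:

```csharp
using System;
using System.Threading.Tasks;

// Breaker settings and state, captured by the local functions below (values are illustrative)
const int failureThreshold = 3;
TimeSpan openDuration = TimeSpan.FromSeconds(30);
DateTime now = new DateTime(2025, 1, 1, 0, 0, 0, DateTimeKind.Utc); // fake clock keeps the demo deterministic
int consecutiveFailures = 0;
DateTime? openedAt = null; // null = Closed

// Open until openDuration has elapsed, then HalfOpen (the next call is a trial)
string State() =>
    openedAt is null ? "Closed"
    : now - openedAt.Value < openDuration ? "Open"
    : "HalfOpen";

async Task<bool> TryCallAsync(Func<Task> action)
{
    if (State() == "Open") return false; // fail fast: the external API is not called

    try
    {
        await action();
        consecutiveFailures = 0;
        openedAt = null; // any success, including a half-open trial, closes the breaker
        return true;
    }
    catch (Exception)
    {
        consecutiveFailures++;
        // Trip after too many consecutive failures; a failed half-open trial reopens immediately
        if (openedAt is not null || consecutiveFailures >= failureThreshold)
        {
            openedAt = now;
        }
        return false;
    }
}

// Usage with hypothetical webhook targets
Task Fail() => Task.FromException(new InvalidOperationException("503 from webhook target"));
Task Succeed() => Task.CompletedTask;

for (int i = 0; i < failureThreshold; i++) await TryCallAsync(Fail);
string afterFailures = State();                     // Open
bool calledWhileOpen = await TryCallAsync(Succeed); // false: rejected without calling
now += TimeSpan.FromSeconds(31);
string afterCooldown = State();                     // HalfOpen
bool trialOk = await TryCallAsync(Succeed);         // true: the trial succeeds
Console.WriteLine($"{afterFailures} -> {afterCooldown} -> {State()}"); // Open -> HalfOpen -> Closed
```

A production breaker also has to allow only one concurrent trial while half-open. This single-threaded sketch does not enforce that.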


Results & Impact

Metrics

Metric                           Before     After      Improvement
Event processing latency         5.2 s      0.8 s      85% faster
Chart render time (10k points)   3.5 s      0.3 s      91% faster
Webhook success rate             78%        99.5%      +21.5 percentage points
Data retention                   30 days    2 years    24x longer
Alert delivery time              15 s       2 s        87% faster

Business Impact

  • 60% less downtime thanks to the early warning system
  • 200 hours/year of manual data collection saved
  • 40% higher OEE thanks to predictive maintenance

Lessons Learned

Technical Learnings

  1. Time-series data needs special handling: index rollover, downsampling, aggregation
  2. A circuit breaker is a must-have when integrating with external systems
  3. Batch processing beats per-event processing at high volume: it trades latency for throughput
  4. Monitoring & Alerting: instrument everything from day one

Soft Skills

  1. Domain knowledge matters: understanding factory operations is what leads to the right design
  2. Stakeholder management: operators and managers have different needs
  3. Documentation: critical for handover and maintenance

Interview Questions

Q1: Why choose Elasticsearch over SQL Server for sensor data?

A:

  • Write throughput: Elasticsearch handles write-heavy time-series workloads better
  • Aggregation performance: ES aggregations are faster than SQL GROUP BY on large datasets
  • Index lifecycle: ILM is built in, which makes it easy to roll over indices and delete old data
  • Trade-off: ES does not support transactions the way SQL Server does → use ES for sensor data and SQL Server for metadata

Q2: How do you handle data loss when the system crashes?

A:

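// Write-ahead logging
public async Task ProcessSensorDataAsync(SensorDataDto data)
{
    // 1. Append to the WAL first; the append must be durable before processing starts.
    //    Assumes WalEntry.Id is a client-generated Guid so it can be referenced in step 3.
    var entry = new WalEntry
    {
        Id = Guid.NewGuid(),
        Data = data,
        Timestamp = DateTime.UtcNow
    };
    await _walRepository.AppendAsync(entry);
    
    try
    {
        // 2. Process data. Indexing must be idempotent (e.g. a deterministic document ID),
        //    because a crash between steps 2 and 3 causes the entry to be replayed.
        await IndexToElasticAsync(data);
        
        // 3. Mark WAL entry as processed
        await _walRepository.MarkProcessedAsync(entry.Id);
    }
    catch (Exception ex)
    {
        // 4. The entry stays unprocessed; RecoverAsync replays it on recovery
        _logger.LogError(ex, "Processing failed, will replay from WAL");
        throw;
    }
}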

// Recovery process
public async Task RecoverAsync()
{
    var unprocessedEntries = await _walRepository.GetUnprocessedAsync();
    
    foreach (var entry in unprocessedEntries)
    {
        await IndexToElasticAsync(entry.Data);
        await _walRepository.MarkProcessedAsync(entry.Id);
    }
}
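The cycle above (append, process, mark, replay) can be simulated in memory. In the sketch below, a list stands in for `_walRepository` and a dictionary keyed by document ID stands in for Elasticsearch. Both stand-ins and all helper names are hypothetical. The sketch crashes after indexing but before marking the entry. Because the document ID is deterministic (shaped like the `{DeviceId}_{SensorType}_{Timestamp}` IDs used for bulk indexing), the replay overwrites the same document instead of creating a duplicate:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// In-memory stand-ins (hypothetical): the WAL is a list of entries, and the "index" is a
// dictionary keyed by document ID, so indexing the same ID twice overwrites (upsert).
var wal = new List<(Guid Id, string DocId, double Value, bool Processed)>();
var indexedDocs = new Dictionary<string, double>();
int indexCalls = 0;

void IndexDocument(string docId, double value)
{
    indexCalls++;
    indexedDocs[docId] = value; // same ID => same document, so a replay is idempotent
}

void MarkProcessed(Guid id)
{
    int i = wal.FindIndex(x => x.Id == id);
    var entry = wal[i];
    wal[i] = (entry.Id, entry.DocId, entry.Value, true);
}

void Process(string docId, double value, bool crashBeforeMark)
{
    var id = Guid.NewGuid();
    wal.Add((id, docId, value, false)); // 1. append to the WAL first
    IndexDocument(docId, value);        // 2. process
    if (crashBeforeMark) return;        // simulated crash: step 3 never runs
    MarkProcessed(id);                  // 3. mark as processed
}

void Recover()
{
    // Snapshot first, because MarkProcessed mutates the list
    foreach (var entry in wal.Where(x => !x.Processed).ToList())
    {
        IndexDocument(entry.DocId, entry.Value);
        MarkProcessed(entry.Id);
    }
}

Process("dev1_temp_t1", 71.5, crashBeforeMark: false);
Process("dev1_temp_t2", 72.0, crashBeforeMark: true);
Recover();

Console.WriteLine($"{indexedDocs.Count} documents, {indexCalls} index calls"); // 2 documents, 3 index calls
```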

Q3: How do you test a real-time system with thousands of concurrent connections?

A:

// Load test with WebApplicationFactory (in-memory TestServer, so no real network I/O)
[Fact]
public async Task LoadTest_SensorDataIngestion_10kEventsPerSecond()
{
    using var factory = new CustomWebApplicationFactory();
    using var client = factory.CreateClient();
    
    var events = GenerateSensorEvents(10000);
    
    var stopwatch = Stopwatch.StartNew();
    // Select is lazy: the requests start when Task.WhenAll enumerates it, after the stopwatch starts
    var responses = await Task.WhenAll(events.Select(e => 
        client.PostAsJsonAsync("/api/sensor-data", e)
    ));
    stopwatch.Stop();
    
    // Throughput only counts if every request was actually accepted
    Assert.All(responses, r => Assert.True(r.IsSuccessStatusCode));
    
    var throughput = 10000 / stopwatch.Elapsed.TotalSeconds;
    
    Assert.True(throughput >= 5000, $"Expected >= 5000 events/s, got {throughput:F0}");
}

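// Integration test with Testcontainers (Testcontainers.Elasticsearch) and the NEST 7.x client
[Fact]
public async Task IntegrationTest_ElasticSearchConnection()
{
    await using var container = new ElasticsearchBuilder()
        .Build();
    
    await container.StartAsync();
    
    // The container does not create clients: build one from its connection string
    // (Elasticsearch 8 images use HTTPS, basic auth and a self-signed certificate)
    var settings = new ConnectionSettings(new Uri(container.GetConnectionString()))
        .ServerCertificateValidationCallback(CertificateValidations.AllowAll)
        .EnableApiVersioningHeader() // lets a NEST 7.x client talk to Elasticsearch 8.x
        .DefaultIndex("sensor-data-test");
    var client = new ElasticClient(settings);
    
    // Test indexing
    var doc = new SensorDataDto { /* ... */ };
    var response = await client.IndexDocumentAsync(doc);
    
    Assert.True(response.IsValid);
}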
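The load test above starts all 10,000 requests at once, which exercises a burst. To hold a steadier level of concurrency, one option is to cap the number of in-flight requests. `Parallel.ForEachAsync` (.NET 6+) does this through `MaxDegreeOfParallelism`. Below is a minimal sketch in which a fake endpoint stands in for the real API. `MeasureThroughputAsync` and its `sendAsync` parameter are hypothetical names:

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Sends `total` events with at most `maxInFlight` concurrent calls and returns events/second.
// `sendAsync` is a hypothetical stand-in for client.PostAsJsonAsync("/api/sensor-data", e).
async Task<double> MeasureThroughputAsync(int total, int maxInFlight, Func<int, CancellationToken, Task> sendAsync)
{
    var options = new ParallelOptions { MaxDegreeOfParallelism = maxInFlight };
    var stopwatch = Stopwatch.StartNew();
    await Parallel.ForEachAsync(Enumerable.Range(0, total), options,
        async (eventId, ct) => await sendAsync(eventId, ct));
    stopwatch.Stop();
    return total / stopwatch.Elapsed.TotalSeconds;
}

// Usage with a fake endpoint (about 1 ms per call) that records the peak concurrency it observes
int inFlight = 0, peak = 0, completed = 0;
double throughput = await MeasureThroughputAsync(
    total: 2_000,
    maxInFlight: 100,
    sendAsync: async (eventId, ct) =>
    {
        int current = Interlocked.Increment(ref inFlight);
        // Lock-free max: retry until peak >= current or our compare-and-swap wins
        int seen;
        while (current > (seen = Volatile.Read(ref peak)) &&
               Interlocked.CompareExchange(ref peak, current, seen) != seen) { }
        await Task.Delay(1, ct);
        Interlocked.Decrement(ref inFlight);
        Interlocked.Increment(ref completed);
    });

Console.WriteLine($"{completed} events, peak concurrency {peak}, {throughput:F0} events/s");
```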

Q4: How do you monitor system performance?

A:

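// Tracing via Application Insights, metrics via System.Diagnostics.Metrics (scraped by Prometheus)
using System.Diagnostics;
using System.Diagnostics.Metrics;
using Microsoft.ApplicationInsights;
using Microsoft.ApplicationInsights.DataContracts;

public class SensorDataIngestionService
{
    // Illustrative meter name; it must match the meter the Prometheus exporter is configured to collect
    private static readonly Meter IngestionMeter = new("SKCC.Ingestion");
    
    private readonly TelemetryClient _telemetry;
    private readonly Histogram<double> _processingTime =
        IngestionMeter.CreateHistogram<double>("processing_time_seconds");
    private readonly Counter<long> _eventsProcessed =
        IngestionMeter.CreateCounter<long>("events_processed");
    private readonly Counter<long> _errors =
        IngestionMeter.CreateCounter<long>("process_sensor_data_errors");
    
    public SensorDataIngestionService(TelemetryClient telemetry) => _telemetry = telemetry;
    
    public async Task ProcessSensorDataAsync(SensorDataDto data)
    {
        // StartOperation needs a telemetry type; disposing the holder sends the request telemetry
        using var operation = _telemetry.StartOperation<RequestTelemetry>("ProcessSensorData");
        
        var stopwatch = Stopwatch.StartNew();
        
        try
        {
            await IndexToElasticAsync(data);
            
            _eventsProcessed.Add(1);
            _processingTime.Record(stopwatch.Elapsed.TotalSeconds);
            
            operation.Telemetry.Success = true;
        }
        catch (Exception ex)
        {
            _errors.Add(1);
            _telemetry.TrackException(ex);
            operation.Telemetry.Success = false;
            throw;
        }
    }
}

// Grafana dashboard panels querying Prometheus
// (the exporter adds the _total suffix to counters):
// - Rate: rate(events_processed_total[1m])
// - Latency (p95): histogram_quantile(0.95, sum by (le) (rate(processing_time_seconds_bucket[5m])))
// - Errors: rate(process_sensor_data_errors_total[1m])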
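Instrumentation like this can be checked in a unit test without a Prometheus server. `MeterListener` (in `System.Diagnostics.Metrics`, in-box since .NET 6) receives every measurement synchronously. Below is a minimal sketch. The instrument names mirror the ones above, and the meter name is a hypothetical test-only value:

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;

using var meter = new Meter("SKCC.Ingestion.Test"); // hypothetical meter name
var eventsProcessed = meter.CreateCounter<long>("events_processed");
var processingTime = meter.CreateHistogram<double>("processing_time_seconds");

long counted = 0;
var latencies = new List<double>();

using var listener = new MeterListener();
// Subscribe only to instruments from this meter
listener.InstrumentPublished = (instrument, l) =>
{
    if (instrument.Meter.Name == "SKCC.Ingestion.Test") l.EnableMeasurementEvents(instrument);
};
// One callback per measurement type: long for the counter, double for the histogram
listener.SetMeasurementEventCallback<long>((instrument, value, tags, state) => counted += value);
listener.SetMeasurementEventCallback<double>((instrument, value, tags, state) => latencies.Add(value));
listener.Start(); // also publishes instruments that were created before Start

// Simulate three processed events
foreach (var seconds in new[] { 0.012, 0.020, 0.015 })
{
    eventsProcessed.Add(1);
    processingTime.Record(seconds);
}

Console.WriteLine($"{counted} events, {latencies.Count} latency samples"); // 3 events, 3 latency samples
```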

Q5: If you had to scale to 100,000 devices, what would you change?

A:

  1. Sharding strategy: partition by device_id range or by geo-region
  2. Edge computing: pre-process data at the edge before sending it to the central system
  3. Kafka instead of Service Bus: for higher throughput
  4. Data tiering: hot data in memory (Redis), warm data in ES, cold data in a data lake
  5. Auto-scaling: Kubernetes HPA driven by queue depth
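Partitioning by `device_id` range (item 1) comes down to a sorted list of range boundaries plus a binary search. Below is a minimal sketch. It assumes integer device IDs and uses made-up boundaries, and `PartitionFor` is a hypothetical helper:

```csharp
using System;

// Hypothetical exclusive upper bounds per range: partition 0 holds IDs < 25,000,
// partition 1 holds 25,000..49,999, partition 2 holds 50,000..74,999, partition 3 holds the rest.
int[] rangeUpperBounds = { 25_000, 50_000, 75_000 };

int PartitionFor(int deviceId)
{
    // Array.BinarySearch returns the index when the value is found (a bound starts the next range);
    // otherwise it returns the bitwise complement of the index of the first larger bound.
    int i = Array.BinarySearch(rangeUpperBounds, deviceId);
    return i >= 0 ? i + 1 : ~i;
}

foreach (var id in new[] { 0, 24_999, 25_000, 74_999, 75_000, 99_999 })
{
    Console.WriteLine($"device {id} -> partition {PartitionFor(id)}");
}
```

Range partitioning keeps neighbouring IDs together, which suits range scans. The downside is that sequentially assigned IDs can concentrate new devices in one hot partition. Hashing the ID spreads load more evenly but gives up that locality.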
