SKCC Project | Industrial Monitoring System
Thời gian: 11/2024 – 04/2025
Vai trò: Backend Developer
Công ty: FPT Software
Tổng quan dự án
Bối cảnh
Nhà máy sản xuất cần hệ thống giám sát thiết bị công nghiệp theo thời gian thực để:
- Theo dõi trạng thái máy móc (nhiệt độ, áp suất, tốc độ, rung động)
- Phát hiện bất thường và cảnh báo sớm
- Tự động hóa quy trình thông báo khi có sự cố
- Tích hợp với các hệ thống ERP, MES hiện có
Yêu cầu chức năng
Functional Requirements
- Real-time Monitoring
  - Hiển thị dữ liệu sensor từ hàng ngàn thiết bị
  - Update frequency: 1-5 giây
  - Historical data visualization (charts, trends)
- Alert & Notification
  - Configurable thresholds cho từng sensor type
  - Multi-channel notifications (Email, SMS, Webhook)
  - Alert escalation rules
- Webhook Engine
  - User-defined rules với custom conditions
  - Trigger external APIs khi có events
  - Retry mechanism cho failed webhooks
- Reporting & Analytics
  - Daily/Weekly/Monthly reports
  - OEE (Overall Equipment Effectiveness) calculations
  - Downtime analysis
- Integration APIs
  - REST APIs cho third-party systems
  - Data export (CSV, Excel, PDF)
Non-Functional Requirements
- Latency: End-to-end < 2 seconds cho real-time data
- Throughput: Xử lý 10,000+ events/giây
- Availability: 99.9% uptime (critical for factory operations)
- Scalability: Support thêm 50% devices mà không cần redesign
- Data Retention: Lưu 2 years raw data, 5 years aggregated data
Kiến trúc & Công nghệ
Technology Stack
┌────────────────────────────────────────────────────────────────────┐
│ Client Applications │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────────────────┐ │
│ │ WPF Client │ │ Web Dashboard│ │ Mobile App (Future) │ │
│ │ (Operators) │ │ (Managers) │ │ │ │
│ └──────────────┘ └──────────────┘ └──────────────────────────┘ │
└────────────────────────────────────────────────────────────────────┘
↕ SignalR (Real-time) ↕ REST API
┌────────────────────────────────────────────────────────────────────┐
│ Backend Services (.NET Core) │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────────────────┐ │
│ │ Data │ │ Alert │ │ Webhook │ │
│ │ Ingestion │ │ Service │ │ Engine │ │
│ │ Service │ │ │ │ │ │
│ └──────────────┘ └──────────────┘ └──────────────────────────┘ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────────────────┐ │
│ │ Reporting │ │ Integration │ │ API Gateway │ │
│ │ Service │ │ Service │ │ (Kong) │ │
│ └──────────────┘ └──────────────┘ └──────────────────────────┘ │
└────────────────────────────────────────────────────────────────────┘
↕ ↕
┌─────────────────────┐ ┌──────────────────────────────┐
│ Data Stores │ │ Message Infrastructure │
│ ┌──────────────┐ │ │ ┌────────────────────────┐ │
│ │ ElasticSearch│ │ │ │ Azure Service Bus │ │
│ │ (Time-series)│ │ │ │ (Topics & Queues) │ │
│ └──────────────┘ │ │ └────────────────────────┘ │
│ ┌──────────────┐ │ │ │
│ │ SQL Server │ │ │ │
│ │ (Metadata) │ │ │ │
│ └──────────────┘ │ │ │
│ ┌──────────────┐ │ │ │
│ │ Redis │ │ │ │
│ │ (Cache) │ │ │ │
│ └──────────────┘ │ │ │
└─────────────────────┘ └──────────────────────────────┘
Components chi tiết
1. Data Ingestion Pipeline
/// <summary>
/// Contract for ingesting sensor readings and reading them back.
/// </summary>
public interface ISensorDataService
{
    /// <summary>Processes one incoming sensor reading.</summary>
    /// <param name="data">The reading to process.</param>
    Task ProcessSensorDataAsync(SensorDataDto data);

    /// <summary>Returns the readings of one device between two instants.</summary>
    /// <param name="deviceId">Identifier of the device whose readings are requested.</param>
    /// <param name="from">Start of the requested time range.</param>
    /// <param name="to">End of the requested time range.</param>
    Task<IEnumerable<SensorDataDto>> GetHistoricalDataAsync(int deviceId, DateTime from, DateTime to);

    /// <summary>Returns the current status of one device.</summary>
    /// <param name="deviceId">Identifier of the device.</param>
    Task<SensorStatusDto> GetCurrentStatusAsync(int deviceId);
}
/// <summary>
/// Ingestion pipeline for sensor readings: validates a reading, indexes it in
/// Elasticsearch, refreshes the per-device status cache and publishes alert and
/// raw-data events to the Service Bus topic.
/// </summary>
/// <remarks>
/// NOTE(review): this excerpt references members that are not declared here
/// (<c>IsValidSensorData</c>, <c>CheckThresholdsAsync</c>, <c>_thresholdCache</c>,
/// <c>GetCurrentStatusAsync</c>); they are assumed to live in another part of the class.
/// </remarks>
public class SensorDataIngestionService : ISensorDataService
{
    // Readings are written to monthly indices ("sensor-data-yyyy-MM"). Reads go through
    // the wildcard so a range spanning several months is not cut off at the first month.
    private const string AllSensorIndices = "sensor-data-*";

    private readonly IElasticClient _elasticClient;
    private readonly ITopicClient _serviceBusTopic;
    private readonly ISensorCache _cache;

    /// <summary>Creates the service with its storage, messaging and cache dependencies.</summary>
    /// <exception cref="ArgumentNullException">Any dependency is <c>null</c>.</exception>
    public SensorDataIngestionService(
        IElasticClient elasticClient,
        ITopicClient serviceBusTopic,
        ISensorCache cache)
    {
        _elasticClient = elasticClient ?? throw new ArgumentNullException(nameof(elasticClient));
        _serviceBusTopic = serviceBusTopic ?? throw new ArgumentNullException(nameof(serviceBusTopic));
        _cache = cache ?? throw new ArgumentNullException(nameof(cache));
    }

    /// <summary>
    /// Validates, stores and publishes a single sensor reading.
    /// </summary>
    /// <param name="data">The reading to process.</param>
    /// <exception cref="ArgumentNullException"><paramref name="data"/> is <c>null</c>.</exception>
    /// <exception cref="InvalidSensorDataException">The reading fails validation.</exception>
    public async Task ProcessSensorDataAsync(SensorDataDto data)
    {
        // Guard first: the validation error message below dereferences data.
        if (data is null)
        {
            throw new ArgumentNullException(nameof(data));
        }

        // 1. Validate data
        if (!IsValidSensorData(data))
        {
            throw new InvalidSensorDataException($"Invalid data from device {data.DeviceId}");
        }

        // 2. Store in Elasticsearch (one index per month)
        var indexName = $"sensor-data-{data.Timestamp:yyyy-MM}";
        await _elasticClient.IndexAsync(data, idx => idx.Index(indexName));

        // 3. Update current status cache
        await _cache.UpdateStatusAsync(data.DeviceId, new SensorStatus
        {
            LastValue = data.Value,
            LastUpdated = data.Timestamp,
            Status = DetermineStatus(data)
        });

        // 4. Check thresholds and publish an alert event when one is returned
        var alertEvent = await CheckThresholdsAsync(data);
        if (alertEvent != null)
        {
            await _serviceBusTopic.SendMessageAsync(new SensorAlertEvent
            {
                DeviceId = data.DeviceId,
                SensorType = data.SensorType,
                Value = data.Value,
                Threshold = alertEvent.Threshold,
                Severity = alertEvent.Severity,
                Timestamp = data.Timestamp
            });
        }

        // 5. Publish the raw reading for other consumers
        await _serviceBusTopic.SendMessageAsync(new SensorDataEvent
        {
            DeviceId = data.DeviceId,
            Data = data
        });
    }

    /// <summary>
    /// Returns the readings of one device with a timestamp in [<paramref name="from"/>,
    /// <paramref name="to"/>], newest first.
    /// </summary>
    /// <remarks>
    /// At most 10000 documents are returned; larger ranges are truncated by the
    /// <c>Size</c> limit of the query.
    /// </remarks>
    public async Task<IEnumerable<SensorDataDto>> GetHistoricalDataAsync(
        int deviceId,
        DateTime from,
        DateTime to)
    {
        // Search every monthly index; the date-range filter restricts the hits.
        // Targeting only the index of the "from" month would silently drop every
        // reading from later months of the requested range.
        var searchResponse = await _elasticClient.SearchAsync<SensorDataDto>(s => s
            .Index(AllSensorIndices)
            .Query(q => q
                .Bool(b => b
                    .Must(m => m.Term(t => t.DeviceId, deviceId))
                    .Filter(f => f
                        .DateRange(d => d
                            .Field(fld => fld.Timestamp)
                            .GreaterThanOrEquals(from)
                            .LessThanOrEquals(to)
                        )
                    )
                )
            )
            .Sort(sort => sort
                .Descending(d => d.Timestamp)
            )
            .Size(10000)
        );

        return searchResponse.Documents;
    }

    /// <summary>
    /// Classifies a reading as Critical, Warning or Normal by comparing its value with
    /// the cached thresholds for the device and sensor type.
    /// </summary>
    private SensorStatus DetermineStatus(SensorDataDto data)
    {
        var thresholds = _thresholdCache.Get(data.DeviceId, data.SensorType);

        // Check the most severe level first so a value above both limits is Critical.
        if (data.Value >= thresholds.Critical) return SensorStatus.Critical;
        if (data.Value >= thresholds.Warning) return SensorStatus.Warning;
        return SensorStatus.Normal;
    }
}
/// <summary>
/// A single reading reported by a sensor.
/// </summary>
public class SensorDataDto
{
    /// <summary>Identifier of the device that produced the reading.</summary>
    public int DeviceId { get; set; }

    /// <summary>Kind of sensor, e.g. Temperature, Pressure, Vibration.</summary>
    public string SensorType { get; set; }

    /// <summary>Measured value, expressed in <see cref="Unit"/>.</summary>
    public double Value { get; set; }

    /// <summary>Unit of <see cref="Value"/>, e.g. Celsius, PSI, mm/s.</summary>
    public string Unit { get; set; }

    /// <summary>Instant at which the reading was taken.</summary>
    public DateTime Timestamp { get; set; }

    /// <summary>Quality flag of the reading, e.g. Good, Bad, Uncertain.</summary>
    public string Quality { get; set; }
}
2. Alert Service với Rule Engine
/// <summary>
/// Contract for managing alert rules and handling sensor alert events.
/// </summary>
public interface IAlertService
{
    /// <summary>Creates a new alert rule from the given request.</summary>
    /// <param name="request">Definition of the rule to create.</param>
    Task<AlertRuleDto> CreateRuleAsync(AlertRuleCreateRequest request);

    /// <summary>Updates the rule identified by <paramref name="ruleId"/>.</summary>
    Task UpdateRuleAsync(int ruleId, AlertRuleUpdateRequest request);

    /// <summary>Deletes the rule identified by <paramref name="ruleId"/>.</summary>
    Task DeleteRuleAsync(int ruleId);

    /// <summary>Returns the rules defined for one device.</summary>
    Task<IEnumerable<AlertRuleDto>> GetRulesByDeviceAsync(int deviceId);

    /// <summary>Handles one alert event raised for a sensor reading.</summary>
    Task ProcessAlertEventAsync(SensorAlertEvent alertEvent);
}
/// <summary>
/// Rule-engine backed alert service: validates and stores user-defined rules, and
/// evaluates the active rules of a device whenever an alert event arrives.
/// </summary>
/// <remarks>
/// NOTE(review): the remaining <see cref="IAlertService"/> members and
/// <c>MapToDto</c> are not part of this excerpt.
/// </remarks>
public class AlertRuleEngine : IAlertService
{
    private readonly RulesEngine _rulesEngine;
    private readonly IAlertRepository _alertRepository;
    private readonly INotificationService _notificationService;
    private readonly IWebhookDispatcher _webhookDispatcher;

    /// <summary>Creates the engine with its rule evaluator, storage and delivery dependencies.</summary>
    /// <exception cref="ArgumentNullException">Any dependency is <c>null</c>.</exception>
    public AlertRuleEngine(
        RulesEngine rulesEngine,
        IAlertRepository alertRepository,
        INotificationService notificationService,
        IWebhookDispatcher webhookDispatcher)
    {
        _rulesEngine = rulesEngine ?? throw new ArgumentNullException(nameof(rulesEngine));
        _alertRepository = alertRepository ?? throw new ArgumentNullException(nameof(alertRepository));
        _notificationService = notificationService ?? throw new ArgumentNullException(nameof(notificationService));
        _webhookDispatcher = webhookDispatcher ?? throw new ArgumentNullException(nameof(webhookDispatcher));
    }

    /// <summary>
    /// Validates the rule expression of <paramref name="request"/> and, when it
    /// compiles, stores a new active rule.
    /// </summary>
    /// <returns>The stored rule mapped to its DTO.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="request"/> is <c>null</c>.</exception>
    /// <exception cref="InvalidRuleExpressionException">The expression does not compile.</exception>
    public async Task<AlertRuleDto> CreateRuleAsync(AlertRuleCreateRequest request)
    {
        if (request is null)
        {
            throw new ArgumentNullException(nameof(request));
        }

        // Wrap the expression in a one-rule workflow so the engine can validate it.
        var workflow = new Workflow
        {
            Name = $"Rule_{request.DeviceId}",
            Rules = new List<Rule>
            {
                new Rule
                {
                    Name = "ThresholdCheck",
                    Expression = request.RuleExpression, // e.g., "value > 100"
                    RuleExpressionType = RuleExpressionType.LambdaExpression
                }
            }
        };

        var validationResult = _rulesEngine.ValidateWorkflow(workflow);

        // Report the compilation error itself; the first validation entry may be of
        // another kind and would produce a misleading message.
        var compilationErrors = validationResult
            .Where(error => error.ErrorType == ErrorType.Compilation)
            .ToList();
        if (compilationErrors.Count > 0)
        {
            throw new InvalidRuleExpressionException(compilationErrors[0].Message);
        }

        // Save rule
        var rule = new AlertRule
        {
            DeviceId = request.DeviceId,
            SensorType = request.SensorType,
            RuleExpression = request.RuleExpression,
            Severity = request.Severity,
            NotificationChannels = request.NotificationChannels,
            WebhookUrl = request.WebhookUrl,
            IsActive = true,
            CreatedAt = DateTime.UtcNow
        };
        await _alertRepository.AddAsync(rule);

        return MapToDto(rule);
    }

    /// <summary>
    /// Evaluates every active rule for the event's device and sensor type; for each
    /// rule that matches, records an alert and sends the configured notifications.
    /// </summary>
    /// <remarks>
    /// Rules are processed sequentially. A failure while recording or notifying
    /// propagates to the caller, so later rules are not evaluated for this event.
    /// </remarks>
    /// <exception cref="ArgumentNullException"><paramref name="alertEvent"/> is <c>null</c>.</exception>
    public async Task ProcessAlertEventAsync(SensorAlertEvent alertEvent)
    {
        if (alertEvent is null)
        {
            throw new ArgumentNullException(nameof(alertEvent));
        }

        var rules = await _alertRepository.GetActiveRulesAsync(
            alertEvent.DeviceId,
            alertEvent.SensorType);

        foreach (var rule in rules)
        {
            // Evaluate the rule against the value and threshold carried by the event.
            var input = new { value = alertEvent.Value, threshold = alertEvent.Threshold };
            var result = await _rulesEngine.ExecuteActionAsync(rule.RuleExpression, input);
            if (!result.IsSuccess)
            {
                continue;
            }

            var alert = new Alert
            {
                RuleId = rule.Id,
                DeviceId = alertEvent.DeviceId,
                SensorType = alertEvent.SensorType,
                Value = alertEvent.Value,
                Severity = rule.Severity,
                TriggeredAt = DateTime.UtcNow
            };
            await _alertRepository.CreateAlertAsync(alert);

            await NotifyAsync(rule, alert);
        }
    }

    // Sends the alert over every channel configured on the rule, in parallel.
    private Task NotifyAsync(AlertRule rule, Alert alert)
    {
        var tasks = new List<Task>();

        // NotificationChannels is optional on the create request; treat null as "none".
        if (rule.NotificationChannels?.Contains("Email") == true)
        {
            tasks.Add(_notificationService.SendEmailAlertAsync(alert));
        }
        if (rule.NotificationChannels?.Contains("SMS") == true)
        {
            tasks.Add(_notificationService.SendSmsAlertAsync(alert));
        }
        if (!string.IsNullOrEmpty(rule.WebhookUrl))
        {
            tasks.Add(_webhookDispatcher.DispatchAsync(rule.WebhookUrl, alert));
        }

        return Task.WhenAll(tasks);
    }
}
/// <summary>
/// Request payload describing a new alert rule.
/// </summary>
public class AlertRuleCreateRequest
{
    /// <summary>Device the rule applies to.</summary>
    public int DeviceId { get; set; }

    /// <summary>Sensor type the rule applies to.</summary>
    public string SensorType { get; set; }

    /// <summary>Rule condition, e.g. "value > 100 &amp;&amp; value &lt; 200".</summary>
    public string RuleExpression { get; set; }

    /// <summary>Severity assigned to alerts raised by this rule.</summary>
    public AlertSeverity Severity { get; set; }

    /// <summary>Names of the notification channels to use for this rule.</summary>
    public List<string> NotificationChannels { get; set; }

    /// <summary>Webhook endpoint to call when the rule fires; may be left empty.</summary>
    public string WebhookUrl { get; set; }
}
/// <summary>
/// Severity levels of an alert, ordered from least to most severe.
/// </summary>
public enum AlertSeverity
{
    Low = 0,
    Medium = 1,
    High = 2,
    Critical = 3
}
3. Webhook Engine với Retry & Circuit Breaker
/// <summary>
/// Contract for delivering webhook calls and looking up their delivery logs.
/// </summary>
public interface IWebhookDispatcher
{
    /// <summary>Delivers <paramref name="payload"/> to the webhook at <paramref name="url"/>.</summary>
    /// <param name="url">Target endpoint.</param>
    /// <param name="payload">Object to send as the request body.</param>
    Task DispatchAsync(string url, object payload);

    /// <summary>Returns the delivery log identified by <paramref name="logId"/>.</summary>
    Task<WebhookLogDto> GetWebhookLogAsync(string logId);
}
/// <summary>
/// Webhook dispatcher that posts JSON payloads with in-process retries and a circuit
/// breaker, records every attempt in the webhook log, and schedules a background
/// retry job when delivery ultimately fails.
/// </summary>
/// <remarks>
/// NOTE(review): the circuit breaker is a single instance shared by every URL this
/// dispatcher handles, so one failing endpoint can open the circuit for all of them.
/// NOTE(review): <c>GetWebhookLogAsync</c> is not part of this excerpt.
/// </remarks>
public class ResilientWebhookDispatcher : IWebhookDispatcher
{
    // Delay before a background retry when the circuit is open / when delivery failed.
    private static readonly TimeSpan CircuitOpenRetryDelay = TimeSpan.FromMinutes(10);
    private static readonly TimeSpan FailureRetryDelay = TimeSpan.FromMinutes(5);

    private readonly HttpClient _httpClient;
    private readonly IWebhookLogRepository _logRepository;
    private readonly IBackgroundJobClient _jobClient;
    private readonly ILogger<ResilientWebhookDispatcher> _logger;

    // Both policies also handle results (OrResult<HttpResponseMessage>), so they must be
    // the generic policy types; the non-generic ones cannot hold them.
    private readonly AsyncCircuitBreakerPolicy<HttpResponseMessage> _circuitBreaker;
    private readonly AsyncRetryPolicy<HttpResponseMessage> _retryPolicy;

    /// <summary>Creates the dispatcher and configures its retry and circuit-breaker policies.</summary>
    public ResilientWebhookDispatcher(
        HttpClient httpClient,
        IWebhookLogRepository logRepository,
        IBackgroundJobClient jobClient,
        ILogger<ResilientWebhookDispatcher> logger)
    {
        _httpClient = httpClient;
        _logRepository = logRepository;
        _jobClient = jobClient;
        _logger = logger;

        // Retry: 3 attempts after the first, waiting 2s, 4s, 8s, on transport errors
        // and on any non-success status code.
        _retryPolicy = Policy
            .Handle<HttpRequestException>()
            .OrResult<HttpResponseMessage>(r => !r.IsSuccessStatusCode)
            .WaitAndRetryAsync(
                retryCount: 3,
                sleepDurationProvider: retry => TimeSpan.FromSeconds(Math.Pow(2, retry)),
                onRetry: (outcome, timespan, retryNumber, context) =>
                {
                    _logger.LogWarning(
                        "Webhook failed. Retry {RetryNumber} after {Timespan}. Status: {Status}",
                        retryNumber,
                        timespan,
                        outcome.Result?.StatusCode);
                }
            );

        // Circuit breaker: opens for 5 minutes after 5 consecutive handled failures
        // (transport errors or HTTP 503).
        _circuitBreaker = Policy
            .Handle<HttpRequestException>()
            .OrResult<HttpResponseMessage>(r => r.StatusCode == HttpStatusCode.ServiceUnavailable)
            .CircuitBreakerAsync(
                handledEventsAllowedBeforeBreaking: 5,
                durationOfBreak: TimeSpan.FromMinutes(5),
                onBreak: (outcome, duration) =>
                {
                    // The break can be caused by a 503 result, in which case there is no exception.
                    _logger.LogError("Circuit breaker opened for {Duration}. Reason: {Reason}",
                        duration,
                        outcome.Exception?.Message ?? outcome.Result?.StatusCode.ToString());
                },
                onReset: () =>
                {
                    _logger.LogInformation("Circuit breaker reset to closed state.");
                }
            );
    }

    /// <summary>
    /// Posts <paramref name="payload"/> as JSON to <paramref name="url"/>.
    /// </summary>
    /// <remarks>
    /// Delivery failures are not thrown to the caller: they are written to the webhook
    /// log and a retry job is scheduled (after 10 minutes when the circuit is open,
    /// after 5 minutes otherwise). Exceptions from the log repository or the job
    /// client raised while handling a failure do propagate.
    /// NOTE(review): the retry job calls this method again, which creates a new log
    /// entry and schedules another retry on failure; there is no attempt limit here.
    /// </remarks>
    public async Task DispatchAsync(string url, object payload)
    {
        // Serialize once: the same JSON is stored in the log and sent on every attempt.
        var serializedPayload = JsonSerializer.Serialize(payload);

        var logId = Guid.NewGuid().ToString();
        var log = new WebhookLog
        {
            Id = logId,
            Url = url,
            Payload = serializedPayload,
            Status = WebhookStatus.Pending,
            CreatedAt = DateTime.UtcNow
        };
        await _logRepository.CreateAsync(log);

        try
        {
            // Circuit breaker outside, retry inside: one exhausted retry sequence
            // counts as a single failure towards opening the circuit.
            using var response = await _circuitBreaker.ExecuteAsync(
                () => _retryPolicy.ExecuteAsync(
                    () => _httpClient.PostAsync(
                        url,
                        // A fresh content object per attempt; request content is not reusable.
                        new StringContent(serializedPayload, Encoding.UTF8, "application/json"))));

            log.Status = response.IsSuccessStatusCode
                ? WebhookStatus.Success
                : WebhookStatus.Failed;
            log.ResponseCode = (int?)response.StatusCode;
            log.ResponseBody = await response.Content.ReadAsStringAsync();
            log.CompletedAt = DateTime.UtcNow;
            await _logRepository.UpdateAsync(log);

            if (!response.IsSuccessStatusCode)
            {
                // Route the exhausted-retries case through the common failure handler below.
                throw new HttpRequestException($"Webhook failed with status {response.StatusCode}");
            }
        }
        catch (BrokenCircuitException)
        {
            _logger.LogError("Circuit breaker open for webhook {Url}. Queuing for later.", url);

            // Hangfire's Schedule is synchronous (it returns the job id), so it is not awaited.
            _jobClient.Schedule<IRetryWebhookJob>(
                job => job.RetryAsync(logId),
                CircuitOpenRetryDelay);

            log.Status = WebhookStatus.CircuitOpen;
            await _logRepository.UpdateAsync(log);
        }
        catch (Exception ex)
        {
            // BrokenCircuitException never reaches this handler; it is caught above.
            _logger.LogError(ex, "Webhook dispatch failed for {Url}", url);

            log.Status = WebhookStatus.Failed;
            log.ErrorMessage = ex.Message;
            await _logRepository.UpdateAsync(log);

            _jobClient.Schedule<IRetryWebhookJob>(
                job => job.RetryAsync(logId),
                FailureRetryDelay);
        }
    }
}
/// <summary>
/// Background job contract for re-sending a previously logged webhook.
/// </summary>
public interface IRetryWebhookJob
{
    /// <summary>Retries the webhook whose log entry has the given id.</summary>
    /// <param name="logId">Identifier of the webhook log entry to retry.</param>
    Task RetryAsync(string logId);
}
/// <summary>
/// Background job that reloads a logged webhook and dispatches it again.
/// </summary>
public class RetryWebhookJob : IRetryWebhookJob
{
    private readonly IWebhookDispatcher _dispatcher;
    private readonly IWebhookLogRepository _logRepository;

    /// <summary>
    /// Creates the job. Without this constructor the readonly dependencies were never
    /// assigned and every run failed with a null reference.
    /// </summary>
    /// <exception cref="ArgumentNullException">Any dependency is <c>null</c>.</exception>
    public RetryWebhookJob(IWebhookDispatcher dispatcher, IWebhookLogRepository logRepository)
    {
        _dispatcher = dispatcher ?? throw new ArgumentNullException(nameof(dispatcher));
        _logRepository = logRepository ?? throw new ArgumentNullException(nameof(logRepository));
    }

    /// <summary>
    /// Re-sends the payload stored in the log entry <paramref name="logId"/> to the
    /// URL stored with it. Does nothing when the entry no longer exists.
    /// </summary>
    public async Task RetryAsync(string logId)
    {
        var log = await _logRepository.GetByIdAsync(logId);
        if (log == null)
        {
            return;
        }

        // The stored JSON is deserialized to a generic object so the dispatcher can
        // serialize it again when sending.
        var payload = JsonSerializer.Deserialize<object>(log.Payload);
        await _dispatcher.DispatchAsync(log.Url, payload);
    }
}
4. WPF Client với Real-time Updates
/// <summary>
/// View model of the monitoring screen. Holds the list of sensors and keeps it up to
/// date through a SignalR hub connection that pushes readings and alerts.
/// </summary>
public class MonitoringViewModel : INotifyPropertyChanged, IDisposable
{
    private readonly HubConnection _hubConnection;
    private readonly ISensorDataService _sensorService;
    private readonly ILogger<MonitoringViewModel> _logger;
    private ObservableCollection<SensorViewModel> _sensors;
    private bool _disposed;

    /// <summary>
    /// Creates the view model, registers the hub handlers and starts connecting in the
    /// background. The constructor does not wait for the connection.
    /// </summary>
    public MonitoringViewModel(
        ISensorDataService sensorService,
        ILogger<MonitoringViewModel> logger)
    {
        _sensorService = sensorService;
        _logger = logger;

        // Initialize SignalR connection
        _hubConnection = new HubConnectionBuilder()
            .WithUrl("https://api.skcc.com/sensorhub")
            .WithAutomaticReconnect(new RetryPolicy())
            .Build();

        // Register event handlers
        _hubConnection.On<SensorDataDto>("SensorDataReceived", data =>
        {
            UpdateSensorData(data);
        });
        _hubConnection.On<AlertDto>("AlertReceived", alert =>
        {
            ShowAlert(alert);
        });

        // Fire and forget: failures are handled inside StartConnectionAsync.
        _ = StartConnectionAsync();
    }

    /// <summary>
    /// Sensors shown on the screen. <c>null</c> until the initial load has completed.
    /// </summary>
    public ObservableCollection<SensorViewModel> Sensors
    {
        get => _sensors;
        private set
        {
            _sensors = value;
            PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Sensors)));
        }
    }

    // Connects to the hub and then loads the initial sensor list.
    private async Task StartConnectionAsync()
    {
        try
        {
            await _hubConnection.StartAsync();

            // Load initial data
            var sensors = await _sensorService.GetAllSensorsAsync();
            Sensors = new ObservableCollection<SensorViewModel>(
                sensors.Select(s => new SensorViewModel(s))
            );
        }
        catch (Exception ex)
        {
            // A WPF application normally has no console, so report through the logger.
            _logger?.LogError(ex, "SignalR connection or initial sensor load failed");
        }
    }

    // Applies a pushed reading to the matching sensor, on the UI thread.
    private void UpdateSensorData(SensorDataDto data)
    {
        // Queue the update instead of blocking the hub callback: a blocking Invoke can
        // deadlock with Dispose running on the UI thread.
        _ = Application.Current?.Dispatcher.InvokeAsync(() =>
        {
            // Readings can arrive before the initial load has populated Sensors.
            var sensor = Sensors?.FirstOrDefault(s => s.DeviceId == data.DeviceId);
            if (sensor != null)
            {
                sensor.UpdateValue(data.Value, data.Timestamp);
            }
        });
    }

    // Shows a pushed alert in a message box, on the UI thread.
    private void ShowAlert(AlertDto alert)
    {
        _ = Application.Current?.Dispatcher.InvokeAsync(() =>
        {
            MessageBox.Show(
                $"Alert: {alert.SensorType} on Device {alert.DeviceId}\n" +
                $"Value: {alert.Value} {alert.Unit}\n" +
                $"Severity: {alert.Severity}",
                "Sensor Alert",
                MessageBoxButton.OK,
                alert.Severity == AlertSeverity.Critical
                    ? MessageBoxImage.Error
                    : MessageBoxImage.Warning
            );
        });
    }

    /// <summary>
    /// Closes and disposes the hub connection. Safe to call more than once; does not throw.
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }
        _disposed = true;

        if (_hubConnection is null)
        {
            return;
        }

        try
        {
            // Run the asynchronous dispose on the thread pool so its continuations never
            // need the (blocked) UI thread, then wait for it to finish.
            Task.Run(async () => await _hubConnection.DisposeAsync()).GetAwaiter().GetResult();
        }
        catch (Exception ex)
        {
            _logger?.LogError(ex, "Disposing the SignalR connection failed");
        }
    }

    // INotifyPropertyChanged implementation
    public event PropertyChangedEventHandler PropertyChanged;
}
/// <summary>
/// Reconnect schedule for the SignalR client: retry immediately, then after 2s, 10s
/// and 30s, and every 2 minutes from then on.
/// </summary>
/// <remarks>
/// The method never returns <c>null</c>, so this policy never signals the client to
/// stop reconnecting.
/// </remarks>
public class RetryPolicy : IRetryPolicy
{
    /// <summary>Returns the delay to wait before the next reconnect attempt.</summary>
    /// <param name="context">
    /// Reconnect state supplied by SignalR. The interface method takes a
    /// <c>RetryContext</c>; any other parameter type does not implement it.
    /// </param>
    public TimeSpan? NextRetryDelay(RetryContext context) => context.PreviousRetryCount switch
    {
        0 => TimeSpan.Zero,
        1 => TimeSpan.FromSeconds(2),
        2 => TimeSpan.FromSeconds(10),
        3 => TimeSpan.FromSeconds(30),
        _ => TimeSpan.FromMinutes(2),
    };
}
ElasticSearch Optimization
Index Design
/// <summary>
/// One-time Elasticsearch setup for sensor data: the index template applied to
/// "sensor-data-*" indices and the lifecycle policy "sensor-data-policy".
/// </summary>
/// <remarks>
/// NOTE(review): the template does not reference the lifecycle policy
/// (no <c>index.lifecycle.name</c> setting), so nothing visible here attaches the
/// policy to the indices. Confirm where that is configured.
/// </remarks>
public class ElasticSearchIndexSetup
{
    /// <summary>
    /// Creates or updates the index template and the lifecycle policy.
    /// </summary>
    /// <param name="client">Elasticsearch client to run the requests with.</param>
    /// <exception cref="ArgumentNullException"><paramref name="client"/> is <c>null</c>.</exception>
    /// <exception cref="InvalidOperationException">Elasticsearch rejected one of the requests.</exception>
    public async Task SetupIndexTemplateAsync(IElasticClient client)
    {
        if (client is null)
        {
            throw new ArgumentNullException(nameof(client));
        }

        var templateName = "sensor-data-template";
        var templateResponse = await client.Indices.PutTemplateAsync(templateName, pt => pt
            .IndexPatterns("sensor-data-*")
            .Settings(s => s
                .NumberOfReplicas(1)
                .NumberOfShards(3)
                .RefreshInterval("5s") // Near real-time
            )
            // Start from the auto-detected mapping of SensorDataDto, then pin the
            // field types that the queries depend on.
            .Map<SensorDataDto>(m => m
                .AutoMap()
                .Properties(p => p
                    .Keyword(k => k
                        .Name(n => n.DeviceId)
                    )
                    .Keyword(k => k
                        .Name(n => n.SensorType)
                    )
                    .Date(d => d
                        .Name(n => n.Timestamp)
                    )
                    .Double(db => db
                        .Name(n => n.Value)
                    )
                )
            )
        );

        // A failed setup must not go unnoticed: later writes would use default mappings.
        if (!templateResponse.IsValid)
        {
            throw new InvalidOperationException(
                $"Creating index template '{templateName}' failed: {templateResponse.DebugInformation}");
        }

        // Setup ILM (Index Lifecycle Management)
        var policyName = "sensor-data-policy";
        var policyResponse = await client.XPack.IndexLifecycleManagement.PutLifecycleAsync(policyName, p => p
            .Policy(new LifecyclePolicy
            {
                Phases = new Phases
                {
                    // Hot: roll over after 30 days or 50 GB.
                    Hot = new HotPhase
                    {
                        Actions = new Actions
                        {
                            RollOver = new RollOverAction
                            {
                                MaxAge = "30d",
                                MaxSize = "50gb"
                            }
                        }
                    },
                    // Warm (from 30 days): shrink to one shard, medium recovery priority.
                    Warm = new WarmPhase
                    {
                        MinAge = "30d",
                        Actions = new Actions
                        {
                            Shrink = new ShrinkAction
                            {
                                NumberOfShards = 1
                            },
                            SetPriority = new SetPriorityAction
                            {
                                Priority = 50
                            }
                        }
                    },
                    // Cold (from 90 days): lowest recovery priority.
                    Cold = new ColdPhase
                    {
                        MinAge = "90d",
                        Actions = new Actions
                        {
                            SetPriority = new SetPriorityAction
                            {
                                Priority = 0
                            }
                        }
                    },
                    // Delete after 2 years.
                    Delete = new DeletePhase
                    {
                        MinAge = "730d" // 2 years
                    }
                }
            })
        );

        if (!policyResponse.IsValid)
        {
            throw new InvalidOperationException(
                $"Creating lifecycle policy '{policyName}' failed: {policyResponse.DebugInformation}");
        }
    }
}
Query Optimization
/// <summary>
/// Read-side queries over sensor data: paged history using search_after and
/// dashboard aggregations.
/// </summary>
public class OptimizedSensorQueries
{
    // Readings are stored in monthly indices ("sensor-data-yyyy-MM"). Querying the
    // wildcard keeps ranges that span several months complete; the date-range filter
    // restricts the hits.
    private const string AllSensorIndices = "sensor-data-*";

    private readonly IElasticClient _client;

    /// <summary>Creates the query object.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="client"/> is <c>null</c>.</exception>
    public OptimizedSensorQueries(IElasticClient client)
    {
        _client = client ?? throw new ArgumentNullException(nameof(client));
    }

    /// <summary>
    /// Returns one page of readings for a device in [<paramref name="from"/>,
    /// <paramref name="to"/>], newest first, using search_after for deep pagination.
    /// </summary>
    /// <param name="deviceId">Device whose readings are requested.</param>
    /// <param name="from">Start of the time range.</param>
    /// <param name="to">End of the time range.</param>
    /// <param name="pageSize">Maximum number of readings in the page.</param>
    /// <param name="searchAfter">
    /// <c>null</c> for the first page; otherwise a JSON array of strings holding the
    /// sort values of the last hit of the previous page.
    /// </param>
    /// <remarks>
    /// NOTE(review): the sort uses the timestamp only. Readings with identical
    /// timestamps at a page boundary may be skipped or repeated; consider adding a
    /// unique tiebreaker field to the sort.
    /// </remarks>
    public async Task<IEnumerable<SensorDataDto>> GetHistoricalDataWithPaginationAsync(
        int deviceId,
        DateTime from,
        DateTime to,
        int pageSize = 1000,
        string? searchAfter = null)
    {
        var searchAfterValues = searchAfter is null
            ? null
            : JsonSerializer.Deserialize<string[]>(searchAfter);

        // First and subsequent pages share the same request; only search_after differs.
        var searchResponse = await _client.SearchAsync<SensorDataDto>(s =>
        {
            var request = s
                .Index(AllSensorIndices)
                .Query(q => DeviceInRange(q, deviceId, from, to))
                .Sort(sort => sort
                    .Descending(d => d.Timestamp)
                )
                .Size(pageSize);

            return searchAfterValues is null
                ? request
                : request.SearchAfter(searchAfterValues);
        });

        return searchResponse.Documents;
    }

    /// <summary>
    /// Computes average, maximum, minimum and standard deviation of a device's values
    /// in [<paramref name="from"/>, <paramref name="to"/>], plus the average per hour.
    /// </summary>
    public async Task<SensorAggregationsDto> GetAggregationsAsync(
        int deviceId,
        DateTime from,
        DateTime to)
    {
        var response = await _client.SearchAsync<SensorDataDto>(s => s
            .Index(AllSensorIndices)
            .Query(q => DeviceInRange(q, deviceId, from, to))
            .Size(0) // No hits needed
            .Aggregations(a => a
                .Average("avg_value", avg => avg.Field(f => f.Value))
                .Max("max_value", max => max.Field(f => f.Value))
                .Min("min_value", min => min.Field(f => f.Value))
                .StdDeviation("std_dev", std => std.Field(f => f.Value))
                .DateHistogram("timeline", dh => dh
                    .Field(f => f.Timestamp)
                    .CalendarInterval(CalendarInterval.Hour)
                    .Aggregations(aa => aa
                        .Average("hourly_avg", a => a.Field(f => f.Value))
                    )
                )
            )
        );

        return new SensorAggregationsDto
        {
            AverageValue = response.Aggregations.Average("avg_value")?.Value,
            MaxValue = response.Aggregations.Max("max_value")?.Value,
            MinValue = response.Aggregations.Min("min_value")?.Value,
            StdDeviation = response.Aggregations.StdDeviation("std_dev")?.StdDeviation,
            // The histogram is absent when the request failed; fall back to an empty
            // list instead of dereferencing null.
            HourlyAverages = response.Aggregations
                .DateHistogram("timeline")
                ?.Buckets
                .Select(b => new HourlyAverage
                {
                    Timestamp = b.Key,
                    AverageValue = b.Average("hourly_avg")?.Value
                })
                .ToList()
                ?? new List<HourlyAverage>()
        };
    }

    // Query shared by all reads: exact device match plus an inclusive timestamp range.
    private static QueryContainer DeviceInRange(
        QueryContainerDescriptor<SensorDataDto> q,
        int deviceId,
        DateTime from,
        DateTime to)
    {
        return q.Bool(b => b
            .Must(m => m.Term(t => t.DeviceId, deviceId))
            .Filter(f => f
                .DateRange(d => d
                    .Field(fld => fld.Timestamp)
                    .GreaterThanOrEquals(from)
                    .LessThanOrEquals(to)
                )
            )
        );
    }
}
Thách thức & Giải pháp
Challenge 1: Xử lý 10,000+ events/giây
Vấn đề: Khi nhà máy có hàng ngàn sensors gửi data mỗi giây, hệ thống bị quá tải.
Giải pháp:
/// <summary>
/// Batch processing with a bounded channel: producers enqueue readings, a fixed number
/// of workers drain the channel and bulk-index the readings into Elasticsearch.
/// </summary>
/// <remarks>
/// NOTE(review): <c>_elasticClient</c> is used below but not declared in this excerpt.
/// </remarks>
public class BatchedSensorDataProcessor
{
    private readonly Channel<SensorDataDto> _channel;
    private readonly List<Task> _workers;

    /// <summary>Creates the processor and starts its workers.</summary>
    /// <param name="workerCount">Number of concurrent workers; must be positive.</param>
    /// <param name="batchSize">Maximum readings per bulk request; must be positive.</param>
    /// <exception cref="ArgumentOutOfRangeException">An argument is zero or negative.</exception>
    public BatchedSensorDataProcessor(int workerCount = 10, int batchSize = 100)
    {
        // Without workers the channel is never drained; with a batch size of zero a
        // worker would spin forever without reading anything.
        if (workerCount <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(workerCount), workerCount, "Must be positive.");
        }
        if (batchSize <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(batchSize), batchSize, "Must be positive.");
        }

        _channel = Channel.CreateBounded<SensorDataDto>(
            new BoundedChannelOptions(10000)
            {
                // Back-pressure: writers wait when 10000 readings are already queued.
                FullMode = BoundedChannelFullMode.Wait
            }
        );

        _workers = new List<Task>();
        for (int i = 0; i < workerCount; i++)
        {
            _workers.Add(ProcessBatchAsync(batchSize));
        }
    }

    /// <summary>Enqueues a reading, waiting while the channel is full.</summary>
    public async Task QueueDataAsync(SensorDataDto data)
    {
        await _channel.Writer.WriteAsync(data);
    }

    // Worker loop: waits for data, drains up to batchSize readings and bulk-indexes them.
    private async Task ProcessBatchAsync(int batchSize)
    {
        var batch = new List<SensorDataDto>(batchSize);
        while (await _channel.Reader.WaitToReadAsync())
        {
            batch.Clear();

            // Collect whatever is available right now, up to batchSize.
            while (batch.Count < batchSize &&
                   _channel.Reader.TryRead(out var data))
            {
                batch.Add(data);
            }

            if (batch.Count == 0)
            {
                continue; // another worker took the items first
            }

            try
            {
                await BulkIndexToElasticAsync(batch);
            }
            catch (Exception ex)
            {
                // Keep the worker alive. If a failure ended the loop, workers would die
                // one by one, the channel would fill up and producers would block forever.
                System.Diagnostics.Trace.TraceError(
                    "Bulk indexing of {0} sensor readings failed: {1}", batch.Count, ex);
            }
        }
    }

    // Sends one bulk request; each reading goes to the monthly index of its timestamp.
    private async Task BulkIndexToElasticAsync(List<SensorDataDto> batch)
    {
        var bulkDescriptor = new BulkDescriptor();
        foreach (var data in batch)
        {
            var indexName = $"sensor-data-{data.Timestamp:yyyy-MM}";
            bulkDescriptor.Index<SensorDataDto>(idx => idx
                .Index(indexName)
                // The id is derived from device, sensor type and timestamp, so the same
                // reading always maps to the same document id.
                .Id($"{data.DeviceId}_{data.SensorType}_{data.Timestamp:O}")
                .Document(data)
            );
        }

        var response = await _elasticClient.BulkAsync(bulkDescriptor);

        // Surface request-level and item-level failures instead of dropping them silently.
        if (!response.IsValid)
        {
            throw new InvalidOperationException(
                $"Elasticsearch bulk request failed: {response.DebugInformation}");
        }
    }
}
Challenge 2: Real-time visualization với hàng ngàn data points
Vấn đề: WPF client bị chậm khi render chart với quá nhiều points.
Giải pháp:
/// <summary>
/// Downsamples a time series for visualization with the LTTB
/// (Largest-Triangle-Three-Buckets) algorithm.
/// </summary>
/// <remarks>
/// Assumes <c>SensorDataPoint.Timestamp</c> and <c>Value</c> are numeric, as the
/// triangle-area arithmetic requires. TODO confirm against the type's declaration.
/// </remarks>
public class TimeSeriesDownsampler
{
    /// <summary>
    /// Reduces <paramref name="data"/> to at most <paramref name="threshold"/> points
    /// (but never fewer than the first and last point).
    /// </summary>
    /// <param name="data">Points ordered by timestamp.</param>
    /// <param name="threshold">Target number of points.</param>
    /// <returns>
    /// <paramref name="data"/> itself when it already has at most
    /// <paramref name="threshold"/> points or at most two points; otherwise a new
    /// list containing the first point, one point per interior bucket and the last point.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="data"/> is <c>null</c>.</exception>
    public List<SensorDataPoint> Downsample(
        List<SensorDataPoint> data,
        int threshold)
    {
        if (data is null)
        {
            throw new ArgumentNullException(nameof(data));
        }

        // Nothing to reduce (also covers empty and one/two-point inputs).
        if (data.Count <= threshold || data.Count <= 2)
        {
            return data;
        }

        var downsampled = new List<SensorDataPoint>(Math.Max(threshold, 2))
        {
            data[0] // Always include first point
        };

        // The first and last points are fixed; the points in between are split into
        // threshold - 2 buckets. A threshold below 3 leaves no interior bucket.
        int interiorBuckets = threshold - 2;
        if (interiorBuckets > 0)
        {
            // Fractional width so the buckets cover every interior point. An integer
            // width (Count / threshold) would leave the tail of the series unused.
            double bucketWidth = (double)(data.Count - 2) / interiorBuckets;
            int lastSelectedIndex = 0;

            for (int i = 0; i < interiorBuckets; i++)
            {
                int bucketStart = (int)Math.Floor(i * bucketWidth) + 1;
                int bucketEnd = (int)Math.Floor((i + 1) * bucketWidth) + 1;
                int nextBucketEnd = Math.Min((int)Math.Floor((i + 2) * bucketWidth) + 1, data.Count);

                var selected = FindLargestTrianglePoint(
                    data,
                    data[lastSelectedIndex],
                    bucketStart,
                    bucketEnd,
                    nextBucketEnd);

                downsampled.Add(selected.point);
                lastSelectedIndex = selected.index;
            }
        }

        downsampled.Add(data[^1]); // Always include last point
        return downsampled;
    }

    // Picks, from [bucketStart, bucketEnd), the point forming the largest triangle with
    // the previously selected point and the average of the next bucket
    // [bucketEnd, nextBucketEnd). For the last interior bucket the next bucket is the
    // final point of the series.
    private (SensorDataPoint point, int index) FindLargestTrianglePoint(
        List<SensorDataPoint> data,
        SensorDataPoint lastPoint,
        int bucketStart,
        int bucketEnd,
        int nextBucketEnd)
    {
        // Average of the next bucket: the third triangle vertex defined by LTTB.
        double nextTimestamp = 0;
        double nextValue = 0;
        for (int j = bucketEnd; j < nextBucketEnd; j++)
        {
            nextTimestamp += data[j].Timestamp;
            nextValue += data[j].Value;
        }
        int nextCount = nextBucketEnd - bucketEnd;
        nextTimestamp /= nextCount;
        nextValue /= nextCount;

        double maxArea = -1;
        int selectedIndex = bucketStart;
        for (int i = bucketStart; i < bucketEnd; i++)
        {
            // Triangle area from the three vertices (shoelace formula).
            double area = Math.Abs(
                lastPoint.Value * (data[i].Timestamp - nextTimestamp) +
                data[i].Value * (nextTimestamp - lastPoint.Timestamp) +
                nextValue * (lastPoint.Timestamp - data[i].Timestamp)) / 2.0;

            if (area > maxArea)
            {
                maxArea = area;
                selectedIndex = i;
            }
        }

        return (data[selectedIndex], selectedIndex);
    }
}
// Usage in the WPF view model: downsample the raw series before handing it to the chart.
private void UpdateChartWithDownsampling(List<SensorDataPoint> rawData)
{
    // Render only 500 points instead of 10,000
    var reduced = new TimeSeriesDownsampler().Downsample(rawData, threshold: 500);

    var chartPoints = reduced.Select(sample => new DataPoint(sample.Timestamp, sample.Value));
    ChartSeries.Points = new ObservableCollection<DataPoint>(chartPoints);
}
Challenge 3: Webhook reliability với external systems
Vấn đề: External APIs có thể down, rate limit, hoặc trả về lỗi.
Giải pháp: (Xem Webhook Engine section ở trên với Circuit Breaker + Retry + background job để retry lại các webhook thất bại)
Kết quả & Impact
Metrics
| Metric | Before | After | Improvement |
|---|---|---|---|
| Event processing latency | 5.2s | 0.8s | 85% faster |
| Chart render time (10k points) | 3.5s | 0.3s | 91% faster |
| Webhook success rate | 78% | 99.5% | 27% improvement |
| Data retention | 30 days | 2 years | 24x longer |
| Alert delivery time | 15s | 2s | 87% faster |
Business Impact
- Giảm 60% downtime nhờ early warning system
- Tiết kiệm 200 giờ/năm manual data collection
- Tăng 40% OEE nhờ predictive maintenance
Bài học kinh nghiệm
Technical Learnings
- Time-series data cần special handling: Index rollover, downsampling, aggregation
- Circuit breaker là must-have: Khi tích hợp với external systems
- Batch processing > Real-time cho high volume: Trade-off giữa latency và throughput
- Monitoring & Alerting: Cần instrument mọi thứ từ day 1
Soft Skills
- Domain knowledge quan trọng: Hiểu factory operations để design đúng
- Stakeholder management: Operators vs Managers có different needs
- Documentation: Critical cho handover và maintenance
Câu hỏi phỏng vấn
Q1: Tại sao chọn ElasticSearch thay vì SQL Server cho sensor data?
A:
- Write throughput: ElasticSearch handle tốt hơn cho write-heavy time-series
- Aggregation performance: ES aggregations nhanh hơn SQL GROUP BY cho large datasets
- Index lifecycle: ILM built-in, dễ dàng rollover và delete old data
- Trade-off: ES không support transactions như SQL Server → dùng ES cho data, SQL cho metadata
Q2: Bạn xử lý data loss như thế nào khi system crash?
A:
// Write-ahead logging: record the reading in the WAL before processing it, so a crash
// between the two steps leaves an unprocessed entry that recovery can replay.
public async Task ProcessSensorDataAsync(SensorDataDto data)
{
    // 1. Write to WAL first. Keep a reference to the entry: its id is needed below to
    //    mark it as processed.
    // NOTE(review): assumes entry.Id is populated by the time AppendAsync completes
    // (by WalEntry itself or by the repository). Confirm against the repository.
    var entry = new WalEntry
    {
        Data = data,
        Timestamp = DateTime.UtcNow
    };
    await _walRepository.AppendAsync(entry);

    try
    {
        // 2. Process data
        await IndexToElasticAsync(data);

        // 3. Mark WAL entry as processed
        await _walRepository.MarkProcessedAsync(entry.Id);
    }
    catch (Exception ex)
    {
        // 4. The entry stays unprocessed; recovery replays unprocessed WAL entries.
        _logger.LogError(ex, "Processing failed, will replay from WAL");
        throw;
    }
}
// Recovery process: replays every WAL entry that was not marked as processed.
// Entries are handled one at a time; an exception stops the replay at that entry.
public async Task RecoverAsync()
{
    var pending = await _walRepository.GetUnprocessedAsync();

    foreach (var walEntry in pending)
    {
        // Index first, then mark, so an entry is only marked once its data is stored.
        await IndexToElasticAsync(walEntry.Data);
        await _walRepository.MarkProcessedAsync(walEntry.Id);
    }
}
Q3: Làm sao để test real-time system với hàng ngàn concurrent connections?
A:
// Load test with WebApplicationFactory: posts 10,000 events concurrently against the
// in-memory test server and checks both that every request succeeded and that the
// measured throughput reaches 5,000 events per second.
[Fact]
public async Task LoadTest_SensorDataIngestion_10kEventsPerSecond()
{
    const int eventCount = 10000;

    using var factory = new CustomWebApplicationFactory();
    using var client = factory.CreateClient();
    var events = GenerateSensorEvents(eventCount);

    // The Select is lazy: requests are started when Task.WhenAll enumerates it, i.e.
    // after the stopwatch has been started.
    var tasks = events.Select(e =>
        client.PostAsJsonAsync("/api/sensor-data", e)
    );
    var stopwatch = Stopwatch.StartNew();
    var responses = await Task.WhenAll(tasks);
    stopwatch.Stop();

    try
    {
        // Throughput is only meaningful if the requests were accepted; otherwise fast
        // failures (e.g. 4xx/5xx) would make the test pass.
        Assert.All(responses, response =>
            Assert.True(response.IsSuccessStatusCode, $"Request failed with status {response.StatusCode}"));

        var throughput = eventCount / stopwatch.Elapsed.TotalSeconds;
        Assert.True(throughput >= 5000, $"Expected >= 5000 events/s, got {throughput}");
    }
    finally
    {
        foreach (var response in responses)
        {
            response.Dispose();
        }
    }
}
// Integration test with Testcontainers: starts a container, indexes one document
// through a client created from it and expects a valid response.
[Fact]
public async Task IntegrationTest_ElasticSearchConnection()
{
    // Arrange: start the container; it is disposed when the test ends
    await using var elasticContainer = new ElasticSearchBuilder()
        .Build();
    await elasticContainer.StartAsync();
    var elasticClient = elasticContainer.CreateClient();
    var sample = new SensorDataDto { /* ... */ };

    // Act: test indexing
    var indexResponse = await elasticClient.IndexAsync(sample);

    // Assert
    Assert.True(indexResponse.IsValid);
}
Q4: Bạn monitor system performance như thế nào?
A:
// Application Insights custom metrics: wraps processing in a telemetry operation and
// records a counter and a duration for each successfully processed reading.
public class SensorDataIngestionService
{
    private readonly TelemetryClient _telemetry;
    private readonly Histogram _processingTime;
    private readonly Counter _eventsProcessed;

    /// <summary>
    /// Indexes one reading and reports telemetry for it. The counter and the
    /// processing time are recorded only when indexing succeeds; on failure the
    /// exception is tracked and rethrown.
    /// </summary>
    public async Task ProcessSensorDataAsync(SensorDataDto data)
    {
        using var operation = _telemetry.StartOperation("ProcessSensorData");
        var timer = Stopwatch.StartNew();

        try
        {
            await IndexToElasticAsync(data);

            _eventsProcessed.Add(1);
            _processingTime.Record(timer.ElapsedMilliseconds);
            operation.SetTag("success", true);
        }
        catch (Exception ex)
        {
            _telemetry.TrackException(ex);
            operation.SetTag("success", false);
            throw;
        }
    }
}
// Dashboard với Grafana
// Query Prometheus:
// - Rate: rate(events_processed_total[1m])
// - Latency: histogram_quantile(0.95, sum(rate(processing_time_seconds_bucket[5m])) by (le))
// - Errors: rate(process_sensor_data_errors_total[1m])
Q5: Nếu phải scale lên 100,000 devices, bạn sẽ thay đổi gì?
A:
- Sharding strategy: Partition theo device_id range hoặc geo-region
- Edge computing: Pre-process data tại edge trước khi gửi về central
- Kafka thay vì Service Bus: Cho higher throughput
- Data tiering: Hot data in memory (Redis), warm in ES, cold in data lake
- Auto-scaling: Kubernetes HPA dựa trên queue depth