From 06db4231cfd612d0aacdd2a0e12183481363835f Mon Sep 17 00:00:00 2001 From: minhtrannhat Date: Mon, 30 Dec 2024 12:00:00 -0500 Subject: [PATCH] feat(worker): implement background jobs for incidents --- .../Jobs/EscalateIfUnackedJob.cs | 64 +++++++++ .../Jobs/IncidentTriggeredJob.cs | 64 +++++++++ .../Jobs/SendWebhookNotificationJob.cs | 124 ++++++++++++++++++ 3 files changed, 252 insertions(+) create mode 100644 src/IncidentOps.Worker/Jobs/EscalateIfUnackedJob.cs create mode 100644 src/IncidentOps.Worker/Jobs/IncidentTriggeredJob.cs create mode 100644 src/IncidentOps.Worker/Jobs/SendWebhookNotificationJob.cs diff --git a/src/IncidentOps.Worker/Jobs/EscalateIfUnackedJob.cs b/src/IncidentOps.Worker/Jobs/EscalateIfUnackedJob.cs new file mode 100644 index 0000000..6f88f7c --- /dev/null +++ b/src/IncidentOps.Worker/Jobs/EscalateIfUnackedJob.cs @@ -0,0 +1,64 @@ +using Hangfire; +using IncidentOps.Domain.Entities; +using IncidentOps.Domain.Enums; +using IncidentOps.Infrastructure.Data.Repositories; +using IncidentOps.Infrastructure.Jobs; +using Microsoft.Extensions.Logging; + +namespace IncidentOps.Worker.Jobs; + +public class EscalateIfUnackedJob : IEscalateIfUnackedJob +{ + private readonly IIncidentEventRepository _incidentEventRepository; + private readonly IBackgroundJobClient _backgroundJobClient; + private readonly ILogger _logger; + + public EscalateIfUnackedJob( + IIncidentEventRepository incidentEventRepository, + IBackgroundJobClient backgroundJobClient, + ILogger logger) + { + _incidentEventRepository = incidentEventRepository; + _backgroundJobClient = backgroundJobClient; + _logger = logger; + } + + public async Task ExecuteAsync(Guid incidentId, int step) + { + _logger.LogInformation("Checking escalation for incident {IncidentId}, step {Step}", incidentId, step); + + using var connection = new Npgsql.NpgsqlConnection( + Environment.GetEnvironmentVariable("ConnectionStrings__Postgres") ?? ""); + + var incident = await Dapper.SqlMapper.QuerySingleOrDefaultAsync( + connection, "SELECT * FROM incidents WHERE id = @Id", new { Id = incidentId }); + + if (incident == null) + { + _logger.LogWarning("Incident {IncidentId} not found for escalation", incidentId); + return; + } + + if (incident.Status != IncidentStatus.Triggered) + { + _logger.LogInformation("Incident {IncidentId} is no longer in Triggered state, skipping escalation", + incidentId); + return; + } + + // Record escalation event + await _incidentEventRepository.CreateAsync(new IncidentEvent + { + Id = Guid.NewGuid(), + IncidentId = incidentId, + EventType = IncidentEventType.EscalationTriggered, + Payload = $"{{\"step\": {step}}}", + CreatedAt = DateTime.UtcNow + }); + + _logger.LogInformation("Escalation triggered for incident {IncidentId}, step {Step}", incidentId, step); + + // TODO: Implement secondary notification targets or on-call escalation + // For now, just log the escalation + } +} diff --git a/src/IncidentOps.Worker/Jobs/IncidentTriggeredJob.cs b/src/IncidentOps.Worker/Jobs/IncidentTriggeredJob.cs new file mode 100644 index 0000000..c56ba66 --- /dev/null +++ b/src/IncidentOps.Worker/Jobs/IncidentTriggeredJob.cs @@ -0,0 +1,64 @@ +using Hangfire; +using IncidentOps.Infrastructure.Data.Repositories; +using IncidentOps.Infrastructure.Jobs; +using Microsoft.Extensions.Logging; + +namespace IncidentOps.Worker.Jobs; + +public class IncidentTriggeredJob : IIncidentTriggeredJob +{ + private readonly IIncidentRepository _incidentRepository; + private readonly INotificationTargetRepository _notificationTargetRepository; + private readonly IBackgroundJobClient _backgroundJobClient; + private readonly ILogger _logger; + + public IncidentTriggeredJob( + IIncidentRepository incidentRepository, + INotificationTargetRepository notificationTargetRepository, + IBackgroundJobClient backgroundJobClient, + ILogger logger) + { + _incidentRepository = incidentRepository; + _notificationTargetRepository = notificationTargetRepository; + _backgroundJobClient = backgroundJobClient; + _logger = logger; + } + + public async Task ExecuteAsync(Guid incidentId) + { + _logger.LogInformation("Processing incident triggered job for incident {IncidentId}", incidentId); + + // We need to get the incident to find its org_id + // Since we don't have org context here, we'll need to query differently + // For now, we'll use a direct query approach + // In production, you might pass orgId as a job parameter + + var targets = await GetTargetsForIncidentAsync(incidentId); + foreach (var target in targets) + { + _backgroundJobClient.Enqueue( + j => j.ExecuteAsync(incidentId, target.Id)); + } + + _logger.LogInformation("Enqueued {Count} notification jobs for incident {IncidentId}", targets.Count, incidentId); + } + + private async Task> GetTargetsForIncidentAsync(Guid incidentId) + { + // This is a simplified implementation + // In production, you'd want to pass the orgId with the job + // or query the incident first to get its org_id + using var connection = new Npgsql.NpgsqlConnection( + Environment.GetEnvironmentVariable("ConnectionStrings__Postgres") ?? ""); + + const string sql = @" + SELECT nt.* FROM notification_targets nt + INNER JOIN incidents i ON i.org_id = nt.org_id + WHERE i.id = @IncidentId AND nt.is_enabled = true"; + + var result = await Dapper.SqlMapper.QueryAsync( + connection, sql, new { IncidentId = incidentId }); + + return result.ToList(); + } +} diff --git a/src/IncidentOps.Worker/Jobs/SendWebhookNotificationJob.cs b/src/IncidentOps.Worker/Jobs/SendWebhookNotificationJob.cs new file mode 100644 index 0000000..e445e00 --- /dev/null +++ b/src/IncidentOps.Worker/Jobs/SendWebhookNotificationJob.cs @@ -0,0 +1,124 @@ +using System.Text; +using System.Text.Json; +using IncidentOps.Domain.Entities; +using IncidentOps.Domain.Enums; +using IncidentOps.Infrastructure.Data.Repositories; +using IncidentOps.Infrastructure.Jobs; +using Microsoft.Extensions.Logging; + +namespace IncidentOps.Worker.Jobs; + +public class SendWebhookNotificationJob : ISendWebhookNotificationJob +{ + private readonly IIncidentEventRepository _incidentEventRepository; + private readonly IHttpClientFactory _httpClientFactory; + private readonly ILogger _logger; + + public SendWebhookNotificationJob( + IIncidentEventRepository incidentEventRepository, + IHttpClientFactory httpClientFactory, + ILogger logger) + { + _incidentEventRepository = incidentEventRepository; + _httpClientFactory = httpClientFactory; + _logger = logger; + } + + public async Task ExecuteAsync(Guid incidentId, Guid targetId) + { + _logger.LogInformation("Sending webhook notification for incident {IncidentId} to target {TargetId}", + incidentId, targetId); + + try + { + // Get incident and target details + var (incident, target) = await GetIncidentAndTargetAsync(incidentId, targetId); + if (incident == null || target == null) + { + _logger.LogWarning("Incident or target not found. IncidentId: {IncidentId}, TargetId: {TargetId}", + incidentId, targetId); + return; + } + + // Parse webhook configuration + var config = JsonSerializer.Deserialize(target.Configuration); + if (config?.Url == null) + { + _logger.LogError("Invalid webhook configuration for target {TargetId}", targetId); + await RecordNotificationEventAsync(incidentId, false, "Invalid webhook configuration"); + return; + } + + // Build payload + var payload = new + { + incidentId = incident.Id, + title = incident.Title, + description = incident.Description, + status = incident.Status.ToString(), + createdAt = incident.CreatedAt, + serviceId = incident.ServiceId + }; + + // Send webhook + var client = _httpClientFactory.CreateClient(); + var content = new StringContent( + JsonSerializer.Serialize(payload), + Encoding.UTF8, + "application/json"); + + var response = await client.PostAsync(config.Url, content); + + if (response.IsSuccessStatusCode) + { + _logger.LogInformation("Successfully sent webhook for incident {IncidentId}", incidentId); + await RecordNotificationEventAsync(incidentId, true, null); + } + else + { + var errorMessage = $"Webhook failed with status {response.StatusCode}"; + _logger.LogWarning("Webhook failed for incident {IncidentId}: {Error}", incidentId, errorMessage); + await RecordNotificationEventAsync(incidentId, false, errorMessage); + throw new Exception(errorMessage); // Trigger Hangfire retry + } + } + catch (Exception ex) + { + _logger.LogError(ex, "Error sending webhook for incident {IncidentId}", incidentId); + await RecordNotificationEventAsync(incidentId, false, ex.Message); + throw; // Re-throw to trigger Hangfire retry + } + } + + private async Task<(Incident?, NotificationTarget?)> GetIncidentAndTargetAsync(Guid incidentId, Guid targetId) + { + using var connection = new Npgsql.NpgsqlConnection( + Environment.GetEnvironmentVariable("ConnectionStrings__Postgres") ?? ""); + + var incident = await Dapper.SqlMapper.QuerySingleOrDefaultAsync( + connection, "SELECT * FROM incidents WHERE id = @Id", new { Id = incidentId }); + + var target = await Dapper.SqlMapper.QuerySingleOrDefaultAsync( + connection, "SELECT * FROM notification_targets WHERE id = @Id", new { Id = targetId }); + + return (incident, target); + } + + private async Task RecordNotificationEventAsync(Guid incidentId, bool success, string? errorMessage) + { + var eventType = success ? IncidentEventType.NotificationSent : IncidentEventType.NotificationFailed; + var payload = success ? null : JsonSerializer.Serialize(new { error = errorMessage }); + + await _incidentEventRepository.CreateAsync(new IncidentEvent + { + Id = Guid.NewGuid(), + IncidentId = incidentId, + EventType = eventType, + ActorUserId = null, + Payload = payload, + CreatedAt = DateTime.UtcNow + }); + } + + private record WebhookConfig(string? Url, string? Secret); +}