fengling-console/src/Services/ConfigNotificationService.cs

134 lines
4.2 KiB
C#

using System.Text.Json;
using Microsoft.Extensions.Logging;
using Npgsql;
using Fengling.Console.Data;
using Microsoft.EntityFrameworkCore;
namespace Fengling.Console.Services;
/// <summary>
/// Contract for publishing notifications, including configuration-change events.
/// </summary>
public interface INotificationService
{
    /// <summary>
    /// Publishes a notification to the specified channel.
    /// </summary>
    /// <param name="channel">Name of the channel to publish to.</param>
    /// <param name="payload">Message body to deliver on the channel.</param>
    /// <param name="cancellationToken">Token that can be used to cancel the operation.</param>
    /// <returns>A task representing the asynchronous publish operation.</returns>
    Task PublishAsync(
        string channel,
        string payload,
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Publishes a configuration-change event.
    /// </summary>
    /// <param name="eventType">Kind of configuration that changed.</param>
    /// <param name="action">Operation that was performed on the configuration.</param>
    /// <param name="details">Optional additional data describing the change; may be <c>null</c>.</param>
    /// <param name="cancellationToken">Token that can be used to cancel the operation.</param>
    /// <returns>A task representing the asynchronous publish operation.</returns>
    Task PublishConfigChangeAsync(
        string eventType,
        string action,
        object? details = null,
        CancellationToken cancellationToken = default);
}
/// <summary>
/// Data carried by a configuration-change event.
/// </summary>
public class ConfigChangeEvent
{
    /// <summary>
    /// Category of the configuration that changed, e.g. service, route, instance, gateway.
    /// </summary>
    public string EventType { get; set; } = string.Empty;

    /// <summary>
    /// Operation that was performed, e.g. create, update, delete, reload.
    /// </summary>
    public string Action { get; set; } = string.Empty;

    /// <summary>
    /// Time of the event; initialised to the current UTC time when the object is constructed.
    /// </summary>
    public DateTime Timestamp { get; set; } = DateTime.UtcNow;

    /// <summary>
    /// Optional additional data attached to the event; <c>null</c> when none is supplied.
    /// </summary>
    public object? Details { get; set; }
}
/// <summary>
/// PostgreSQL-backed <see cref="INotificationService"/>.
/// Broadcasts configuration changes through PostgreSQL's NOTIFY/LISTEN mechanism
/// by invoking <c>pg_notify</c>.
/// </summary>
public class PgSqlNotificationService : INotificationService
{
    /// <summary>
    /// Channel on which configuration-change events are published by
    /// <see cref="PublishConfigChangeAsync"/>.
    /// </summary>
    public const string GatewayConfigChangedChannel = "gateway_config_changed";

    private readonly ILogger<PgSqlNotificationService> _logger;
    private readonly string _connectionString;

    /// <summary>
    /// Creates the service, taking the connection string from the Npgsql extension
    /// registered on the supplied context options.
    /// </summary>
    /// <param name="dbContextOptions">Options of <c>ConsoleDbContext</c> to read the connection string from.</param>
    /// <param name="logger">Logger used to report publish results.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="dbContextOptions"/> or <paramref name="logger"/> is <c>null</c>.
    /// </exception>
    /// <exception cref="InvalidOperationException">
    /// No connection string could be obtained from the options.
    /// </exception>
    public PgSqlNotificationService(
        DbContextOptions<ConsoleDbContext> dbContextOptions,
        ILogger<PgSqlNotificationService> logger)
    {
        // Fail fast with a descriptive exception instead of a later NullReferenceException.
        ArgumentNullException.ThrowIfNull(dbContextOptions);
        ArgumentNullException.ThrowIfNull(logger);

        _logger = logger;
        _connectionString = ResolveConnectionString(dbContextOptions)
            ?? throw new InvalidOperationException("DefaultConnection not configured");
    }

    /// <summary>
    /// Publishes <paramref name="payload"/> on <paramref name="channel"/> via <c>pg_notify</c>,
    /// using a connection opened for this call only.
    /// </summary>
    /// <remarks>
    /// Publishing is best-effort: failures and cancellation are logged and never propagated
    /// to the caller.
    /// NOTE(review): PostgreSQL's default configuration limits NOTIFY payloads to under
    /// 8000 bytes; an oversized payload would surface here as a logged failure - confirm
    /// callers keep payloads small.
    /// </remarks>
    /// <param name="channel">Name of the channel to notify.</param>
    /// <param name="payload">Message body to send.</param>
    /// <param name="cancellationToken">Token that can be used to cancel the operation.</param>
    /// <returns>A task that completes once the publish attempt has finished.</returns>
    public async Task PublishAsync(string channel, string payload, CancellationToken cancellationToken = default)
    {
        try
        {
            await using var connection = new NpgsqlConnection(_connectionString);
            await connection.OpenAsync(cancellationToken);

            // Channel and payload are passed as parameters, never concatenated into the SQL text.
            await using var command = new NpgsqlCommand("SELECT pg_notify(@channel, @payload)", connection);
            command.Parameters.AddWithValue("channel", channel);
            command.Parameters.AddWithValue("payload", payload);
            await command.ExecuteNonQueryAsync(cancellationToken);

            _logger.LogInformation("Published notification to channel '{Channel}': {Payload}", channel, payload);
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // A caller-requested cancellation is not a failure; keep it out of the error log.
            _logger.LogWarning("Publishing notification to channel '{Channel}' was cancelled", channel);
        }
        catch (Exception ex)
        {
            // Deliberately swallowed: a failed notification must not break the calling operation.
            _logger.LogError(ex, "Failed to publish notification to channel '{Channel}'", channel);
        }
    }

    /// <summary>
    /// Builds a <see cref="ConfigChangeEvent"/> stamped with the current UTC time, serializes it
    /// to JSON and publishes it on <see cref="GatewayConfigChangedChannel"/>.
    /// </summary>
    /// <param name="eventType">Kind of configuration that changed.</param>
    /// <param name="action">Operation that was performed.</param>
    /// <param name="details">Optional additional data describing the change; may be <c>null</c>.</param>
    /// <param name="cancellationToken">Token that can be used to cancel the operation.</param>
    /// <returns>A task that completes once the publish attempt has finished.</returns>
    public async Task PublishConfigChangeAsync(
        string eventType,
        string action,
        object? details = null,
        CancellationToken cancellationToken = default)
    {
        var configEvent = new ConfigChangeEvent
        {
            EventType = eventType,
            Action = action,
            Timestamp = DateTime.UtcNow,
            Details = details
        };

        var payload = JsonSerializer.Serialize(configEvent);
        await PublishAsync(GatewayConfigChangedChannel, payload, cancellationToken);
    }

    /// <summary>
    /// Returns the value of the string <c>ConnectionString</c> property of the first options
    /// extension whose type name contains "Npgsql", or <c>null</c> when no such extension
    /// exists or its value is <c>null</c>.
    /// </summary>
    private static string? ResolveConnectionString(DbContextOptions<ConsoleDbContext> dbContextOptions)
    {
        foreach (var extension in dbContextOptions.Extensions)
        {
            var extensionType = extension.GetType();
            if (!extensionType.Name.Contains("Npgsql", StringComparison.Ordinal))
            {
                continue;
            }

            var property = extensionType.GetProperty("ConnectionString");
            if (property is not null && property.PropertyType == typeof(string))
            {
                // Only the first matching extension is consulted, even if its value is null.
                return property.GetValue(extension) as string;
            }
        }

        return null;
    }
}
/// <summary>
/// Dependency-injection helpers for the notification service.
/// </summary>
public static class NotificationServiceExtensions
{
    /// <summary>
    /// Registers <see cref="PgSqlNotificationService"/> as the scoped implementation of
    /// <see cref="INotificationService"/>.
    /// </summary>
    /// <param name="services">Service collection to add the registration to.</param>
    /// <returns>The service collection, so that further calls can be chained.</returns>
    public static IServiceCollection AddNotificationService(this IServiceCollection services)
        => services.AddScoped<INotificationService, PgSqlNotificationService>();
}