diff --git a/Config/ConfigNotifyChannel.cs b/Config/ConfigNotifyChannel.cs
new file mode 100644
index 0000000..da47c7a
--- /dev/null
+++ b/Config/ConfigNotifyChannel.cs
@@ -0,0 +1,10 @@
+namespace YarpGateway.Config;
+
+/// <summary>
+/// Names of the PostgreSQL LISTEN/NOTIFY channels used to signal configuration changes.
+/// </summary>
+public static class ConfigNotifyChannel
+{
+    /// <summary>Channel on which <c>PgSqlConfigChangeListener</c> issues LISTEN.</summary>
+    public const string GatewayConfigChanged = "gateway_config_changed";
+}
diff --git a/Services/PgSqlConfigChangeListener.cs b/Services/PgSqlConfigChangeListener.cs
new file mode 100644
index 0000000..02f9978
--- /dev/null
+++ b/Services/PgSqlConfigChangeListener.cs
@@ -0,0 +1,307 @@
+using System.Threading.Channels;
+using Microsoft.EntityFrameworkCore;
+using Npgsql;
+using YarpGateway.Config;
+using YarpGateway.Data;
+using YarpGateway.DynamicProxy;
+
+namespace YarpGateway.Services;
+
+/// <summary>
+/// Background service that issues LISTEN on <see cref="ConfigNotifyChannel.GatewayConfigChanged"/>
+/// and reloads the gateway configuration when a notification arrives. A periodic version
+/// comparison acts as a fallback for notifications that were missed.
+/// </summary>
+public class PgSqlConfigChangeListener : BackgroundService
+{
+    private static readonly TimeSpan ReconnectDelay = TimeSpan.FromSeconds(5);
+
+    private readonly IServiceProvider _serviceProvider;
+    private readonly ILogger<PgSqlConfigChangeListener> _logger;
+    private readonly TimeSpan _fallbackInterval = TimeSpan.FromMinutes(5);
+    private readonly string _connectionString;
+    private NpgsqlConnection? _connection;
+    private int _lastRouteVersion;
+    private int _lastClusterVersion;
+
+    // Capacity 1 coalesces bursts: while one reload request is pending, further requests are dropped.
+    private readonly Channel<bool> _reloadChannel = Channel.CreateBounded<bool>(1);
+
+    /// <summary>Creates the listener from the "DefaultConnection" connection string.</summary>
+    /// <exception cref="InvalidOperationException">"DefaultConnection" is not configured.</exception>
+    public PgSqlConfigChangeListener(
+        IServiceProvider serviceProvider,
+        ILogger<PgSqlConfigChangeListener> logger,
+        IConfiguration configuration)
+    {
+        _serviceProvider = serviceProvider;
+        _logger = logger;
+        _connectionString = configuration.GetConnectionString("DefaultConnection")
+            ?? throw new InvalidOperationException("DefaultConnection is not configured");
+    }
+
+    /// <summary>
+    /// Records the baseline versions, then runs the listener, the fallback poller and the reload
+    /// worker until <paramref name="stoppingToken"/> is cancelled.
+    /// </summary>
+    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
+    {
+        _logger.LogInformation("Starting PgSql config change listener");
+
+        await UpdateVersionAsync(stoppingToken);
+
+        await Task.WhenAll(
+            ListenAsync(stoppingToken),
+            FallbackPollingAsync(stoppingToken),
+            ProcessReloadRequestsAsync(stoppingToken));
+    }
+
+    /// <summary>Stops the background loops, then releases the PostgreSQL connection.</summary>
+    public override async Task StopAsync(CancellationToken cancellationToken)
+    {
+        _logger.LogInformation("Stopping PgSql config change listener");
+
+        // TryComplete tolerates a repeated StopAsync; Complete would throw the second time.
+        _reloadChannel.Writer.TryComplete();
+
+        // Cancel and await the loops first so the connection is not disposed underneath them.
+        await base.StopAsync(cancellationToken);
+        await CloseConnectionAsync();
+    }
+
+    /// <summary>Keeps a listening connection alive, reconnecting after a delay on any failure.</summary>
+    private async Task ListenAsync(CancellationToken stoppingToken)
+    {
+        try
+        {
+            while (!stoppingToken.IsCancellationRequested)
+            {
+                try
+                {
+                    var connection = await ReconnectAsync(stoppingToken);
+
+                    // Notifications sent while disconnected are lost, so compare versions now.
+                    await CheckAndReloadAsync(stoppingToken);
+
+                    while (true)
+                    {
+                        // Npgsql delivers notifications on an idle connection only while a wait is pending.
+                        await connection.WaitAsync(stoppingToken);
+                    }
+                }
+                catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
+                {
+                    break;
+                }
+                catch (Exception ex)
+                {
+                    _logger.LogError(ex, "Error in listener loop, reconnecting...");
+                    await Task.Delay(ReconnectDelay, stoppingToken);
+                }
+            }
+        }
+        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
+        {
+            // Cancelled during the retry delay: normal shutdown.
+        }
+        finally
+        {
+            await CloseConnectionAsync();
+        }
+    }
+
+    /// <summary>Replaces the current connection with a new one that has issued LISTEN.</summary>
+    /// <returns>The open, listening connection.</returns>
+    private async Task<NpgsqlConnection> ReconnectAsync(CancellationToken stoppingToken)
+    {
+        await CloseConnectionAsync();
+
+        var connection = new NpgsqlConnection(_connectionString);
+        try
+        {
+            connection.Notification += OnNotification;
+            await connection.OpenAsync(stoppingToken);
+
+            await using var cmd = connection.CreateCommand();
+            cmd.CommandText = $"LISTEN {ConfigNotifyChannel.GatewayConfigChanged}";
+            await cmd.ExecuteNonQueryAsync(stoppingToken);
+        }
+        catch
+        {
+            // Rethrow so the caller retries instead of waiting forever on a dead connection.
+            connection.Notification -= OnNotification;
+            await connection.DisposeAsync();
+            throw;
+        }
+
+        _connection = connection;
+        _logger.LogInformation("Listening on {Channel}", ConfigNotifyChannel.GatewayConfigChanged);
+        return connection;
+    }
+
+    /// <summary>Detaches the handler from the current connection, if any, and disposes it.</summary>
+    private async Task CloseConnectionAsync()
+    {
+        var connection = Interlocked.Exchange(ref _connection, null);
+        if (connection is null)
+        {
+            return;
+        }
+
+        connection.Notification -= OnNotification;
+        await connection.DisposeAsync();
+    }
+
+    /// <summary>Queues a reload when Npgsql reports a notification.</summary>
+    private void OnNotification(object sender, NpgsqlNotificationEventArgs e)
+    {
+        _logger.LogInformation("Received config change notification: {Payload}", e.Payload);
+        RequestReload();
+    }
+
+    /// <summary>Signals the reload worker without blocking; a no-op while a request is pending.</summary>
+    private void RequestReload() => _reloadChannel.Writer.TryWrite(true);
+
+    /// <summary>Runs one reload for each batch of queued requests.</summary>
+    private async Task ProcessReloadRequestsAsync(CancellationToken stoppingToken)
+    {
+        var reader = _reloadChannel.Reader;
+        try
+        {
+            while (await reader.WaitToReadAsync(stoppingToken))
+            {
+                while (reader.TryRead(out _))
+                {
+                    // Drain: everything queued so far is served by the single reload below.
+                }
+
+                await ReloadConfigAsync(stoppingToken);
+            }
+        }
+        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
+        {
+            // Normal shutdown.
+        }
+    }
+
+    /// <summary>Compares versions every fallback interval in case a notification was missed.</summary>
+    private async Task FallbackPollingAsync(CancellationToken stoppingToken)
+    {
+        while (!stoppingToken.IsCancellationRequested)
+        {
+            try
+            {
+                await Task.Delay(_fallbackInterval, stoppingToken);
+                await CheckAndReloadAsync(stoppingToken);
+            }
+            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
+            {
+                break;
+            }
+            catch (Exception ex)
+            {
+                _logger.LogError(ex, "Error in fallback polling");
+            }
+        }
+    }
+
+    /// <summary>Requests a reload when the stored versions differ from the last applied ones.</summary>
+    private async Task CheckAndReloadAsync(CancellationToken stoppingToken)
+    {
+        try
+        {
+            var (routeVersion, clusterVersion) = await ReadVersionsAsync(stoppingToken);
+            if (routeVersion == _lastRouteVersion && clusterVersion == _lastClusterVersion)
+            {
+                return;
+            }
+
+            _logger.LogInformation("Version change detected via fallback: route {RouteVer}->{NewRouteVer}, cluster {ClusterVer}->{NewClusterVer}",
+                _lastRouteVersion, routeVersion, _lastClusterVersion, clusterVersion);
+
+            RequestReload();
+        }
+        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
+        {
+            throw;
+        }
+        catch (Exception ex)
+        {
+            _logger.LogError(ex, "Error checking version for fallback");
+        }
+    }
+
+    /// <summary>Records the current versions as the baseline without reloading.</summary>
+    private async Task UpdateVersionAsync(CancellationToken stoppingToken)
+    {
+        try
+        {
+            (_lastRouteVersion, _lastClusterVersion) = await ReadVersionsAsync(stoppingToken);
+        }
+        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
+        {
+            throw;
+        }
+        catch (Exception ex)
+        {
+            _logger.LogError(ex, "Error updating initial version");
+        }
+    }
+
+    /// <summary>Reads the highest route and service-instance versions from the database.</summary>
+    /// <returns>The two versions; a version is the default value when its table is empty.</returns>
+    private async Task<(int RouteVersion, int ClusterVersion)> ReadVersionsAsync(CancellationToken stoppingToken)
+    {
+        await using var scope = _serviceProvider.CreateAsyncScope();
+
+        // NOTE(review): the type argument of GetRequiredService is missing in this patch as received
+        // (it looks stripped); restore the project's DbContext type here before applying.
+        await using var db = scope.ServiceProvider.GetRequiredService();
+
+        var routeVersion = await db.TenantRoutes
+            .OrderByDescending(r => r.Version)
+            .Select(r => r.Version)
+            .FirstOrDefaultAsync(stoppingToken);
+
+        var clusterVersion = await db.ServiceInstances
+            .OrderByDescending(i => i.Version)
+            .Select(i => i.Version)
+            .FirstOrDefaultAsync(stoppingToken);
+
+        return (routeVersion, clusterVersion);
+    }
+
+    /// <summary>Reloads the route cache, updates the config provider, then records the applied versions.</summary>
+    private async Task ReloadConfigAsync(CancellationToken stoppingToken)
+    {
+        try
+        {
+            // Read before reloading so a change that lands mid-reload still differs at the next check.
+            var (routeVersion, clusterVersion) = await ReadVersionsAsync(stoppingToken);
+
+            await using var scope = _serviceProvider.CreateAsyncScope();
+
+            // NOTE(review): both GetRequiredService calls below are missing their type arguments in
+            // this patch as received; restore the route cache and config provider types.
+            var routeCache = scope.ServiceProvider.GetRequiredService();
+            await routeCache.ReloadAsync();
+
+            var configProvider = scope.ServiceProvider.GetRequiredService();
+            configProvider.UpdateConfig();
+
+            // Stored only after success, so a failed reload is retried by the next version check.
+            _lastRouteVersion = routeVersion;
+            _lastClusterVersion = clusterVersion;
+
+            _logger.LogInformation("Configuration reloaded successfully");
+        }
+        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
+        {
+            throw;
+        }
+        catch (Exception ex)
+        {
+            _logger.LogError(ex, "Error reloading configuration");
+        }
+    }
+}