docs: reorganize documentation structure
Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode) Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
This commit is contained in:
parent
1621f59e4a
commit
a6a7a5754e
6
Config/ConfigNotifyChannel.cs
Normal file
6
Config/ConfigNotifyChannel.cs
Normal file
@ -0,0 +1,6 @@
|
||||
namespace YarpGateway.Config;
|
||||
|
||||
public static class ConfigNotifyChannel
|
||||
{
|
||||
public const string GatewayConfigChanged = "gateway_config_changed";
|
||||
}
|
||||
222
Services/PgSqlConfigChangeListener.cs
Normal file
222
Services/PgSqlConfigChangeListener.cs
Normal file
@ -0,0 +1,222 @@
|
||||
using System.Threading.Channels;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Npgsql;
|
||||
using YarpGateway.Config;
|
||||
using YarpGateway.Data;
|
||||
using YarpGateway.DynamicProxy;
|
||||
|
||||
namespace YarpGateway.Services;
|
||||
|
||||
public class PgSqlConfigChangeListener : BackgroundService
|
||||
{
|
||||
private readonly IServiceProvider _serviceProvider;
|
||||
private readonly ILogger<PgSqlConfigChangeListener> _logger;
|
||||
private readonly TimeSpan _fallbackInterval = TimeSpan.FromMinutes(5);
|
||||
private readonly string _connectionString;
|
||||
private NpgsqlConnection? _connection;
|
||||
private int _lastRouteVersion;
|
||||
private int _lastClusterVersion;
|
||||
private readonly Channel<bool> _reloadChannel = Channel.CreateBounded<bool>(1);
|
||||
|
||||
public PgSqlConfigChangeListener(
|
||||
IServiceProvider serviceProvider,
|
||||
ILogger<PgSqlConfigChangeListener> logger,
|
||||
IConfiguration configuration)
|
||||
{
|
||||
_serviceProvider = serviceProvider;
|
||||
_logger = logger;
|
||||
_connectionString = configuration.GetConnectionString("DefaultConnection")
|
||||
?? throw new InvalidOperationException("DefaultConnection is not configured");
|
||||
}
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
_logger.LogInformation("Starting PgSql config change listener");
|
||||
|
||||
await InitializeListenerAsync(stoppingToken);
|
||||
|
||||
_ = FallbackPollingAsync(stoppingToken);
|
||||
|
||||
await ListenAsync(stoppingToken);
|
||||
}
|
||||
|
||||
private async Task InitializeListenerAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
try
|
||||
{
|
||||
_connection = new NpgsqlConnection(_connectionString);
|
||||
_connection.Notification += OnNotification;
|
||||
await _connection.OpenAsync(stoppingToken);
|
||||
|
||||
await using var cmd = _connection.CreateCommand();
|
||||
cmd.CommandText = $"LISTEN {ConfigNotifyChannel.GatewayConfigChanged}";
|
||||
await cmd.ExecuteNonQueryAsync(stoppingToken);
|
||||
|
||||
_logger.LogInformation("Listening on {Channel}", ConfigNotifyChannel.GatewayConfigChanged);
|
||||
|
||||
await UpdateVersionAsync(stoppingToken);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Failed to initialize PgSql listener");
|
||||
}
|
||||
}
|
||||
|
||||
private void OnNotification(object sender, NpgsqlNotificationEventArgs e)
|
||||
{
|
||||
_logger.LogInformation("Received config change notification: {Payload}", e.Payload);
|
||||
_ = _reloadChannel.Writer.WriteAsync(true);
|
||||
}
|
||||
|
||||
private async Task ListenAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
while (!stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (_connection == null || _connection.State != System.Data.ConnectionState.Open)
|
||||
{
|
||||
await ReconnectAsync(stoppingToken);
|
||||
}
|
||||
|
||||
await Task.Delay(Timeout.Infinite, stoppingToken);
|
||||
}
|
||||
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
break;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Error in listener loop, reconnecting...");
|
||||
await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
|
||||
await ReconnectAsync(stoppingToken);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ReconnectAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
try
|
||||
{
|
||||
_connection?.Dispose();
|
||||
_connection = new NpgsqlConnection(_connectionString);
|
||||
_connection.Notification += OnNotification;
|
||||
await _connection.OpenAsync(stoppingToken);
|
||||
|
||||
await using var cmd = _connection.CreateCommand();
|
||||
cmd.CommandText = $"LISTEN {ConfigNotifyChannel.GatewayConfigChanged}";
|
||||
await cmd.ExecuteNonQueryAsync(stoppingToken);
|
||||
|
||||
_logger.LogInformation("Reconnected to PostgreSQL and listening");
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Failed to reconnect");
|
||||
}
|
||||
}
|
||||
|
||||
private async Task FallbackPollingAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
while (!stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
await Task.Delay(_fallbackInterval, stoppingToken);
|
||||
await CheckAndReloadAsync(stoppingToken);
|
||||
}
|
||||
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
break;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Error in fallback polling");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task CheckAndReloadAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
try
|
||||
{
|
||||
await using var scope = _serviceProvider.CreateAsyncScope();
|
||||
await using var db = scope.ServiceProvider.GetRequiredService<GatewayDbContext>();
|
||||
|
||||
var currentRouteVersion = await db.TenantRoutes
|
||||
.OrderByDescending(r => r.Version)
|
||||
.Select(r => r.Version)
|
||||
.FirstOrDefaultAsync(stoppingToken);
|
||||
|
||||
var currentClusterVersion = await db.ServiceInstances
|
||||
.OrderByDescending(i => i.Version)
|
||||
.Select(i => i.Version)
|
||||
.FirstOrDefaultAsync(stoppingToken);
|
||||
|
||||
if (currentRouteVersion != _lastRouteVersion || currentClusterVersion != _lastClusterVersion)
|
||||
{
|
||||
_logger.LogInformation("Version change detected via fallback: route {RouteVer}->{NewRouteVer}, cluster {ClusterVer}->{NewClusterVer}",
|
||||
_lastRouteVersion, currentRouteVersion, _lastClusterVersion, currentClusterVersion);
|
||||
|
||||
await ReloadConfigAsync(stoppingToken);
|
||||
|
||||
_lastRouteVersion = currentRouteVersion;
|
||||
_lastClusterVersion = currentClusterVersion;
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Error checking version for fallback");
|
||||
}
|
||||
}
|
||||
|
||||
private async Task UpdateVersionAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
try
|
||||
{
|
||||
await using var scope = _serviceProvider.CreateAsyncScope();
|
||||
await using var db = scope.ServiceProvider.GetRequiredService<GatewayDbContext>();
|
||||
|
||||
_lastRouteVersion = await db.TenantRoutes
|
||||
.OrderByDescending(r => r.Version)
|
||||
.Select(r => r.Version)
|
||||
.FirstOrDefaultAsync(stoppingToken);
|
||||
|
||||
_lastClusterVersion = await db.ServiceInstances
|
||||
.OrderByDescending(i => i.Version)
|
||||
.Select(i => i.Version)
|
||||
.FirstOrDefaultAsync(stoppingToken);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Error updating initial version");
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ReloadConfigAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
try
|
||||
{
|
||||
await using var scope = _serviceProvider.CreateAsyncScope();
|
||||
|
||||
var routeCache = scope.ServiceProvider.GetRequiredService<IRouteCache>();
|
||||
await routeCache.ReloadAsync();
|
||||
|
||||
var configProvider = scope.ServiceProvider.GetRequiredService<DynamicProxyConfigProvider>();
|
||||
configProvider.UpdateConfig();
|
||||
|
||||
_logger.LogInformation("Configuration reloaded successfully");
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Error reloading configuration");
|
||||
}
|
||||
}
|
||||
|
||||
public override async Task StopAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
_logger.LogInformation("Stopping PgSql config change listener");
|
||||
_reloadChannel.Writer.Complete();
|
||||
_connection?.Dispose();
|
||||
await base.StopAsync(cancellationToken);
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user