fengling-gateway/Config/DatabaseClusterConfigProvider.cs

100 lines
3.0 KiB
C#

using Yarp.ReverseProxy.Configuration;
using Microsoft.EntityFrameworkCore;
using System.Collections.Concurrent;
using YarpGateway.Data;
using YarpGateway.Models;
namespace YarpGateway.Config;
public class DatabaseClusterConfigProvider
{
private readonly IDbContextFactory<GatewayDbContext> _dbContextFactory;
private readonly ConcurrentDictionary<string, ClusterConfig> _clusters = new();
private readonly SemaphoreSlim _lock = new(1, 1);
private readonly ILogger<DatabaseClusterConfigProvider> _logger;
public DatabaseClusterConfigProvider(IDbContextFactory<GatewayDbContext> dbContextFactory, ILogger<DatabaseClusterConfigProvider> logger)
{
_dbContextFactory = dbContextFactory;
_logger = logger;
_ = LoadConfigAsync();
}
public IReadOnlyList<ClusterConfig> GetClusters()
{
return _clusters.Values.ToList().AsReadOnly();
}
public async Task ReloadAsync()
{
await _lock.WaitAsync();
try
{
await LoadConfigInternalAsync();
}
finally
{
_lock.Release();
}
}
private async Task LoadConfigAsync()
{
await LoadConfigInternalAsync();
}
private async Task LoadConfigInternalAsync()
{
await using var dbContext = _dbContextFactory.CreateDbContext();
var instances = await dbContext.ServiceInstances
.Where(i => i.Status == 1 && !i.IsDeleted)
.GroupBy(i => i.ClusterId)
.ToListAsync();
var newClusters = new ConcurrentDictionary<string, ClusterConfig>();
foreach (var group in instances)
{
var destinations = new Dictionary<string, DestinationConfig>();
foreach (var instance in group)
{
destinations[instance.DestinationId] = new DestinationConfig
{
Address = instance.Address,
Metadata = new Dictionary<string, string>
{
["Weight"] = instance.Weight.ToString()
}
};
}
var config = new ClusterConfig
{
ClusterId = group.Key,
Destinations = destinations,
LoadBalancingPolicy = "DistributedWeightedRoundRobin",
HealthCheck = new HealthCheckConfig
{
Active = new ActiveHealthCheckConfig
{
Enabled = true,
Interval = TimeSpan.FromSeconds(30),
Timeout = TimeSpan.FromSeconds(5),
Path = "/health"
}
}
};
newClusters[group.Key] = config;
}
_clusters.Clear();
foreach (var cluster in newClusters)
{
_clusters[cluster.Key] = cluster.Value;
}
_logger.LogInformation("Loaded {Count} clusters from database", _clusters.Count);
}
}