- Add PendingServiceDiscovery model and database migration - Add PendingServices API controller for service assignment - Add KubernetesPendingSyncService for background sync - Add RBAC configuration for K8s service discovery - Update Dockerfile and K8s deployment configs - Add service discovery design documentation Workflow: K8s services with label managed-by=yarp are discovered and stored in pending table. Admin approves before they become active gateway downstream services.
162 lines
6.0 KiB
C#
162 lines
6.0 KiB
C#
using System.Text.Json;
|
|
using Microsoft.EntityFrameworkCore;
|
|
using YarpGateway.Data;
|
|
using YarpGateway.Models;
|
|
|
|
namespace YarpGateway.Services;
|
|
|
|
public class KubernetesPendingSyncService : BackgroundService
|
|
{
|
|
private readonly IServiceProvider _serviceProvider;
|
|
private readonly ILogger<KubernetesPendingSyncService> _logger;
|
|
private readonly TimeSpan _syncInterval = TimeSpan.FromSeconds(30);
|
|
private readonly TimeSpan _staleThreshold = TimeSpan.FromHours(24);
|
|
|
|
public KubernetesPendingSyncService(
|
|
IServiceProvider serviceProvider,
|
|
ILogger<KubernetesPendingSyncService> logger)
|
|
{
|
|
_serviceProvider = serviceProvider;
|
|
_logger = logger;
|
|
}
|
|
|
|
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
|
{
|
|
_logger.LogInformation("Starting K8s pending service sync background task");
|
|
|
|
while (!stoppingToken.IsCancellationRequested)
|
|
{
|
|
try
|
|
{
|
|
await SyncPendingServicesAsync(stoppingToken);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex, "Error during K8s pending service sync");
|
|
}
|
|
|
|
await Task.Delay(_syncInterval, stoppingToken);
|
|
}
|
|
}
|
|
|
|
private async Task SyncPendingServicesAsync(CancellationToken ct)
|
|
{
|
|
using var scope = _serviceProvider.CreateScope();
|
|
var providers = scope.ServiceProvider.GetServices<Fengling.ServiceDiscovery.IServiceDiscoveryProvider>();
|
|
var k8sProvider = providers.FirstOrDefault(p => p.ProviderName == "Kubernetes");
|
|
|
|
if (k8sProvider == null)
|
|
{
|
|
_logger.LogWarning("No Kubernetes service discovery provider found");
|
|
return;
|
|
}
|
|
|
|
var dbContextFactory = scope.ServiceProvider.GetRequiredService<IDbContextFactory<GatewayDbContext>>();
|
|
|
|
var discoveredServices = await k8sProvider.GetServicesAsync(ct);
|
|
|
|
await using var db = await dbContextFactory.CreateDbContextAsync(ct);
|
|
|
|
var existingPending = await db.PendingServiceDiscoveries
|
|
.Where(p => !p.IsDeleted && p.Status == (int)PendingServiceStatus.Pending)
|
|
.ToListAsync(ct);
|
|
|
|
var existingDict = existingPending
|
|
.ToDictionary(p => $"{p.K8sServiceName}|{p.K8sNamespace}");
|
|
|
|
var discoveredSet = discoveredServices
|
|
.Select(s => $"{s.Name}|{s.Namespace}")
|
|
.ToHashSet();
|
|
|
|
var addedCount = 0;
|
|
var updatedCount = 0;
|
|
var cleanedCount = 0;
|
|
|
|
foreach (var item in existingDict)
|
|
{
|
|
var key = item.Key;
|
|
|
|
if (!discoveredSet.Contains(key))
|
|
{
|
|
var pending = item.Value;
|
|
|
|
if (DateTime.UtcNow - pending.DiscoveredAt > _staleThreshold)
|
|
{
|
|
pending.IsDeleted = true;
|
|
pending.Version++;
|
|
cleanedCount++;
|
|
_logger.LogInformation("Cleaned up stale pending service {ServiceName} in namespace {Namespace}",
|
|
pending.K8sServiceName, pending.K8sNamespace);
|
|
}
|
|
else
|
|
{
|
|
pending.Status = (int)PendingServiceStatus.K8sServiceNotFound;
|
|
pending.Version++;
|
|
_logger.LogInformation("Pending service {ServiceName} in namespace {Namespace} not found in K8s, marked as not found",
|
|
pending.K8sServiceName, pending.K8sNamespace);
|
|
}
|
|
}
|
|
}
|
|
|
|
if (discoveredServices.Count > 0)
|
|
{
|
|
var discoveredDict = discoveredServices.ToDictionary(
|
|
s => $"{s.Name}|{s.Namespace}",
|
|
s => s);
|
|
|
|
foreach (var item in discoveredDict)
|
|
{
|
|
var key = item.Key;
|
|
var service = item.Value;
|
|
|
|
if (existingDict.TryGetValue(key, out var existing))
|
|
{
|
|
if (existing.Status == (int)PendingServiceStatus.K8sServiceNotFound)
|
|
{
|
|
existing.Status = (int)PendingServiceStatus.Pending;
|
|
existing.Version++;
|
|
updatedCount++;
|
|
}
|
|
|
|
var portsJson = JsonSerializer.Serialize(service.Ports);
|
|
var labelsJson = JsonSerializer.Serialize(service.Labels);
|
|
|
|
if (existing.DiscoveredPorts != portsJson || existing.Labels != labelsJson)
|
|
{
|
|
existing.DiscoveredPorts = portsJson;
|
|
existing.Labels = labelsJson;
|
|
existing.K8sClusterIP = service.ClusterIP;
|
|
existing.PodCount = service.Ports.Count;
|
|
existing.Version++;
|
|
updatedCount++;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
var newPending = new GwPendingServiceDiscovery
|
|
{
|
|
K8sServiceName = service.Name,
|
|
K8sNamespace = service.Namespace,
|
|
K8sClusterIP = service.ClusterIP,
|
|
DiscoveredPorts = JsonSerializer.Serialize(service.Ports),
|
|
Labels = JsonSerializer.Serialize(service.Labels),
|
|
PodCount = service.Ports.Count,
|
|
Status = (int)PendingServiceStatus.Pending,
|
|
DiscoveredAt = DateTime.UtcNow,
|
|
Version = 1
|
|
};
|
|
db.PendingServiceDiscoveries.Add(newPending);
|
|
addedCount++;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (addedCount > 0 || updatedCount > 0 || cleanedCount > 0)
|
|
{
|
|
await db.SaveChangesAsync(ct);
|
|
_logger.LogInformation("K8s sync completed: {Added} new, {Updated} updated, {Cleaned} cleaned",
|
|
addedCount, updatedCount, cleanedCount);
|
|
}
|
|
}
|
|
}
|