From 0e04bb1690bf95f7f03bc6ddf779b92acc0660ca Mon Sep 17 00:00:00 2001 From: movingsam Date: Sun, 8 Mar 2026 00:57:55 +0800 Subject: [PATCH] =?UTF-8?q?IMPL-3:=20=E9=9B=86=E6=88=90=20PendingConfigCac?= =?UTF-8?q?he=20=E7=94=9F=E6=88=90=E5=BE=85=E7=A1=AE=E8=AE=A4=E9=85=8D?= =?UTF-8?q?=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 创建 PendingConfigCache 类,支持内存缓存存储 - 实现变更检测逻辑(对比 ResourceVersion 和 Label Hash) - 实现核心方法:AddOrUpdate, Get, GetAll, Remove, Clear - 集成到 K8sServiceWatchService,在 Watch 事件中生成待确认配置 - 支持防止重复生成和状态标记(IsNew/IsModified) --- src/Program.cs | 2 + src/Services/K8sServiceWatchService.cs | 51 +++- src/Services/PendingConfigCache.cs | 308 +++++++++++++++++++++++++ 3 files changed, 358 insertions(+), 3 deletions(-) create mode 100644 src/Services/PendingConfigCache.cs diff --git a/src/Program.cs b/src/Program.cs index c2b868c..d92e988 100644 --- a/src/Program.cs +++ b/src/Program.cs @@ -97,6 +97,8 @@ builder.Services.AddSwaggerGen(c => // 添加 K8s 服务监视 +builder.Services.AddServiceLabelParser(); +builder.Services.AddPendingConfigCache(); builder.Services.AddK8sServiceWatch(); var app = builder.Build(); diff --git a/src/Services/K8sServiceWatchService.cs b/src/Services/K8sServiceWatchService.cs index fd436c7..8498174 100644 --- a/src/Services/K8sServiceWatchService.cs +++ b/src/Services/K8sServiceWatchService.cs @@ -1,4 +1,5 @@ using System.Threading.Channels; +using Fengling.Console.Models.Entities; using k8s; using k8s.Models; @@ -34,6 +35,8 @@ public sealed class K8sServiceWatchService : BackgroundService, IDisposable private readonly ILogger _logger; private readonly IKubernetes _kubernetesClient; private readonly INotificationService _notificationService; + private readonly PendingConfigCache _pendingConfigCache; + private readonly ServiceLabelParser _labelParser; private readonly K8sServiceWatchOptions _options; // 用于通知新事件的通道 @@ -52,11 +55,15 @@ public sealed class K8sServiceWatchService : BackgroundService, IDisposable ILogger logger, IKubernetes kubernetesClient, INotificationService notificationService, + PendingConfigCache pendingConfigCache, + ServiceLabelParser labelParser, IConfiguration configuration) { _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _kubernetesClient = kubernetesClient ?? throw new ArgumentNullException(nameof(kubernetesClient)); _notificationService = notificationService ?? throw new ArgumentNullException(nameof(notificationService)); + _pendingConfigCache = pendingConfigCache ?? throw new ArgumentNullException(nameof(pendingConfigCache)); + _labelParser = labelParser ?? throw new ArgumentNullException(nameof(labelParser)); _options = new K8sServiceWatchOptions(); configuration.GetSection("K8sServiceWatch").Bind(_options); @@ -255,6 +262,10 @@ public sealed class K8sServiceWatchService : BackgroundService, IDisposable return; } + var serviceName = metadata.Name ?? "unknown"; + var serviceNamespace = metadata.NamespaceProperty ?? "default"; + var serviceUid = metadata.Uid; + var eventAction = @event.EventType switch { WatchEventType.Added => "create", @@ -265,19 +276,53 @@ public sealed class K8sServiceWatchService : BackgroundService, IDisposable _logger.LogInformation("Service {Action}: {Name}/{Namespace}", eventAction, - metadata.Name, - metadata.NamespaceProperty); + serviceName, + serviceNamespace); + + // 处理删除事件:从缓存中移除对应的待确认配置 + if (@event.EventType == WatchEventType.Deleted) + { + if (!string.IsNullOrWhiteSpace(serviceUid)) + { + _pendingConfigCache.HandleServiceDeleted(serviceUid); + } + } + // 处理新增/修改事件:检查是否包含网关路由 Label,如果有则生成待确认配置 + else if (@event.EventType == WatchEventType.Added || @event.EventType == WatchEventType.Modified) + { + // 检查 Service 是否包含必需的网关路由 Label + if (_labelParser.HasRequiredLabels(service)) + { + // 尝试添加到待确认配置缓存 + var added = _pendingConfigCache.AddOrUpdate(service); + + if (added) + { + _logger.LogInformation( + "Service {ServiceName}/{Namespace} 的网关配置已生成待确认项", + serviceName, serviceNamespace); + } + } + else + { + _logger.LogDebug( + "Service {ServiceName}/{Namespace} 缺少必需的网关路由 Label,跳过配置生成", + serviceName, serviceNamespace); + } + } // 发布配置变更通知 var details = new { ServiceName = metadata.Name, Namespace = metadata.NamespaceProperty, + Uid = serviceUid, ClusterIP = service.Spec?.ClusterIP, ExternalIPs = service.Spec?.ExternalIPs, Ports = service.Spec?.Ports?.Select(p => new { p.Name, p.Port, p.TargetPort, p.Protocol }), Labels = metadata.Labels, - EventTimestamp = @event.Timestamp + EventTimestamp = @event.Timestamp, + PendingConfigCount = _pendingConfigCache.Count }; await _notificationService.PublishConfigChangeAsync( diff --git a/src/Services/PendingConfigCache.cs b/src/Services/PendingConfigCache.cs new file mode 100644 index 0000000..f1475c9 --- /dev/null +++ b/src/Services/PendingConfigCache.cs @@ -0,0 +1,308 @@ +using System.Collections.Concurrent; +using System.Security.Cryptography; +using System.Text; +using System.Text.Json; +using Fengling.Console.Models.Entities; +using Fengling.Console.Models.K8s; +using k8s.Models; + +namespace Fengling.Console.Services; + +/// +/// 待确认配置内存缓存 +/// 以 K8s Service UID 为 key 存储待确认配置 +/// +public sealed class PendingConfigCache +{ + // 内存缓存存储:Key = K8s Service UID + private readonly ConcurrentDictionary _cache = new(); + + // 存储上次处理的配置哈希,用于变更检测 + private readonly ConcurrentDictionary _configHashCache = new(); + + private readonly ILogger _logger; + private readonly ServiceLabelParser _labelParser; + + public PendingConfigCache( + ILogger logger, + ServiceLabelParser labelParser) + { + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + _labelParser = labelParser ?? throw new ArgumentNullException(nameof(labelParser)); + } + + /// + /// 添加或更新待确认配置 + /// 根据 Service 的变更状态自动设置 IsNew 和 IsModified 标志 + /// + /// K8s Service 对象 + /// 是否成功添加或更新(如果配置未变更则返回 false) + public bool AddOrUpdate(V1Service service) + { + ArgumentNullException.ThrowIfNull(service); + + var metadata = service.Metadata; + if (metadata?.Uid == null) + { + _logger.LogWarning("无法缓存配置:Service 的 UID 为空"); + return false; + } + + var serviceUid = metadata.Uid; + var serviceName = metadata.Name ?? "unknown"; + var serviceNamespace = metadata.NamespaceProperty ?? "default"; + + try + { + // 1. 解析 Service Label 生成配置 + var parseResult = _labelParser.Parse(service); + if (!parseResult.Success || parseResult.Config == null) + { + _logger.LogDebug("Service {ServiceName}/{Namespace} 解析失败,跳过缓存: {Errors}", + serviceName, serviceNamespace, string.Join(", ", parseResult.Errors)); + return false; + } + + var config = parseResult.Config; + + // 2. 计算当前配置的哈希值(用于变更检测) + var currentHash = ComputeConfigHash(service, config); + var existingHash = _configHashCache.GetValueOrDefault(serviceUid); + + // 3. 变更检测逻辑 + bool isNew = !_cache.ContainsKey(serviceUid); + bool isModified = !isNew && existingHash != currentHash; + + if (!isNew && !isModified) + { + _logger.LogDebug("Service {ServiceName}/{Namespace} 配置未变更,跳过缓存", + serviceName, serviceNamespace); + return false; + } + + // 4. 创建或更新 PendingConfig + var pendingConfig = new PendingConfig + { + Id = Guid.NewGuid().ToString("N"), + Type = PendingConfigType.Route, // K8s Service 对应 Route 配置 + Source = PendingConfigSource.K8sDiscovery, + SourceId = serviceUid, + ConfigJson = JsonSerializer.Serialize(config, new JsonSerializerOptions + { + PropertyNamingPolicy = JsonNamingPolicy.CamelCase + }), + ServiceName = config.ServiceName, + TenantCode = config.TenantCode, + ClusterId = config.ClusterId, + Status = PendingConfigStatus.Pending, + CreatedAt = DateTime.UtcNow, + IsNew = isNew, + IsModified = isModified + }; + + // 5. 存入缓存 + _cache[serviceUid] = pendingConfig; + _configHashCache[serviceUid] = currentHash; + + _logger.LogInformation( + "待确认配置已{Action}: Service={ServiceName}/{Namespace}, IsNew={IsNew}, IsModified={IsModified}", + isNew ? "添加" : "更新", + serviceName, + serviceNamespace, + isNew, + isModified); + + return true; + } + catch (Exception ex) + { + _logger.LogError(ex, "添加或更新待确认配置时发生错误: {ServiceName}/{Namespace}", + serviceName, serviceNamespace); + return false; + } + } + + /// + /// 手动添加或更新待确认配置 + /// + /// 待确认配置 + public void AddOrUpdate(PendingConfig config) + { + ArgumentNullException.ThrowIfNull(config); + + if (string.IsNullOrWhiteSpace(config.SourceId)) + { + throw new ArgumentException("PendingConfig 的 SourceId 不能为空", nameof(config)); + } + + config.Id = string.IsNullOrWhiteSpace(config.Id) + ? Guid.NewGuid().ToString("N") + : config.Id; + + _cache[config.SourceId] = config; + + _logger.LogInformation( + "待确认配置已手动添加/更新: SourceId={SourceId}, ServiceName={ServiceName}", + config.SourceId, config.ServiceName); + } + + /// + /// 根据 Service UID 获取待确认配置 + /// + /// K8s Service UID + /// 待确认配置,不存在则返回 null + public PendingConfig? Get(string serviceUid) + { + if (string.IsNullOrWhiteSpace(serviceUid)) + { + return null; + } + + _cache.TryGetValue(serviceUid, out var config); + return config; + } + + /// + /// 获取所有待确认配置 + /// + /// 待确认配置列表 + public IEnumerable GetAll() + { + return _cache.Values.ToList(); + } + + /// + /// 获取待确认配置数量 + /// + public int Count => _cache.Count; + + /// + /// 确认后移除待确认配置 + /// + /// K8s Service UID + /// 是否成功移除 + public bool Remove(string serviceUid) + { + if (string.IsNullOrWhiteSpace(serviceUid)) + { + return false; + } + + var removed = _cache.TryRemove(serviceUid, out _); + if (removed) + { + _configHashCache.TryRemove(serviceUid, out _); + _logger.LogDebug("待确认配置已移除: {ServiceUid}", serviceUid); + } + + return removed; + } + + /// + /// 清空所有待确认配置 + /// + public void Clear() + { + _cache.Clear(); + _configHashCache.Clear(); + _logger.LogInformation("所有待确认配置已清空"); + } + + /// + /// 检查指定 Service 是否存在待确认配置 + /// + /// K8s Service UID + /// 是否存在 + public bool Contains(string serviceUid) + { + if (string.IsNullOrWhiteSpace(serviceUid)) + { + return false; + } + + return _cache.ContainsKey(serviceUid); + } + + /// + /// 获取配置哈希值(用于变更检测) + /// 对比 ResourceVersion 和 Label 内容 + /// + private string ComputeConfigHash(V1Service service, GatewayConfigModel config) + { + var metadata = service.Metadata; + + // 组合关键信息用于计算哈希 + var hashInput = new StringBuilder(); + + // 1. 包含 ResourceVersion(K8s 对象版本) + hashInput.Append("RV:").Append(metadata?.ResourceVersion ?? "null").Append("|"); + + // 2. 包含关键 Label 值 + var labels = metadata?.Labels ?? new Dictionary(); + hashInput.Append("L:"); + var relevantLabels = new[] + { + "app-router-host", + "app-router-name", + "app-router-prefix", + "app-cluster-name", + "app-cluster-destination", + "app-router-tenant", + "app-router-version", + "app-router-weight" + }; + + foreach (var key in relevantLabels.OrderBy(k => k)) + { + if (labels.TryGetValue(key, out var value)) + { + hashInput.Append(key).Append("=").Append(value).Append(";"); + } + } + hashInput.Append("|"); + + // 3. 包含服务地址和端口(如果发生变化也算变更) + hashInput.Append("S:").Append(config.ServiceAddress ?? "null").Append(":").Append(config.Port ?? 0); + + // 计算 SHA256 哈希 + using var sha256 = SHA256.Create(); + var hashBytes = sha256.ComputeHash(Encoding.UTF8.GetBytes(hashInput.ToString())); + return Convert.ToHexString(hashBytes); + } + + /// + /// 处理 Service 删除事件 + /// 当 Service 被删除时,移除对应的待确认配置 + /// + /// K8s Service UID + public void HandleServiceDeleted(string serviceUid) + { + if (string.IsNullOrWhiteSpace(serviceUid)) + { + return; + } + + if (_cache.TryRemove(serviceUid, out var config)) + { + _configHashCache.TryRemove(serviceUid, out _); + _logger.LogInformation( + "Service 已删除,移除对应待确认配置: ServiceName={ServiceName}, SourceId={SourceId}", + config.ServiceName, config.SourceId); + } + } +} + +/// +/// PendingConfigCache 扩展方法 +/// +public static class PendingConfigCacheExtensions +{ + /// + /// 添加 PendingConfigCache 服务 + /// + public static IServiceCollection AddPendingConfigCache(this IServiceCollection services) + { + services.AddSingleton(); + return services; + } +}