IMPL-3: 集成 PendingConfigCache 生成待确认配置
All checks were successful
Build and Push Docker / build (push) Successful in 5m37s

- 创建 PendingConfigCache 类,支持内存缓存存储
- 实现变更检测逻辑(对比 ResourceVersion 和 Label Hash)
- 实现核心方法:AddOrUpdate, Get, GetAll, Remove, Clear
- 集成到 K8sServiceWatchService,在 Watch 事件中生成待确认配置
- 支持防止重复生成和状态标记(IsNew/IsModified)
This commit is contained in:
movingsam 2026-03-08 00:57:55 +08:00
parent 5f27300035
commit 0e04bb1690
3 changed files with 358 additions and 3 deletions

View File

@ -97,6 +97,8 @@ builder.Services.AddSwaggerGen(c =>
// 添加 K8s 服务监视 // 添加 K8s 服务监视
builder.Services.AddServiceLabelParser();
builder.Services.AddPendingConfigCache();
builder.Services.AddK8sServiceWatch(); builder.Services.AddK8sServiceWatch();
var app = builder.Build(); var app = builder.Build();

View File

@ -1,4 +1,5 @@
using System.Threading.Channels; using System.Threading.Channels;
using Fengling.Console.Models.Entities;
using k8s; using k8s;
using k8s.Models; using k8s.Models;
@ -34,6 +35,8 @@ public sealed class K8sServiceWatchService : BackgroundService, IDisposable
private readonly ILogger<K8sServiceWatchService> _logger; private readonly ILogger<K8sServiceWatchService> _logger;
private readonly IKubernetes _kubernetesClient; private readonly IKubernetes _kubernetesClient;
private readonly INotificationService _notificationService; private readonly INotificationService _notificationService;
private readonly PendingConfigCache _pendingConfigCache;
private readonly ServiceLabelParser _labelParser;
private readonly K8sServiceWatchOptions _options; private readonly K8sServiceWatchOptions _options;
// 用于通知新事件的通道 // 用于通知新事件的通道
@ -52,11 +55,15 @@ public sealed class K8sServiceWatchService : BackgroundService, IDisposable
ILogger<K8sServiceWatchService> logger, ILogger<K8sServiceWatchService> logger,
IKubernetes kubernetesClient, IKubernetes kubernetesClient,
INotificationService notificationService, INotificationService notificationService,
PendingConfigCache pendingConfigCache,
ServiceLabelParser labelParser,
IConfiguration configuration) IConfiguration configuration)
{ {
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); _logger = logger ?? throw new ArgumentNullException(nameof(logger));
_kubernetesClient = kubernetesClient ?? throw new ArgumentNullException(nameof(kubernetesClient)); _kubernetesClient = kubernetesClient ?? throw new ArgumentNullException(nameof(kubernetesClient));
_notificationService = notificationService ?? throw new ArgumentNullException(nameof(notificationService)); _notificationService = notificationService ?? throw new ArgumentNullException(nameof(notificationService));
_pendingConfigCache = pendingConfigCache ?? throw new ArgumentNullException(nameof(pendingConfigCache));
_labelParser = labelParser ?? throw new ArgumentNullException(nameof(labelParser));
_options = new K8sServiceWatchOptions(); _options = new K8sServiceWatchOptions();
configuration.GetSection("K8sServiceWatch").Bind(_options); configuration.GetSection("K8sServiceWatch").Bind(_options);
@ -255,6 +262,10 @@ public sealed class K8sServiceWatchService : BackgroundService, IDisposable
return; return;
} }
var serviceName = metadata.Name ?? "unknown";
var serviceNamespace = metadata.NamespaceProperty ?? "default";
var serviceUid = metadata.Uid;
var eventAction = @event.EventType switch var eventAction = @event.EventType switch
{ {
WatchEventType.Added => "create", WatchEventType.Added => "create",
@ -265,19 +276,53 @@ public sealed class K8sServiceWatchService : BackgroundService, IDisposable
_logger.LogInformation("Service {Action}: {Name}/{Namespace}", _logger.LogInformation("Service {Action}: {Name}/{Namespace}",
eventAction, eventAction,
metadata.Name, serviceName,
metadata.NamespaceProperty); serviceNamespace);
// 处理删除事件:从缓存中移除对应的待确认配置
if (@event.EventType == WatchEventType.Deleted)
{
if (!string.IsNullOrWhiteSpace(serviceUid))
{
_pendingConfigCache.HandleServiceDeleted(serviceUid);
}
}
// 处理新增/修改事件:检查是否包含网关路由 Label如果有则生成待确认配置
else if (@event.EventType == WatchEventType.Added || @event.EventType == WatchEventType.Modified)
{
// 检查 Service 是否包含必需的网关路由 Label
if (_labelParser.HasRequiredLabels(service))
{
// 尝试添加到待确认配置缓存
var added = _pendingConfigCache.AddOrUpdate(service);
if (added)
{
_logger.LogInformation(
"Service {ServiceName}/{Namespace} 的网关配置已生成待确认项",
serviceName, serviceNamespace);
}
}
else
{
_logger.LogDebug(
"Service {ServiceName}/{Namespace} 缺少必需的网关路由 Label跳过配置生成",
serviceName, serviceNamespace);
}
}
// 发布配置变更通知 // 发布配置变更通知
var details = new var details = new
{ {
ServiceName = metadata.Name, ServiceName = metadata.Name,
Namespace = metadata.NamespaceProperty, Namespace = metadata.NamespaceProperty,
Uid = serviceUid,
ClusterIP = service.Spec?.ClusterIP, ClusterIP = service.Spec?.ClusterIP,
ExternalIPs = service.Spec?.ExternalIPs, ExternalIPs = service.Spec?.ExternalIPs,
Ports = service.Spec?.Ports?.Select(p => new { p.Name, p.Port, p.TargetPort, p.Protocol }), Ports = service.Spec?.Ports?.Select(p => new { p.Name, p.Port, p.TargetPort, p.Protocol }),
Labels = metadata.Labels, Labels = metadata.Labels,
EventTimestamp = @event.Timestamp EventTimestamp = @event.Timestamp,
PendingConfigCount = _pendingConfigCache.Count
}; };
await _notificationService.PublishConfigChangeAsync( await _notificationService.PublishConfigChangeAsync(

View File

@ -0,0 +1,308 @@
using System.Collections.Concurrent;
using System.Security.Cryptography;
using System.Text;
using System.Text.Json;
using Fengling.Console.Models.Entities;
using Fengling.Console.Models.K8s;
using k8s.Models;
namespace Fengling.Console.Services;
/// <summary>
/// 待确认配置内存缓存
/// 以 K8s Service UID 为 key 存储待确认配置
/// </summary>
public sealed class PendingConfigCache
{
// 内存缓存存储Key = K8s Service UID
private readonly ConcurrentDictionary<string, PendingConfig> _cache = new();
// 存储上次处理的配置哈希,用于变更检测
private readonly ConcurrentDictionary<string, string> _configHashCache = new();
private readonly ILogger<PendingConfigCache> _logger;
private readonly ServiceLabelParser _labelParser;
public PendingConfigCache(
ILogger<PendingConfigCache> logger,
ServiceLabelParser labelParser)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_labelParser = labelParser ?? throw new ArgumentNullException(nameof(labelParser));
}
/// <summary>
/// 添加或更新待确认配置
/// 根据 Service 的变更状态自动设置 IsNew 和 IsModified 标志
/// </summary>
/// <param name="service">K8s Service 对象</param>
/// <returns>是否成功添加或更新(如果配置未变更则返回 false</returns>
public bool AddOrUpdate(V1Service service)
{
ArgumentNullException.ThrowIfNull(service);
var metadata = service.Metadata;
if (metadata?.Uid == null)
{
_logger.LogWarning("无法缓存配置Service 的 UID 为空");
return false;
}
var serviceUid = metadata.Uid;
var serviceName = metadata.Name ?? "unknown";
var serviceNamespace = metadata.NamespaceProperty ?? "default";
try
{
// 1. 解析 Service Label 生成配置
var parseResult = _labelParser.Parse(service);
if (!parseResult.Success || parseResult.Config == null)
{
_logger.LogDebug("Service {ServiceName}/{Namespace} 解析失败,跳过缓存: {Errors}",
serviceName, serviceNamespace, string.Join(", ", parseResult.Errors));
return false;
}
var config = parseResult.Config;
// 2. 计算当前配置的哈希值(用于变更检测)
var currentHash = ComputeConfigHash(service, config);
var existingHash = _configHashCache.GetValueOrDefault(serviceUid);
// 3. 变更检测逻辑
bool isNew = !_cache.ContainsKey(serviceUid);
bool isModified = !isNew && existingHash != currentHash;
if (!isNew && !isModified)
{
_logger.LogDebug("Service {ServiceName}/{Namespace} 配置未变更,跳过缓存",
serviceName, serviceNamespace);
return false;
}
// 4. 创建或更新 PendingConfig
var pendingConfig = new PendingConfig
{
Id = Guid.NewGuid().ToString("N"),
Type = PendingConfigType.Route, // K8s Service 对应 Route 配置
Source = PendingConfigSource.K8sDiscovery,
SourceId = serviceUid,
ConfigJson = JsonSerializer.Serialize(config, new JsonSerializerOptions
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase
}),
ServiceName = config.ServiceName,
TenantCode = config.TenantCode,
ClusterId = config.ClusterId,
Status = PendingConfigStatus.Pending,
CreatedAt = DateTime.UtcNow,
IsNew = isNew,
IsModified = isModified
};
// 5. 存入缓存
_cache[serviceUid] = pendingConfig;
_configHashCache[serviceUid] = currentHash;
_logger.LogInformation(
"待确认配置已{Action}: Service={ServiceName}/{Namespace}, IsNew={IsNew}, IsModified={IsModified}",
isNew ? "添加" : "更新",
serviceName,
serviceNamespace,
isNew,
isModified);
return true;
}
catch (Exception ex)
{
_logger.LogError(ex, "添加或更新待确认配置时发生错误: {ServiceName}/{Namespace}",
serviceName, serviceNamespace);
return false;
}
}
/// <summary>
/// 手动添加或更新待确认配置
/// </summary>
/// <param name="config">待确认配置</param>
public void AddOrUpdate(PendingConfig config)
{
ArgumentNullException.ThrowIfNull(config);
if (string.IsNullOrWhiteSpace(config.SourceId))
{
throw new ArgumentException("PendingConfig 的 SourceId 不能为空", nameof(config));
}
config.Id = string.IsNullOrWhiteSpace(config.Id)
? Guid.NewGuid().ToString("N")
: config.Id;
_cache[config.SourceId] = config;
_logger.LogInformation(
"待确认配置已手动添加/更新: SourceId={SourceId}, ServiceName={ServiceName}",
config.SourceId, config.ServiceName);
}
/// <summary>
/// 根据 Service UID 获取待确认配置
/// </summary>
/// <param name="serviceUid">K8s Service UID</param>
/// <returns>待确认配置,不存在则返回 null</returns>
public PendingConfig? Get(string serviceUid)
{
if (string.IsNullOrWhiteSpace(serviceUid))
{
return null;
}
_cache.TryGetValue(serviceUid, out var config);
return config;
}
/// <summary>
/// 获取所有待确认配置
/// </summary>
/// <returns>待确认配置列表</returns>
public IEnumerable<PendingConfig> GetAll()
{
return _cache.Values.ToList();
}
/// <summary>
/// 获取待确认配置数量
/// </summary>
public int Count => _cache.Count;
/// <summary>
/// 确认后移除待确认配置
/// </summary>
/// <param name="serviceUid">K8s Service UID</param>
/// <returns>是否成功移除</returns>
public bool Remove(string serviceUid)
{
if (string.IsNullOrWhiteSpace(serviceUid))
{
return false;
}
var removed = _cache.TryRemove(serviceUid, out _);
if (removed)
{
_configHashCache.TryRemove(serviceUid, out _);
_logger.LogDebug("待确认配置已移除: {ServiceUid}", serviceUid);
}
return removed;
}
/// <summary>
/// 清空所有待确认配置
/// </summary>
public void Clear()
{
_cache.Clear();
_configHashCache.Clear();
_logger.LogInformation("所有待确认配置已清空");
}
/// <summary>
/// 检查指定 Service 是否存在待确认配置
/// </summary>
/// <param name="serviceUid">K8s Service UID</param>
/// <returns>是否存在</returns>
public bool Contains(string serviceUid)
{
if (string.IsNullOrWhiteSpace(serviceUid))
{
return false;
}
return _cache.ContainsKey(serviceUid);
}
/// <summary>
/// 获取配置哈希值(用于变更检测)
/// 对比 ResourceVersion 和 Label 内容
/// </summary>
private string ComputeConfigHash(V1Service service, GatewayConfigModel config)
{
var metadata = service.Metadata;
// 组合关键信息用于计算哈希
var hashInput = new StringBuilder();
// 1. 包含 ResourceVersionK8s 对象版本)
hashInput.Append("RV:").Append(metadata?.ResourceVersion ?? "null").Append("|");
// 2. 包含关键 Label 值
var labels = metadata?.Labels ?? new Dictionary<string, string>();
hashInput.Append("L:");
var relevantLabels = new[]
{
"app-router-host",
"app-router-name",
"app-router-prefix",
"app-cluster-name",
"app-cluster-destination",
"app-router-tenant",
"app-router-version",
"app-router-weight"
};
foreach (var key in relevantLabels.OrderBy(k => k))
{
if (labels.TryGetValue(key, out var value))
{
hashInput.Append(key).Append("=").Append(value).Append(";");
}
}
hashInput.Append("|");
// 3. 包含服务地址和端口(如果发生变化也算变更)
hashInput.Append("S:").Append(config.ServiceAddress ?? "null").Append(":").Append(config.Port ?? 0);
// 计算 SHA256 哈希
using var sha256 = SHA256.Create();
var hashBytes = sha256.ComputeHash(Encoding.UTF8.GetBytes(hashInput.ToString()));
return Convert.ToHexString(hashBytes);
}
/// <summary>
/// 处理 Service 删除事件
/// 当 Service 被删除时,移除对应的待确认配置
/// </summary>
/// <param name="serviceUid">K8s Service UID</param>
public void HandleServiceDeleted(string serviceUid)
{
if (string.IsNullOrWhiteSpace(serviceUid))
{
return;
}
if (_cache.TryRemove(serviceUid, out var config))
{
_configHashCache.TryRemove(serviceUid, out _);
_logger.LogInformation(
"Service 已删除,移除对应待确认配置: ServiceName={ServiceName}, SourceId={SourceId}",
config.ServiceName, config.SourceId);
}
}
}
/// <summary>
/// PendingConfigCache 扩展方法
/// </summary>
public static class PendingConfigCacheExtensions
{
/// <summary>
/// 添加 PendingConfigCache 服务
/// </summary>
public static IServiceCollection AddPendingConfigCache(this IServiceCollection services)
{
services.AddSingleton<PendingConfigCache>();
return services;
}
}