From a6f624580771692b62f4cd73436ffbcd62ff6684 Mon Sep 17 00:00:00 2001 From: sam Date: Sun, 15 Feb 2026 10:32:51 +0800 Subject: [PATCH] feat: add ServiceDiscovery framework with Kubernetes, Consul and Static providers Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode) Co-authored-by: Sisyphus --- .../Fengling.ServiceDiscovery.Consul.csproj | 18 ++ .../src/ConsulDiscoveryProvider.cs | 111 +++++++++++ .../ConsulServiceDiscoveryExtensions.cs | 63 +++++++ .../Fengling.ServiceDiscovery.Core.csproj | 19 ++ .../Abstractions/IServiceDiscoveryProvider.cs | 38 ++++ .../Extensions/ServiceCollectionExtensions.cs | 104 +++++++++++ .../src/Models/DiscoveredInstance.cs | 47 +++++ .../src/Models/DiscoveredService.cs | 52 ++++++ .../src/Models/HealthCheckResult.cs | 49 +++++ .../src/Models/ServiceDiscoveryChange.cs | 43 +++++ ...engling.ServiceDiscovery.Kubernetes.csproj | 19 ++ .../KubernetesServiceDiscoveryExtensions.cs | 81 ++++++++ .../src/KubernetesDiscoveryProvider.cs | 176 ++++++++++++++++++ .../Fengling.ServiceDiscovery.Static.csproj | 14 ++ .../StaticServiceDiscoveryExtensions.cs | 88 +++++++++ .../src/StaticDiscoveryProvider.cs | 84 +++++++++ 16 files changed, 1006 insertions(+) create mode 100644 Fengling.ServiceDiscovery.Consul/Fengling.ServiceDiscovery.Consul.csproj create mode 100644 Fengling.ServiceDiscovery.Consul/src/ConsulDiscoveryProvider.cs create mode 100644 Fengling.ServiceDiscovery.Consul/src/Extensions/ConsulServiceDiscoveryExtensions.cs create mode 100644 Fengling.ServiceDiscovery.Core/Fengling.ServiceDiscovery.Core.csproj create mode 100644 Fengling.ServiceDiscovery.Core/src/Abstractions/IServiceDiscoveryProvider.cs create mode 100644 Fengling.ServiceDiscovery.Core/src/Extensions/ServiceCollectionExtensions.cs create mode 100644 Fengling.ServiceDiscovery.Core/src/Models/DiscoveredInstance.cs create mode 100644 Fengling.ServiceDiscovery.Core/src/Models/DiscoveredService.cs create mode 100644 Fengling.ServiceDiscovery.Core/src/Models/HealthCheckResult.cs create mode 100644 Fengling.ServiceDiscovery.Core/src/Models/ServiceDiscoveryChange.cs create mode 100644 Fengling.ServiceDiscovery.Kubernetes/Fengling.ServiceDiscovery.Kubernetes.csproj create mode 100644 Fengling.ServiceDiscovery.Kubernetes/src/Extensions/KubernetesServiceDiscoveryExtensions.cs create mode 100644 Fengling.ServiceDiscovery.Kubernetes/src/KubernetesDiscoveryProvider.cs create mode 100644 Fengling.ServiceDiscovery.Static/Fengling.ServiceDiscovery.Static.csproj create mode 100644 Fengling.ServiceDiscovery.Static/src/Extensions/StaticServiceDiscoveryExtensions.cs create mode 100644 Fengling.ServiceDiscovery.Static/src/StaticDiscoveryProvider.cs diff --git a/Fengling.ServiceDiscovery.Consul/Fengling.ServiceDiscovery.Consul.csproj b/Fengling.ServiceDiscovery.Consul/Fengling.ServiceDiscovery.Consul.csproj new file mode 100644 index 0000000..809389c --- /dev/null +++ b/Fengling.ServiceDiscovery.Consul/Fengling.ServiceDiscovery.Consul.csproj @@ -0,0 +1,18 @@ + + + + net8.0 + enable + enable + Fengling.ServiceDiscovery.Consul + + + + + + + + + + + diff --git a/Fengling.ServiceDiscovery.Consul/src/ConsulDiscoveryProvider.cs b/Fengling.ServiceDiscovery.Consul/src/ConsulDiscoveryProvider.cs new file mode 100644 index 0000000..8bec1e0 --- /dev/null +++ b/Fengling.ServiceDiscovery.Consul/src/ConsulDiscoveryProvider.cs @@ -0,0 +1,111 @@ +using System.Runtime.CompilerServices; +using Consul; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Fengling.ServiceDiscovery.Consul.Extensions; + +namespace Fengling.ServiceDiscovery.Consul; + +/// +/// Consul 服务发现提供商 +/// +public class ConsulDiscoveryProvider : IServiceDiscoveryProvider +{ + private readonly IConsulClient _consul; + private readonly ConsulDiscoveryOptions _options; + private readonly ILogger _logger; + + public string ProviderName => "Consul"; + + public ConsulDiscoveryProvider( + IConsulClient consul, + IOptions options, + ILogger logger) + { + _consul = consul; + _options = options.Value; + _logger = logger; + } + + public async Task> GetServicesAsync(CancellationToken ct = default) + { + try + { + var services = await _consul.Catalog.Services(ct: ct); + + return services.Response.Select(kvp => new DiscoveredService + { + Name = kvp.Key, + Namespace = _options.Datacenter ?? "dc1", + Labels = kvp.Value?.ToDictionary(x => x, x => x) ?? new Dictionary(), + Ports = new List(), // Consul 不直接提供端口,需要查每个服务 + DiscoveredAt = DateTime.UtcNow + }).ToList(); + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to get services from Consul"); + return Array.Empty(); + } + } + + public async Task> GetInstancesAsync( + string serviceName, + string? namespace_ = null, + CancellationToken ct = default) + { + try + { + var service = await _consul.Catalog.Service(serviceName, ct: ct); + + return service.Response?.Select(s => new DiscoveredInstance + { + InstanceId = s.ServiceID, + Address = $"http://{s.ServiceAddress}:{s.ServicePort}", + Port = s.ServicePort, + IsHealthy = true, + HealthStatus = "Passing" + }).ToList() ?? new List(); + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to get instances for service {Service} from Consul", serviceName); + return Array.Empty(); + } + } + + public async IAsyncEnumerable WatchServicesAsync( + [EnumeratorCancellation] CancellationToken ct) + { + var serviceKeys = await _consul.Catalog.Services(ct: ct); + + // Consul 没有原生的 Watch 事件,需要轮询或使用 Agent.Check + // 这里简化实现,返回空 + await Task.CompletedTask; + yield break; + } + + public async Task CheckHealthAsync( + string address, + string? healthPath = null, + CancellationToken ct = default) + { + var startTime = DateTime.UtcNow; + + try + { + using var client = new HttpClient { Timeout = TimeSpan.FromSeconds(5) }; + var response = await client.GetAsync($"{address.TrimEnd('/')}/health", ct); + var elapsed = (long)(DateTime.UtcNow - startTime).TotalMilliseconds; + + return response.IsSuccessStatusCode + ? HealthCheckResult.Healthy(elapsed) + : HealthCheckResult.Unhealthy($"HTTP {response.StatusCode}", elapsed); + } + catch (Exception ex) + { + var elapsed = (long)(DateTime.UtcNow - startTime).TotalMilliseconds; + return HealthCheckResult.Unhealthy(ex.Message, elapsed); + } + } +} diff --git a/Fengling.ServiceDiscovery.Consul/src/Extensions/ConsulServiceDiscoveryExtensions.cs b/Fengling.ServiceDiscovery.Consul/src/Extensions/ConsulServiceDiscoveryExtensions.cs new file mode 100644 index 0000000..b0cc1fb --- /dev/null +++ b/Fengling.ServiceDiscovery.Consul/src/Extensions/ConsulServiceDiscoveryExtensions.cs @@ -0,0 +1,63 @@ +using Consul; +using Microsoft.Extensions.DependencyInjection; + +namespace Fengling.ServiceDiscovery.Consul.Extensions; + +/// +/// Consul 服务发现扩展方法 +/// +public static class ConsulServiceDiscoveryExtensions +{ + /// + /// 添加 Consul 服务发现 + /// + public static IServiceCollection AddConsulServiceDiscovery( + this IServiceCollection services, + Action? configure = null) + { + var options = new ConsulDiscoveryOptions(); + configure?.Invoke(options); + + services.AddSingleton(options); + + // 注册 Consul 客户端 + services.AddSingleton(sp => + { + var opts = sp.GetRequiredService(); + return new ConsulClient(config => + { + config.Address = opts.Address; + config.Datacenter = opts.Datacenter; + if (!string.IsNullOrEmpty(opts.Token)) + { + config.Token = opts.Token; + } + }); + }); + + services.AddSingleton(); + + return services; + } +} + +/// +/// Consul 发现配置选项 +/// +public class ConsulDiscoveryOptions +{ + /// + /// Consul 地址 + /// + public Uri Address { get; set; } = new("http://localhost:8500"); + + /// + /// 数据中心 + /// + public string? Datacenter { get; set; } + + /// + /// ACL Token + /// + public string? Token { get; set; } +} diff --git a/Fengling.ServiceDiscovery.Core/Fengling.ServiceDiscovery.Core.csproj b/Fengling.ServiceDiscovery.Core/Fengling.ServiceDiscovery.Core.csproj new file mode 100644 index 0000000..9c36c62 --- /dev/null +++ b/Fengling.ServiceDiscovery.Core/Fengling.ServiceDiscovery.Core.csproj @@ -0,0 +1,19 @@ + + + + net8.0 + enable + enable + Fengling.ServiceDiscovery + Fengling.ServiceDiscovery.Core + + + + + + + + + + + diff --git a/Fengling.ServiceDiscovery.Core/src/Abstractions/IServiceDiscoveryProvider.cs b/Fengling.ServiceDiscovery.Core/src/Abstractions/IServiceDiscoveryProvider.cs new file mode 100644 index 0000000..c6014ca --- /dev/null +++ b/Fengling.ServiceDiscovery.Core/src/Abstractions/IServiceDiscoveryProvider.cs @@ -0,0 +1,38 @@ +namespace Fengling.ServiceDiscovery; + +/// +/// 服务发现提供商 +/// +public interface IServiceDiscoveryProvider +{ + /// + /// 提供商名称 (如: "Kubernetes", "Consul", "Static") + /// + string ProviderName { get; } + + /// + /// 获取所有服务 + /// + Task> GetServicesAsync(CancellationToken ct = default); + + /// + /// 获取服务实例 + /// + Task> GetInstancesAsync( + string serviceName, + string? namespace_ = null, + CancellationToken ct = default); + + /// + /// 监听服务变化 + /// + IAsyncEnumerable WatchServicesAsync(CancellationToken ct = default); + + /// + /// 健康检查 + /// + Task CheckHealthAsync( + string address, + string? healthPath = null, + CancellationToken ct = default); +} diff --git a/Fengling.ServiceDiscovery.Core/src/Extensions/ServiceCollectionExtensions.cs b/Fengling.ServiceDiscovery.Core/src/Extensions/ServiceCollectionExtensions.cs new file mode 100644 index 0000000..f2d4cec --- /dev/null +++ b/Fengling.ServiceDiscovery.Core/src/Extensions/ServiceCollectionExtensions.cs @@ -0,0 +1,104 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; + +namespace Fengling.ServiceDiscovery.Extensions; + +/// +/// 服务发现扩展方法 +/// +public static class ServiceCollectionExtensions +{ + /// + /// 添加服务发现 + /// + public static IServiceCollection AddServiceDiscovery( + this IServiceCollection services, + Action? configure = null) + { + var options = new ServiceDiscoveryOptions(); + configure?.Invoke(options); + + services.AddSingleton(options); + + // 如果启用了后台同步,注册后台服务 + if (options.EnableBackgroundSync) + { + services.AddHostedService(); + } + + return services; + } +} + +/// +/// 服务发现配置选项 +/// +public class ServiceDiscoveryOptions +{ + /// + /// 启用后台同步 + /// + public bool EnableBackgroundSync { get; set; } = true; + + /// + /// 同步间隔 + /// + public TimeSpan SyncInterval { get; set; } = TimeSpan.FromSeconds(30); +} + +/// +/// 后台同步服务 - 通过接口调度 +/// +public class ServiceDiscoveryBackgroundSync : BackgroundService +{ + private readonly ILogger _logger; + private readonly ServiceDiscoveryOptions _options; + private readonly IEnumerable _providers; + + public ServiceDiscoveryBackgroundSync( + IEnumerable providers, + ILogger logger, + IOptions options) + { + _providers = providers; + _logger = logger; + _options = options.Value; + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + _logger.LogInformation("Starting service discovery background sync"); + + while (!stoppingToken.IsCancellationRequested) + { + try + { + foreach (var provider in _providers) + { + var services = await provider.GetServicesAsync(stoppingToken); + _logger.LogDebug("[{Provider}] Discovered {Count} services", + provider.ProviderName, services.Count); + + foreach (var service in services) + { + var instances = await provider.GetInstancesAsync( + service.Name, + service.Namespace, + stoppingToken); + + _logger.LogDebug("[{Provider}] Service {Name} has {Count} instances", + provider.ProviderName, service.Name, instances.Count); + } + } + } + catch (Exception ex) + { + _logger.LogError(ex, "Error during service discovery sync"); + } + + await Task.Delay(_options.SyncInterval, stoppingToken); + } + } +} diff --git a/Fengling.ServiceDiscovery.Core/src/Models/DiscoveredInstance.cs b/Fengling.ServiceDiscovery.Core/src/Models/DiscoveredInstance.cs new file mode 100644 index 0000000..18d53fe --- /dev/null +++ b/Fengling.ServiceDiscovery.Core/src/Models/DiscoveredInstance.cs @@ -0,0 +1,47 @@ +namespace Fengling.ServiceDiscovery; + +/// +/// 发现的服务实例信息 +/// +public record DiscoveredInstance +{ + /// + /// 实例 ID + /// + public required string InstanceId { get; init; } + + /// + /// Pod 名称 + /// + public string? PodName { get; init; } + + /// + /// Pod IP + /// + public string? PodIP { get; init; } + + /// + /// 服务地址 (http://ip:port) + /// + public required string Address { get; init; } + + /// + /// 权重 + /// + public int Weight { get; init; } = 1; + + /// + /// 是否健康 + /// + public bool IsHealthy { get; init; } + + /// + /// 健康状态描述 + /// + public string? HealthStatus { get; init; } + + /// + /// 实例端口 + /// + public int Port { get; init; } +} diff --git a/Fengling.ServiceDiscovery.Core/src/Models/DiscoveredService.cs b/Fengling.ServiceDiscovery.Core/src/Models/DiscoveredService.cs new file mode 100644 index 0000000..ed7b1c6 --- /dev/null +++ b/Fengling.ServiceDiscovery.Core/src/Models/DiscoveredService.cs @@ -0,0 +1,52 @@ +namespace Fengling.ServiceDiscovery; + +/// +/// 发现的服务信息 +/// +public record DiscoveredService +{ + /// + /// 服务名称 + /// + public required string Name { get; init; } + + /// + /// 命名空间 + /// + public required string Namespace { get; init; } + + /// + /// 集群内部 IP + /// + public string? ClusterIP { get; init; } + + /// + /// 外部 IP + /// + public string? ExternalIP { get; init; } + + /// + /// 端口列表 + /// + public IReadOnlyList Ports { get; init; } = Array.Empty(); + + /// + /// 标签 + /// + public IReadOnlyDictionary Labels { get; init; } = new Dictionary(); + + /// + /// 注解 + /// + public IReadOnlyDictionary Annotations { get; init; } = new Dictionary(); + + /// + /// 发现时间 + /// + public DateTime DiscoveredAt { get; init; } = DateTime.UtcNow; + + /// + /// 是否已分配路由 + /// + public bool IsAssigned { get; set; } +} diff --git a/Fengling.ServiceDiscovery.Core/src/Models/HealthCheckResult.cs b/Fengling.ServiceDiscovery.Core/src/Models/HealthCheckResult.cs new file mode 100644 index 0000000..6300fde --- /dev/null +++ b/Fengling.ServiceDiscovery.Core/src/Models/HealthCheckResult.cs @@ -0,0 +1,49 @@ +namespace Fengling.ServiceDiscovery; + +/// +/// 健康检查结果 +/// +public record HealthCheckResult +{ + /// + /// 是否健康 + /// + public required bool IsHealthy { get; init; } + + /// + /// 响应时间 (毫秒) + /// + public long ResponseTimeMs { get; init; } + + /// + /// 状态描述 + /// + public string? Status { get; init; } + + /// + /// 错误信息 + /// + public string? Error { get; init; } + + /// + /// 检查时间 + /// + public DateTime CheckedAt { get; init; } = DateTime.UtcNow; + + public static HealthCheckResult Healthy(long responseTimeMs = 0) + => new() + { + IsHealthy = true, + ResponseTimeMs = responseTimeMs, + Status = "Healthy" + }; + + public static HealthCheckResult Unhealthy(string error, long responseTimeMs = 0) + => new() + { + IsHealthy = false, + ResponseTimeMs = responseTimeMs, + Status = "Unhealthy", + Error = error + }; +} diff --git a/Fengling.ServiceDiscovery.Core/src/Models/ServiceDiscoveryChange.cs b/Fengling.ServiceDiscovery.Core/src/Models/ServiceDiscoveryChange.cs new file mode 100644 index 0000000..4ab5faf --- /dev/null +++ b/Fengling.ServiceDiscovery.Core/src/Models/ServiceDiscoveryChange.cs @@ -0,0 +1,43 @@ +namespace Fengling.ServiceDiscovery; + +/// +/// 服务发现变更事件 +/// +public record ServiceDiscoveryChange +{ + /// + /// 变更类型 + /// + public required ServiceDiscoveryChangeType Type { get; init; } + + /// + /// 变更的服务 + /// + public required DiscoveredService Service { get; init; } + + /// + /// 关联的实例 + /// + public IReadOnlyList? Instances { get; init; } +} + +/// +/// 服务发现变更类型 +/// +public enum ServiceDiscoveryChangeType +{ + /// + /// 新增 + /// + Added, + + /// + /// 修改 + /// + Modified, + + /// + /// 删除 + /// + Deleted +} diff --git a/Fengling.ServiceDiscovery.Kubernetes/Fengling.ServiceDiscovery.Kubernetes.csproj b/Fengling.ServiceDiscovery.Kubernetes/Fengling.ServiceDiscovery.Kubernetes.csproj new file mode 100644 index 0000000..e50ba17 --- /dev/null +++ b/Fengling.ServiceDiscovery.Kubernetes/Fengling.ServiceDiscovery.Kubernetes.csproj @@ -0,0 +1,19 @@ + + + + net8.0 + enable + enable + Fengling.ServiceDiscovery.Kubernetes + + + + + + + + + + + + diff --git a/Fengling.ServiceDiscovery.Kubernetes/src/Extensions/KubernetesServiceDiscoveryExtensions.cs b/Fengling.ServiceDiscovery.Kubernetes/src/Extensions/KubernetesServiceDiscoveryExtensions.cs new file mode 100644 index 0000000..3d896b0 --- /dev/null +++ b/Fengling.ServiceDiscovery.Kubernetes/src/Extensions/KubernetesServiceDiscoveryExtensions.cs @@ -0,0 +1,81 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using k8s; +using K8s = k8s.Kubernetes; + +namespace Fengling.ServiceDiscovery.Kubernetes.Extensions; + +/// +/// Kubernetes 服务发现扩展方法 +/// +public static class KubernetesServiceDiscoveryExtensions +{ + /// + /// 添加 Kubernetes 服务发现 + /// + public static IServiceCollection AddKubernetesServiceDiscovery( + this IServiceCollection services, + Action? configure = null) + { + var options = new KubernetesDiscoveryOptions(); + configure?.Invoke(options); + + services.AddSingleton(options); + + // 根据配置决定如何创建 Kubernetes 客户端 + if (options.UseInClusterConfig) + { + services.AddSingleton(sp => + { + var config = KubernetesClientConfiguration.InClusterConfig(); + return new K8s(config); + }); + } + else if (!string.IsNullOrEmpty(options.KubeConfigPath)) + { + services.AddSingleton(sp => + { + var config = KubernetesClientConfiguration.BuildConfigFromConfigFile(options.KubeConfigPath); + return new K8s(config); + }); + } + else + { + services.AddSingleton(sp => + { + var config = KubernetesClientConfiguration.BuildConfigFromConfigFile(); + return new K8s(config); + }); + } + + services.AddSingleton(); + + return services; + } +} + +/// +/// Kubernetes 发现配置选项 +/// +public class KubernetesDiscoveryOptions +{ + /// + /// Label 选择器,用于筛选需要发现的服务 + /// + public string LabelSelector { get; set; } = "app.kubernetes.io/managed-by=yarp"; + + /// + /// 命名空间,为空表示所有命名空间 + /// + public string? Namespace { get; set; } + + /// + /// 是否使用 In-Cluster 配置 (在 K8s Pod 内运行) + /// + public bool UseInClusterConfig { get; set; } + + /// + /// KubeConfig 文件路径 + /// + public string? KubeConfigPath { get; set; } +} diff --git a/Fengling.ServiceDiscovery.Kubernetes/src/KubernetesDiscoveryProvider.cs b/Fengling.ServiceDiscovery.Kubernetes/src/KubernetesDiscoveryProvider.cs new file mode 100644 index 0000000..5801e98 --- /dev/null +++ b/Fengling.ServiceDiscovery.Kubernetes/src/KubernetesDiscoveryProvider.cs @@ -0,0 +1,176 @@ +using System.Runtime.CompilerServices; +using k8s; +using k8s.Models; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Fengling.ServiceDiscovery.Kubernetes.Extensions; + +namespace Fengling.ServiceDiscovery.Kubernetes; + +/// +/// Kubernetes 服务发现提供商 +/// +public class KubernetesDiscoveryProvider : IServiceDiscoveryProvider +{ + private readonly IKubernetes _k8s; + private readonly KubernetesDiscoveryOptions _options; + private readonly ILogger _logger; + + public string ProviderName => "Kubernetes"; + + public KubernetesDiscoveryProvider( + IKubernetes k8s, + IOptions options, + ILogger logger) + { + _k8s = k8s; + _options = options.Value; + _logger = logger; + } + + public async Task> GetServicesAsync(CancellationToken ct = default) + { + try + { + var services = await _k8s.CoreV1.ListServiceForAllNamespacesAsync( + labelSelector: _options.LabelSelector, + fieldSelector: string.IsNullOrEmpty(_options.Namespace) ? null : $"metadata.namespace={_options.Namespace}", + cancellationToken: ct); + + return services.Items.Select(MapToService).ToList(); + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to get services from Kubernetes"); + return Array.Empty(); + } + } + + public async Task> GetInstancesAsync( + string serviceName, + string? namespace_ = null, + CancellationToken ct = default) + { + try + { + var ns = namespace_ ?? _options.Namespace ?? "default"; + var endpoints = await _k8s.CoreV1.ReadNamespacedEndpointsAsync(serviceName, ns, cancellationToken: ct); + + var instances = new List(); + + if (endpoints.Subsets != null) + { + foreach (var subset in endpoints.Subsets) + { + var port = subset.Ports?.FirstOrDefault()?.Port ?? 80; + + // Ready endpoints + if (subset.Addresses != null) + { + foreach (var addr in subset.Addresses) + { + instances.Add(new DiscoveredInstance + { + InstanceId = $"{serviceName}-{addr.TargetRef?.Name}", + PodName = addr.TargetRef?.Name, + PodIP = addr.Ip, + Address = $"http://{addr.Ip}:{port}", + Port = port, + IsHealthy = true, + HealthStatus = "Running" + }); + } + } + + // NotReady endpoints + if (subset.NotReadyAddresses != null) + { + foreach (var addr in subset.NotReadyAddresses) + { + instances.Add(new DiscoveredInstance + { + InstanceId = $"{serviceName}-{addr.TargetRef?.Name}", + PodName = addr.TargetRef?.Name, + PodIP = addr.Ip, + Address = $"http://{addr.Ip}:{port}", + Port = port, + IsHealthy = false, + HealthStatus = "NotReady" + }); + } + } + } + } + + return instances; + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to get instances for service {Service}", serviceName); + return Array.Empty(); + } + } + + public async IAsyncEnumerable WatchServicesAsync( + [EnumeratorCancellation] CancellationToken ct) + { + // Simplified implementation - watch services using the List operation with continuation + var services = await _k8s.CoreV1.ListServiceForAllNamespacesAsync( + labelSelector: _options.LabelSelector, + cancellationToken: ct); + + foreach (var service in services.Items) + { + var instances = await GetInstancesAsync(service.Metadata.Name, service.Metadata.Namespace(), ct); + yield return new ServiceDiscoveryChange + { + Type = ServiceDiscoveryChangeType.Added, + Service = MapToService(service), + Instances = instances + }; + } + } + + public async Task CheckHealthAsync( + string address, + string? healthPath = null, + CancellationToken ct = default) + { + var startTime = DateTime.UtcNow; + var url = $"{address.TrimEnd('/')}/{healthPath?.TrimStart('/') ?? "health"}"; + + try + { + using var client = new HttpClient { Timeout = TimeSpan.FromSeconds(5) }; + var response = await client.GetAsync(url, ct); + var elapsed = (long)(DateTime.UtcNow - startTime).TotalMilliseconds; + + if (response.IsSuccessStatusCode) + { + return HealthCheckResult.Healthy(elapsed); + } + + return HealthCheckResult.Unhealthy($"HTTP {response.StatusCode}", elapsed); + } + catch (Exception ex) + { + var elapsed = (long)(DateTime.UtcNow - startTime).TotalMilliseconds; + return HealthCheckResult.Unhealthy(ex.Message, elapsed); + } + } + + private DiscoveredService MapToService(V1Service service) + { + return new DiscoveredService + { + Name = service.Metadata.Name, + Namespace = service.Metadata.Namespace() ?? "default", + ClusterIP = service.Spec.ClusterIP, + ExternalIP = service.Spec.ExternalIPs?.FirstOrDefault(), + Ports = service.Spec.Ports?.Select(p => p.Port).ToList() ?? new List(), + Labels = service.Metadata.Labels != null ? new Dictionary(service.Metadata.Labels) : new Dictionary(), + Annotations = service.Metadata.Annotations != null ? new Dictionary(service.Metadata.Annotations) : new Dictionary(), + DiscoveredAt = DateTime.UtcNow + }; + } +} diff --git a/Fengling.ServiceDiscovery.Static/Fengling.ServiceDiscovery.Static.csproj b/Fengling.ServiceDiscovery.Static/Fengling.ServiceDiscovery.Static.csproj new file mode 100644 index 0000000..bbcab0b --- /dev/null +++ b/Fengling.ServiceDiscovery.Static/Fengling.ServiceDiscovery.Static.csproj @@ -0,0 +1,14 @@ + + + + net8.0 + enable + enable + Fengling.ServiceDiscovery.Static + + + + + + + diff --git a/Fengling.ServiceDiscovery.Static/src/Extensions/StaticServiceDiscoveryExtensions.cs b/Fengling.ServiceDiscovery.Static/src/Extensions/StaticServiceDiscoveryExtensions.cs new file mode 100644 index 0000000..3db21fc --- /dev/null +++ b/Fengling.ServiceDiscovery.Static/src/Extensions/StaticServiceDiscoveryExtensions.cs @@ -0,0 +1,88 @@ +using Microsoft.Extensions.DependencyInjection; + +namespace Fengling.ServiceDiscovery.Static.Extensions; + +/// +/// 静态服务发现扩展方法 +/// +public static class StaticServiceDiscoveryExtensions +{ + /// + /// 添加静态服务发现 (用于本地调试) + /// + public static IServiceCollection AddStaticServiceDiscovery( + this IServiceCollection services, + Action configure) + { + var options = new StaticDiscoveryOptions(); + configure(options); + + services.AddSingleton(options); + services.AddSingleton(); + + return services; + } +} + +/// +/// 静态服务发现配置选项 +/// +public class StaticDiscoveryOptions +{ + /// + /// 静态服务列表 + /// + public List Services { get; set; } = new(); +} + +/// +/// 静态服务定义 +/// +public class StaticService +{ + /// + /// 服务名称 + /// + public required string Name { get; set; } + + /// + /// 命名空间 + /// + public string? Namespace { get; set; } + + /// + /// 服务地址 (不含端口) + /// + public string? Address { get; set; } + + /// + /// 端口列表 + /// + public List? Ports { get; set; } + + /// + /// 实例列表 + /// + public List? Instances { get; set; } +} + +/// +/// 静态实例定义 +/// +public class StaticInstance +{ + /// + /// 实例地址 + /// + public required string Address { get; set; } + + /// + /// 端口 + /// + public int? Port { get; set; } + + /// + /// 权重 + /// + public int? Weight { get; set; } +} diff --git a/Fengling.ServiceDiscovery.Static/src/StaticDiscoveryProvider.cs b/Fengling.ServiceDiscovery.Static/src/StaticDiscoveryProvider.cs new file mode 100644 index 0000000..c2f3a12 --- /dev/null +++ b/Fengling.ServiceDiscovery.Static/src/StaticDiscoveryProvider.cs @@ -0,0 +1,84 @@ +using System.Runtime.CompilerServices; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Fengling.ServiceDiscovery.Static.Extensions; + +namespace Fengling.ServiceDiscovery.Static; + +/// +/// 静态服务发现提供商 (用于本地调试) +/// +public class StaticDiscoveryProvider : IServiceDiscoveryProvider +{ + private readonly StaticDiscoveryOptions _options; + private readonly ILogger _logger; + + public string ProviderName => "Static"; + + public StaticDiscoveryProvider( + IOptions options, + ILogger logger) + { + _options = options.Value; + _logger = logger; + } + + public Task> GetServicesAsync(CancellationToken ct = default) + { + var services = _options.Services.Select(s => new DiscoveredService + { + Name = s.Name, + Namespace = s.Namespace ?? "default", + ClusterIP = s.Address?.Split(':').FirstOrDefault(), + Ports = s.Ports ?? new List { 80 }, + Labels = new Dictionary(), + Annotations = new Dictionary(), + DiscoveredAt = DateTime.UtcNow + }).ToList(); + + return Task.FromResult>(services); + } + + public Task> GetInstancesAsync( + string serviceName, + string? namespace_ = null, + CancellationToken ct = default) + { + var service = _options.Services.FirstOrDefault(s => s.Name == serviceName); + + if (service == null || service.Instances == null) + { + return Task.FromResult>( + new List()); + } + + var instances = service.Instances.Select((inst, index) => new DiscoveredInstance + { + InstanceId = $"{serviceName}-{index}", + Address = inst.Address, + Port = inst.Port ?? 80, + Weight = inst.Weight ?? 1, + IsHealthy = true, + HealthStatus = "AlwaysHealthy" + }).ToList(); + + return Task.FromResult>(instances); + } + + public async IAsyncEnumerable WatchServicesAsync( + [EnumeratorCancellation] CancellationToken ct) + { + // 静态配置不支持 Watch,返回空 + await Task.CompletedTask; + yield break; + } + + public Task CheckHealthAsync( + string address, + string? healthPath = null, + CancellationToken ct = default) + { + // 静态配置默认返回健康 + return Task.FromResult(HealthCheckResult.Healthy(0)); + } +}