feat: add ServiceDiscovery framework with Kubernetes, Consul and Static providers
Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode) Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
This commit is contained in:
commit
a6f6245807
@ -0,0 +1,18 @@
|
||||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<PropertyGroup>
|
||||
<TargetFramework>net8.0</TargetFramework>
|
||||
<ImplicitUsings>enable</ImplicitUsings>
|
||||
<Nullable>enable</Nullable>
|
||||
<RootNamespace>Fengling.ServiceDiscovery.Consul</RootNamespace>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Consul" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\Fengling.ServiceDiscovery.Core\Fengling.ServiceDiscovery.Core.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
111
Fengling.ServiceDiscovery.Consul/src/ConsulDiscoveryProvider.cs
Normal file
111
Fengling.ServiceDiscovery.Consul/src/ConsulDiscoveryProvider.cs
Normal file
@ -0,0 +1,111 @@
|
||||
using System.Runtime.CompilerServices;
|
||||
using Consul;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
using Fengling.ServiceDiscovery.Consul.Extensions;
|
||||
|
||||
namespace Fengling.ServiceDiscovery.Consul;
|
||||
|
||||
/// <summary>
|
||||
/// Consul 服务发现提供商
|
||||
/// </summary>
|
||||
public class ConsulDiscoveryProvider : IServiceDiscoveryProvider
|
||||
{
|
||||
private readonly IConsulClient _consul;
|
||||
private readonly ConsulDiscoveryOptions _options;
|
||||
private readonly ILogger<ConsulDiscoveryProvider> _logger;
|
||||
|
||||
public string ProviderName => "Consul";
|
||||
|
||||
public ConsulDiscoveryProvider(
|
||||
IConsulClient consul,
|
||||
IOptions<ConsulDiscoveryOptions> options,
|
||||
ILogger<ConsulDiscoveryProvider> logger)
|
||||
{
|
||||
_consul = consul;
|
||||
_options = options.Value;
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
public async Task<IReadOnlyList<DiscoveredService>> GetServicesAsync(CancellationToken ct = default)
|
||||
{
|
||||
try
|
||||
{
|
||||
var services = await _consul.Catalog.Services(ct: ct);
|
||||
|
||||
return services.Response.Select(kvp => new DiscoveredService
|
||||
{
|
||||
Name = kvp.Key,
|
||||
Namespace = _options.Datacenter ?? "dc1",
|
||||
Labels = kvp.Value?.ToDictionary(x => x, x => x) ?? new Dictionary<string, string>(),
|
||||
Ports = new List<int>(), // Consul 不直接提供端口,需要查每个服务
|
||||
DiscoveredAt = DateTime.UtcNow
|
||||
}).ToList();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Failed to get services from Consul");
|
||||
return Array.Empty<DiscoveredService>();
|
||||
}
|
||||
}
|
||||
|
||||
public async Task<IReadOnlyList<DiscoveredInstance>> GetInstancesAsync(
|
||||
string serviceName,
|
||||
string? namespace_ = null,
|
||||
CancellationToken ct = default)
|
||||
{
|
||||
try
|
||||
{
|
||||
var service = await _consul.Catalog.Service(serviceName, ct: ct);
|
||||
|
||||
return service.Response?.Select(s => new DiscoveredInstance
|
||||
{
|
||||
InstanceId = s.ServiceID,
|
||||
Address = $"http://{s.ServiceAddress}:{s.ServicePort}",
|
||||
Port = s.ServicePort,
|
||||
IsHealthy = true,
|
||||
HealthStatus = "Passing"
|
||||
}).ToList() ?? new List<DiscoveredInstance>();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Failed to get instances for service {Service} from Consul", serviceName);
|
||||
return Array.Empty<DiscoveredInstance>();
|
||||
}
|
||||
}
|
||||
|
||||
public async IAsyncEnumerable<ServiceDiscoveryChange> WatchServicesAsync(
|
||||
[EnumeratorCancellation] CancellationToken ct)
|
||||
{
|
||||
var serviceKeys = await _consul.Catalog.Services(ct: ct);
|
||||
|
||||
// Consul 没有原生的 Watch 事件,需要轮询或使用 Agent.Check
|
||||
// 这里简化实现,返回空
|
||||
await Task.CompletedTask;
|
||||
yield break;
|
||||
}
|
||||
|
||||
public async Task<HealthCheckResult> CheckHealthAsync(
|
||||
string address,
|
||||
string? healthPath = null,
|
||||
CancellationToken ct = default)
|
||||
{
|
||||
var startTime = DateTime.UtcNow;
|
||||
|
||||
try
|
||||
{
|
||||
using var client = new HttpClient { Timeout = TimeSpan.FromSeconds(5) };
|
||||
var response = await client.GetAsync($"{address.TrimEnd('/')}/health", ct);
|
||||
var elapsed = (long)(DateTime.UtcNow - startTime).TotalMilliseconds;
|
||||
|
||||
return response.IsSuccessStatusCode
|
||||
? HealthCheckResult.Healthy(elapsed)
|
||||
: HealthCheckResult.Unhealthy($"HTTP {response.StatusCode}", elapsed);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
var elapsed = (long)(DateTime.UtcNow - startTime).TotalMilliseconds;
|
||||
return HealthCheckResult.Unhealthy(ex.Message, elapsed);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,63 @@
|
||||
using Consul;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
|
||||
namespace Fengling.ServiceDiscovery.Consul.Extensions;
|
||||
|
||||
/// <summary>
|
||||
/// Consul 服务发现扩展方法
|
||||
/// </summary>
|
||||
public static class ConsulServiceDiscoveryExtensions
|
||||
{
|
||||
/// <summary>
|
||||
/// 添加 Consul 服务发现
|
||||
/// </summary>
|
||||
public static IServiceCollection AddConsulServiceDiscovery(
|
||||
this IServiceCollection services,
|
||||
Action<ConsulDiscoveryOptions>? configure = null)
|
||||
{
|
||||
var options = new ConsulDiscoveryOptions();
|
||||
configure?.Invoke(options);
|
||||
|
||||
services.AddSingleton(options);
|
||||
|
||||
// 注册 Consul 客户端
|
||||
services.AddSingleton<IConsulClient>(sp =>
|
||||
{
|
||||
var opts = sp.GetRequiredService<ConsulDiscoveryOptions>();
|
||||
return new ConsulClient(config =>
|
||||
{
|
||||
config.Address = opts.Address;
|
||||
config.Datacenter = opts.Datacenter;
|
||||
if (!string.IsNullOrEmpty(opts.Token))
|
||||
{
|
||||
config.Token = opts.Token;
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
services.AddSingleton<Fengling.ServiceDiscovery.IServiceDiscoveryProvider, ConsulDiscoveryProvider>();
|
||||
|
||||
return services;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Consul 发现配置选项
|
||||
/// </summary>
|
||||
public class ConsulDiscoveryOptions
|
||||
{
|
||||
/// <summary>
|
||||
/// Consul 地址
|
||||
/// </summary>
|
||||
public Uri Address { get; set; } = new("http://localhost:8500");
|
||||
|
||||
/// <summary>
|
||||
/// 数据中心
|
||||
/// </summary>
|
||||
public string? Datacenter { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// ACL Token
|
||||
/// </summary>
|
||||
public string? Token { get; set; }
|
||||
}
|
||||
@ -0,0 +1,19 @@
|
||||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<PropertyGroup>
|
||||
<TargetFramework>net8.0</TargetFramework>
|
||||
<ImplicitUsings>enable</ImplicitUsings>
|
||||
<Nullable>enable</Nullable>
|
||||
<RootNamespace>Fengling.ServiceDiscovery</RootNamespace>
|
||||
<AssemblyName>Fengling.ServiceDiscovery.Core</AssemblyName>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" />
|
||||
<PackageReference Include="Microsoft.Extensions.Hosting" />
|
||||
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" />
|
||||
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" />
|
||||
<PackageReference Include="Microsoft.Extensions.Options" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
@ -0,0 +1,38 @@
|
||||
namespace Fengling.ServiceDiscovery;
|
||||
|
||||
/// <summary>
|
||||
/// 服务发现提供商
|
||||
/// </summary>
|
||||
public interface IServiceDiscoveryProvider
|
||||
{
|
||||
/// <summary>
|
||||
/// 提供商名称 (如: "Kubernetes", "Consul", "Static")
|
||||
/// </summary>
|
||||
string ProviderName { get; }
|
||||
|
||||
/// <summary>
|
||||
/// 获取所有服务
|
||||
/// </summary>
|
||||
Task<IReadOnlyList<DiscoveredService>> GetServicesAsync(CancellationToken ct = default);
|
||||
|
||||
/// <summary>
|
||||
/// 获取服务实例
|
||||
/// </summary>
|
||||
Task<IReadOnlyList<DiscoveredInstance>> GetInstancesAsync(
|
||||
string serviceName,
|
||||
string? namespace_ = null,
|
||||
CancellationToken ct = default);
|
||||
|
||||
/// <summary>
|
||||
/// 监听服务变化
|
||||
/// </summary>
|
||||
IAsyncEnumerable<ServiceDiscoveryChange> WatchServicesAsync(CancellationToken ct = default);
|
||||
|
||||
/// <summary>
|
||||
/// 健康检查
|
||||
/// </summary>
|
||||
Task<HealthCheckResult> CheckHealthAsync(
|
||||
string address,
|
||||
string? healthPath = null,
|
||||
CancellationToken ct = default);
|
||||
}
|
||||
@ -0,0 +1,104 @@
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
|
||||
namespace Fengling.ServiceDiscovery.Extensions;
|
||||
|
||||
/// <summary>
|
||||
/// 服务发现扩展方法
|
||||
/// </summary>
|
||||
public static class ServiceCollectionExtensions
|
||||
{
|
||||
/// <summary>
|
||||
/// 添加服务发现
|
||||
/// </summary>
|
||||
public static IServiceCollection AddServiceDiscovery(
|
||||
this IServiceCollection services,
|
||||
Action<ServiceDiscoveryOptions>? configure = null)
|
||||
{
|
||||
var options = new ServiceDiscoveryOptions();
|
||||
configure?.Invoke(options);
|
||||
|
||||
services.AddSingleton(options);
|
||||
|
||||
// 如果启用了后台同步,注册后台服务
|
||||
if (options.EnableBackgroundSync)
|
||||
{
|
||||
services.AddHostedService<ServiceDiscoveryBackgroundSync>();
|
||||
}
|
||||
|
||||
return services;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 服务发现配置选项
|
||||
/// </summary>
|
||||
public class ServiceDiscoveryOptions
|
||||
{
|
||||
/// <summary>
|
||||
/// 启用后台同步
|
||||
/// </summary>
|
||||
public bool EnableBackgroundSync { get; set; } = true;
|
||||
|
||||
/// <summary>
|
||||
/// 同步间隔
|
||||
/// </summary>
|
||||
public TimeSpan SyncInterval { get; set; } = TimeSpan.FromSeconds(30);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 后台同步服务 - 通过接口调度
|
||||
/// </summary>
|
||||
public class ServiceDiscoveryBackgroundSync : BackgroundService
|
||||
{
|
||||
private readonly ILogger<ServiceDiscoveryBackgroundSync> _logger;
|
||||
private readonly ServiceDiscoveryOptions _options;
|
||||
private readonly IEnumerable<IServiceDiscoveryProvider> _providers;
|
||||
|
||||
public ServiceDiscoveryBackgroundSync(
|
||||
IEnumerable<IServiceDiscoveryProvider> providers,
|
||||
ILogger<ServiceDiscoveryBackgroundSync> logger,
|
||||
IOptions<ServiceDiscoveryOptions> options)
|
||||
{
|
||||
_providers = providers;
|
||||
_logger = logger;
|
||||
_options = options.Value;
|
||||
}
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
_logger.LogInformation("Starting service discovery background sync");
|
||||
|
||||
while (!stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
foreach (var provider in _providers)
|
||||
{
|
||||
var services = await provider.GetServicesAsync(stoppingToken);
|
||||
_logger.LogDebug("[{Provider}] Discovered {Count} services",
|
||||
provider.ProviderName, services.Count);
|
||||
|
||||
foreach (var service in services)
|
||||
{
|
||||
var instances = await provider.GetInstancesAsync(
|
||||
service.Name,
|
||||
service.Namespace,
|
||||
stoppingToken);
|
||||
|
||||
_logger.LogDebug("[{Provider}] Service {Name} has {Count} instances",
|
||||
provider.ProviderName, service.Name, instances.Count);
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Error during service discovery sync");
|
||||
}
|
||||
|
||||
await Task.Delay(_options.SyncInterval, stoppingToken);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,47 @@
|
||||
namespace Fengling.ServiceDiscovery;
|
||||
|
||||
/// <summary>
|
||||
/// 发现的服务实例信息
|
||||
/// </summary>
|
||||
public record DiscoveredInstance
|
||||
{
|
||||
/// <summary>
|
||||
/// 实例 ID
|
||||
/// </summary>
|
||||
public required string InstanceId { get; init; }
|
||||
|
||||
/// <summary>
|
||||
/// Pod 名称
|
||||
/// </summary>
|
||||
public string? PodName { get; init; }
|
||||
|
||||
/// <summary>
|
||||
/// Pod IP
|
||||
/// </summary>
|
||||
public string? PodIP { get; init; }
|
||||
|
||||
/// <summary>
|
||||
/// 服务地址 (http://ip:port)
|
||||
/// </summary>
|
||||
public required string Address { get; init; }
|
||||
|
||||
/// <summary>
|
||||
/// 权重
|
||||
/// </summary>
|
||||
public int Weight { get; init; } = 1;
|
||||
|
||||
/// <summary>
|
||||
/// 是否健康
|
||||
/// </summary>
|
||||
public bool IsHealthy { get; init; }
|
||||
|
||||
/// <summary>
|
||||
/// 健康状态描述
|
||||
/// </summary>
|
||||
public string? HealthStatus { get; init; }
|
||||
|
||||
/// <summary>
|
||||
/// 实例端口
|
||||
/// </summary>
|
||||
public int Port { get; init; }
|
||||
}
|
||||
@ -0,0 +1,52 @@
|
||||
namespace Fengling.ServiceDiscovery;
|
||||
|
||||
/// <summary>
|
||||
/// 发现的服务信息
|
||||
/// </summary>
|
||||
public record DiscoveredService
|
||||
{
|
||||
/// <summary>
|
||||
/// 服务名称
|
||||
/// </summary>
|
||||
public required string Name { get; init; }
|
||||
|
||||
/// <summary>
|
||||
/// 命名空间
|
||||
/// </summary>
|
||||
public required string Namespace { get; init; }
|
||||
|
||||
/// <summary>
|
||||
/// 集群内部 IP
|
||||
/// </summary>
|
||||
public string? ClusterIP { get; init; }
|
||||
|
||||
/// <summary>
|
||||
/// 外部 IP
|
||||
/// </summary>
|
||||
public string? ExternalIP { get; init; }
|
||||
|
||||
/// <summary>
|
||||
/// 端口列表
|
||||
/// </summary>
|
||||
public IReadOnlyList<int> Ports { get; init; } = Array.Empty<int>();
|
||||
|
||||
/// <summary>
|
||||
/// 标签
|
||||
/// </summary>
|
||||
public IReadOnlyDictionary<string, string> Labels { get; init; } = new Dictionary<string, string>();
|
||||
|
||||
/// <summary>
|
||||
/// 注解
|
||||
/// </summary>
|
||||
public IReadOnlyDictionary<string, string> Annotations { get; init; } = new Dictionary<string, string>();
|
||||
|
||||
/// <summary>
|
||||
/// 发现时间
|
||||
/// </summary>
|
||||
public DateTime DiscoveredAt { get; init; } = DateTime.UtcNow;
|
||||
|
||||
/// <summary>
|
||||
/// 是否已分配路由
|
||||
/// </summary>
|
||||
public bool IsAssigned { get; set; }
|
||||
}
|
||||
@ -0,0 +1,49 @@
|
||||
namespace Fengling.ServiceDiscovery;
|
||||
|
||||
/// <summary>
|
||||
/// 健康检查结果
|
||||
/// </summary>
|
||||
public record HealthCheckResult
|
||||
{
|
||||
/// <summary>
|
||||
/// 是否健康
|
||||
/// </summary>
|
||||
public required bool IsHealthy { get; init; }
|
||||
|
||||
/// <summary>
|
||||
/// 响应时间 (毫秒)
|
||||
/// </summary>
|
||||
public long ResponseTimeMs { get; init; }
|
||||
|
||||
/// <summary>
|
||||
/// 状态描述
|
||||
/// </summary>
|
||||
public string? Status { get; init; }
|
||||
|
||||
/// <summary>
|
||||
/// 错误信息
|
||||
/// </summary>
|
||||
public string? Error { get; init; }
|
||||
|
||||
/// <summary>
|
||||
/// 检查时间
|
||||
/// </summary>
|
||||
public DateTime CheckedAt { get; init; } = DateTime.UtcNow;
|
||||
|
||||
public static HealthCheckResult Healthy(long responseTimeMs = 0)
|
||||
=> new()
|
||||
{
|
||||
IsHealthy = true,
|
||||
ResponseTimeMs = responseTimeMs,
|
||||
Status = "Healthy"
|
||||
};
|
||||
|
||||
public static HealthCheckResult Unhealthy(string error, long responseTimeMs = 0)
|
||||
=> new()
|
||||
{
|
||||
IsHealthy = false,
|
||||
ResponseTimeMs = responseTimeMs,
|
||||
Status = "Unhealthy",
|
||||
Error = error
|
||||
};
|
||||
}
|
||||
@ -0,0 +1,43 @@
|
||||
namespace Fengling.ServiceDiscovery;
|
||||
|
||||
/// <summary>
|
||||
/// 服务发现变更事件
|
||||
/// </summary>
|
||||
public record ServiceDiscoveryChange
|
||||
{
|
||||
/// <summary>
|
||||
/// 变更类型
|
||||
/// </summary>
|
||||
public required ServiceDiscoveryChangeType Type { get; init; }
|
||||
|
||||
/// <summary>
|
||||
/// 变更的服务
|
||||
/// </summary>
|
||||
public required DiscoveredService Service { get; init; }
|
||||
|
||||
/// <summary>
|
||||
/// 关联的实例
|
||||
/// </summary>
|
||||
public IReadOnlyList<DiscoveredInstance>? Instances { get; init; }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 服务发现变更类型
|
||||
/// </summary>
|
||||
public enum ServiceDiscoveryChangeType
|
||||
{
|
||||
/// <summary>
|
||||
/// 新增
|
||||
/// </summary>
|
||||
Added,
|
||||
|
||||
/// <summary>
|
||||
/// 修改
|
||||
/// </summary>
|
||||
Modified,
|
||||
|
||||
/// <summary>
|
||||
/// 删除
|
||||
/// </summary>
|
||||
Deleted
|
||||
}
|
||||
@ -0,0 +1,19 @@
|
||||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<PropertyGroup>
|
||||
<TargetFramework>net8.0</TargetFramework>
|
||||
<ImplicitUsings>enable</ImplicitUsings>
|
||||
<Nullable>enable</Nullable>
|
||||
<RootNamespace>Fengling.ServiceDiscovery.Kubernetes</RootNamespace>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="KubernetesClient" />
|
||||
<PackageReference Include="Microsoft.Extensions.Http" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\Fengling.ServiceDiscovery.Core\Fengling.ServiceDiscovery.Core.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
@ -0,0 +1,81 @@
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using k8s;
|
||||
using K8s = k8s.Kubernetes;
|
||||
|
||||
namespace Fengling.ServiceDiscovery.Kubernetes.Extensions;
|
||||
|
||||
/// <summary>
|
||||
/// Kubernetes 服务发现扩展方法
|
||||
/// </summary>
|
||||
public static class KubernetesServiceDiscoveryExtensions
|
||||
{
|
||||
/// <summary>
|
||||
/// 添加 Kubernetes 服务发现
|
||||
/// </summary>
|
||||
public static IServiceCollection AddKubernetesServiceDiscovery(
|
||||
this IServiceCollection services,
|
||||
Action<KubernetesDiscoveryOptions>? configure = null)
|
||||
{
|
||||
var options = new KubernetesDiscoveryOptions();
|
||||
configure?.Invoke(options);
|
||||
|
||||
services.AddSingleton(options);
|
||||
|
||||
// 根据配置决定如何创建 Kubernetes 客户端
|
||||
if (options.UseInClusterConfig)
|
||||
{
|
||||
services.AddSingleton<IKubernetes>(sp =>
|
||||
{
|
||||
var config = KubernetesClientConfiguration.InClusterConfig();
|
||||
return new K8s(config);
|
||||
});
|
||||
}
|
||||
else if (!string.IsNullOrEmpty(options.KubeConfigPath))
|
||||
{
|
||||
services.AddSingleton<IKubernetes>(sp =>
|
||||
{
|
||||
var config = KubernetesClientConfiguration.BuildConfigFromConfigFile(options.KubeConfigPath);
|
||||
return new K8s(config);
|
||||
});
|
||||
}
|
||||
else
|
||||
{
|
||||
services.AddSingleton<IKubernetes>(sp =>
|
||||
{
|
||||
var config = KubernetesClientConfiguration.BuildConfigFromConfigFile();
|
||||
return new K8s(config);
|
||||
});
|
||||
}
|
||||
|
||||
services.AddSingleton<IServiceDiscoveryProvider, KubernetesDiscoveryProvider>();
|
||||
|
||||
return services;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Kubernetes 发现配置选项
|
||||
/// </summary>
|
||||
public class KubernetesDiscoveryOptions
|
||||
{
|
||||
/// <summary>
|
||||
/// Label 选择器,用于筛选需要发现的服务
|
||||
/// </summary>
|
||||
public string LabelSelector { get; set; } = "app.kubernetes.io/managed-by=yarp";
|
||||
|
||||
/// <summary>
|
||||
/// 命名空间,为空表示所有命名空间
|
||||
/// </summary>
|
||||
public string? Namespace { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// 是否使用 In-Cluster 配置 (在 K8s Pod 内运行)
|
||||
/// </summary>
|
||||
public bool UseInClusterConfig { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// KubeConfig 文件路径
|
||||
/// </summary>
|
||||
public string? KubeConfigPath { get; set; }
|
||||
}
|
||||
@ -0,0 +1,176 @@
|
||||
using System.Runtime.CompilerServices;
|
||||
using k8s;
|
||||
using k8s.Models;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
using Fengling.ServiceDiscovery.Kubernetes.Extensions;
|
||||
|
||||
namespace Fengling.ServiceDiscovery.Kubernetes;
|
||||
|
||||
/// <summary>
|
||||
/// Kubernetes 服务发现提供商
|
||||
/// </summary>
|
||||
public class KubernetesDiscoveryProvider : IServiceDiscoveryProvider
|
||||
{
|
||||
private readonly IKubernetes _k8s;
|
||||
private readonly KubernetesDiscoveryOptions _options;
|
||||
private readonly ILogger<KubernetesDiscoveryProvider> _logger;
|
||||
|
||||
public string ProviderName => "Kubernetes";
|
||||
|
||||
public KubernetesDiscoveryProvider(
|
||||
IKubernetes k8s,
|
||||
IOptions<KubernetesDiscoveryOptions> options,
|
||||
ILogger<KubernetesDiscoveryProvider> logger)
|
||||
{
|
||||
_k8s = k8s;
|
||||
_options = options.Value;
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
public async Task<IReadOnlyList<DiscoveredService>> GetServicesAsync(CancellationToken ct = default)
|
||||
{
|
||||
try
|
||||
{
|
||||
var services = await _k8s.CoreV1.ListServiceForAllNamespacesAsync(
|
||||
labelSelector: _options.LabelSelector,
|
||||
fieldSelector: string.IsNullOrEmpty(_options.Namespace) ? null : $"metadata.namespace={_options.Namespace}",
|
||||
cancellationToken: ct);
|
||||
|
||||
return services.Items.Select(MapToService).ToList();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Failed to get services from Kubernetes");
|
||||
return Array.Empty<DiscoveredService>();
|
||||
}
|
||||
}
|
||||
|
||||
public async Task<IReadOnlyList<DiscoveredInstance>> GetInstancesAsync(
|
||||
string serviceName,
|
||||
string? namespace_ = null,
|
||||
CancellationToken ct = default)
|
||||
{
|
||||
try
|
||||
{
|
||||
var ns = namespace_ ?? _options.Namespace ?? "default";
|
||||
var endpoints = await _k8s.CoreV1.ReadNamespacedEndpointsAsync(serviceName, ns, cancellationToken: ct);
|
||||
|
||||
var instances = new List<DiscoveredInstance>();
|
||||
|
||||
if (endpoints.Subsets != null)
|
||||
{
|
||||
foreach (var subset in endpoints.Subsets)
|
||||
{
|
||||
var port = subset.Ports?.FirstOrDefault()?.Port ?? 80;
|
||||
|
||||
// Ready endpoints
|
||||
if (subset.Addresses != null)
|
||||
{
|
||||
foreach (var addr in subset.Addresses)
|
||||
{
|
||||
instances.Add(new DiscoveredInstance
|
||||
{
|
||||
InstanceId = $"{serviceName}-{addr.TargetRef?.Name}",
|
||||
PodName = addr.TargetRef?.Name,
|
||||
PodIP = addr.Ip,
|
||||
Address = $"http://{addr.Ip}:{port}",
|
||||
Port = port,
|
||||
IsHealthy = true,
|
||||
HealthStatus = "Running"
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// NotReady endpoints
|
||||
if (subset.NotReadyAddresses != null)
|
||||
{
|
||||
foreach (var addr in subset.NotReadyAddresses)
|
||||
{
|
||||
instances.Add(new DiscoveredInstance
|
||||
{
|
||||
InstanceId = $"{serviceName}-{addr.TargetRef?.Name}",
|
||||
PodName = addr.TargetRef?.Name,
|
||||
PodIP = addr.Ip,
|
||||
Address = $"http://{addr.Ip}:{port}",
|
||||
Port = port,
|
||||
IsHealthy = false,
|
||||
HealthStatus = "NotReady"
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return instances;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Failed to get instances for service {Service}", serviceName);
|
||||
return Array.Empty<DiscoveredInstance>();
|
||||
}
|
||||
}
|
||||
|
||||
public async IAsyncEnumerable<ServiceDiscoveryChange> WatchServicesAsync(
|
||||
[EnumeratorCancellation] CancellationToken ct)
|
||||
{
|
||||
// Simplified implementation - watch services using the List operation with continuation
|
||||
var services = await _k8s.CoreV1.ListServiceForAllNamespacesAsync(
|
||||
labelSelector: _options.LabelSelector,
|
||||
cancellationToken: ct);
|
||||
|
||||
foreach (var service in services.Items)
|
||||
{
|
||||
var instances = await GetInstancesAsync(service.Metadata.Name, service.Metadata.Namespace(), ct);
|
||||
yield return new ServiceDiscoveryChange
|
||||
{
|
||||
Type = ServiceDiscoveryChangeType.Added,
|
||||
Service = MapToService(service),
|
||||
Instances = instances
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
public async Task<HealthCheckResult> CheckHealthAsync(
|
||||
string address,
|
||||
string? healthPath = null,
|
||||
CancellationToken ct = default)
|
||||
{
|
||||
var startTime = DateTime.UtcNow;
|
||||
var url = $"{address.TrimEnd('/')}/{healthPath?.TrimStart('/') ?? "health"}";
|
||||
|
||||
try
|
||||
{
|
||||
using var client = new HttpClient { Timeout = TimeSpan.FromSeconds(5) };
|
||||
var response = await client.GetAsync(url, ct);
|
||||
var elapsed = (long)(DateTime.UtcNow - startTime).TotalMilliseconds;
|
||||
|
||||
if (response.IsSuccessStatusCode)
|
||||
{
|
||||
return HealthCheckResult.Healthy(elapsed);
|
||||
}
|
||||
|
||||
return HealthCheckResult.Unhealthy($"HTTP {response.StatusCode}", elapsed);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
var elapsed = (long)(DateTime.UtcNow - startTime).TotalMilliseconds;
|
||||
return HealthCheckResult.Unhealthy(ex.Message, elapsed);
|
||||
}
|
||||
}
|
||||
|
||||
private DiscoveredService MapToService(V1Service service)
|
||||
{
|
||||
return new DiscoveredService
|
||||
{
|
||||
Name = service.Metadata.Name,
|
||||
Namespace = service.Metadata.Namespace() ?? "default",
|
||||
ClusterIP = service.Spec.ClusterIP,
|
||||
ExternalIP = service.Spec.ExternalIPs?.FirstOrDefault(),
|
||||
Ports = service.Spec.Ports?.Select(p => p.Port).ToList() ?? new List<int>(),
|
||||
Labels = service.Metadata.Labels != null ? new Dictionary<string, string>(service.Metadata.Labels) : new Dictionary<string, string>(),
|
||||
Annotations = service.Metadata.Annotations != null ? new Dictionary<string, string>(service.Metadata.Annotations) : new Dictionary<string, string>(),
|
||||
DiscoveredAt = DateTime.UtcNow
|
||||
};
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,14 @@
|
||||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<PropertyGroup>
|
||||
<TargetFramework>net8.0</TargetFramework>
|
||||
<ImplicitUsings>enable</ImplicitUsings>
|
||||
<Nullable>enable</Nullable>
|
||||
<RootNamespace>Fengling.ServiceDiscovery.Static</RootNamespace>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\Fengling.ServiceDiscovery.Core\Fengling.ServiceDiscovery.Core.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
@ -0,0 +1,88 @@
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
|
||||
namespace Fengling.ServiceDiscovery.Static.Extensions;
|
||||
|
||||
/// <summary>
|
||||
/// 静态服务发现扩展方法
|
||||
/// </summary>
|
||||
public static class StaticServiceDiscoveryExtensions
|
||||
{
|
||||
/// <summary>
|
||||
/// 添加静态服务发现 (用于本地调试)
|
||||
/// </summary>
|
||||
public static IServiceCollection AddStaticServiceDiscovery(
|
||||
this IServiceCollection services,
|
||||
Action<StaticDiscoveryOptions> configure)
|
||||
{
|
||||
var options = new StaticDiscoveryOptions();
|
||||
configure(options);
|
||||
|
||||
services.AddSingleton(options);
|
||||
services.AddSingleton<Fengling.ServiceDiscovery.IServiceDiscoveryProvider, StaticDiscoveryProvider>();
|
||||
|
||||
return services;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 静态服务发现配置选项
|
||||
/// </summary>
|
||||
public class StaticDiscoveryOptions
|
||||
{
|
||||
/// <summary>
|
||||
/// 静态服务列表
|
||||
/// </summary>
|
||||
public List<StaticService> Services { get; set; } = new();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 静态服务定义
|
||||
/// </summary>
|
||||
public class StaticService
|
||||
{
|
||||
/// <summary>
|
||||
/// 服务名称
|
||||
/// </summary>
|
||||
public required string Name { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// 命名空间
|
||||
/// </summary>
|
||||
public string? Namespace { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// 服务地址 (不含端口)
|
||||
/// </summary>
|
||||
public string? Address { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// 端口列表
|
||||
/// </summary>
|
||||
public List<int>? Ports { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// 实例列表
|
||||
/// </summary>
|
||||
public List<StaticInstance>? Instances { get; set; }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 静态实例定义
|
||||
/// </summary>
|
||||
public class StaticInstance
|
||||
{
|
||||
/// <summary>
|
||||
/// 实例地址
|
||||
/// </summary>
|
||||
public required string Address { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// 端口
|
||||
/// </summary>
|
||||
public int? Port { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// 权重
|
||||
/// </summary>
|
||||
public int? Weight { get; set; }
|
||||
}
|
||||
@ -0,0 +1,84 @@
|
||||
using System.Runtime.CompilerServices;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
using Fengling.ServiceDiscovery.Static.Extensions;
|
||||
|
||||
namespace Fengling.ServiceDiscovery.Static;
|
||||
|
||||
/// <summary>
|
||||
/// 静态服务发现提供商 (用于本地调试)
|
||||
/// </summary>
|
||||
public class StaticDiscoveryProvider : IServiceDiscoveryProvider
|
||||
{
|
||||
private readonly StaticDiscoveryOptions _options;
|
||||
private readonly ILogger<StaticDiscoveryProvider> _logger;
|
||||
|
||||
public string ProviderName => "Static";
|
||||
|
||||
public StaticDiscoveryProvider(
|
||||
IOptions<StaticDiscoveryOptions> options,
|
||||
ILogger<StaticDiscoveryProvider> logger)
|
||||
{
|
||||
_options = options.Value;
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
public Task<IReadOnlyList<DiscoveredService>> GetServicesAsync(CancellationToken ct = default)
|
||||
{
|
||||
var services = _options.Services.Select(s => new DiscoveredService
|
||||
{
|
||||
Name = s.Name,
|
||||
Namespace = s.Namespace ?? "default",
|
||||
ClusterIP = s.Address?.Split(':').FirstOrDefault(),
|
||||
Ports = s.Ports ?? new List<int> { 80 },
|
||||
Labels = new Dictionary<string, string>(),
|
||||
Annotations = new Dictionary<string, string>(),
|
||||
DiscoveredAt = DateTime.UtcNow
|
||||
}).ToList();
|
||||
|
||||
return Task.FromResult<IReadOnlyList<DiscoveredService>>(services);
|
||||
}
|
||||
|
||||
public Task<IReadOnlyList<DiscoveredInstance>> GetInstancesAsync(
|
||||
string serviceName,
|
||||
string? namespace_ = null,
|
||||
CancellationToken ct = default)
|
||||
{
|
||||
var service = _options.Services.FirstOrDefault(s => s.Name == serviceName);
|
||||
|
||||
if (service == null || service.Instances == null)
|
||||
{
|
||||
return Task.FromResult<IReadOnlyList<DiscoveredInstance>>(
|
||||
new List<DiscoveredInstance>());
|
||||
}
|
||||
|
||||
var instances = service.Instances.Select((inst, index) => new DiscoveredInstance
|
||||
{
|
||||
InstanceId = $"{serviceName}-{index}",
|
||||
Address = inst.Address,
|
||||
Port = inst.Port ?? 80,
|
||||
Weight = inst.Weight ?? 1,
|
||||
IsHealthy = true,
|
||||
HealthStatus = "AlwaysHealthy"
|
||||
}).ToList();
|
||||
|
||||
return Task.FromResult<IReadOnlyList<DiscoveredInstance>>(instances);
|
||||
}
|
||||
|
||||
public async IAsyncEnumerable<ServiceDiscoveryChange> WatchServicesAsync(
|
||||
[EnumeratorCancellation] CancellationToken ct)
|
||||
{
|
||||
// 静态配置不支持 Watch,返回空
|
||||
await Task.CompletedTask;
|
||||
yield break;
|
||||
}
|
||||
|
||||
public Task<HealthCheckResult> CheckHealthAsync(
|
||||
string address,
|
||||
string? healthPath = null,
|
||||
CancellationToken ct = default)
|
||||
{
|
||||
// 静态配置默认返回健康
|
||||
return Task.FromResult(HealthCheckResult.Healthy(0));
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user