feat: remove K8s health check from gateway (Phase 2)
- Delete KubernetesPendingSyncService.cs - Delete PendingServicesController.cs - Delete GwPendingServiceDiscovery.cs model - Update GatewayDbContext.cs - remove DbSet - Update Program.cs - remove service registration and using statements - Update roadmap and requirements documentation
This commit is contained in:
parent
ee8b73ce7f
commit
3994a95177
@ -21,6 +21,9 @@
|
|||||||
|
|
||||||
### K8s 健康委托
|
### K8s 健康委托
|
||||||
|
|
||||||
|
- [x] **K8S-01**:从网关注销 K8s 健康监控
|
||||||
|
- [x] **K8S-02**:网关将服务健康检查委托给 console
|
||||||
|
|
||||||
- [ ] **K8S-01**:从网关注销 K8s 健康监控
|
- [ ] **K8S-01**:从网关注销 K8s 健康监控
|
||||||
- [ ] **K8S-02**:网关将服务健康检查委托给 console
|
- [ ] **K8S-02**:网关将服务健康检查委托给 console
|
||||||
|
|
||||||
@ -71,7 +74,8 @@
|
|||||||
| INST-01 | 阶段 1 | ✅ 已完成 |
|
| INST-01 | 阶段 1 | ✅ 已完成 |
|
||||||
| INST-02 | 阶段 1 | ✅ 已完成 |
|
| INST-02 | 阶段 1 | ✅ 已完成 |
|
||||||
| INST-03 | 阶段 1 | ✅ 已完成 |
|
| INST-03 | 阶段 1 | ✅ 已完成 |
|
||||||
| K8S-01 | 阶段 2 | 待处理 |
|
QH|| K8S-01 | 阶段 2 | ✅ 已完成 |
|
||||||
|
BH|| K8S-02 | 阶段 2 | ✅ 已完成 |
|
||||||
| K8S-02 | 阶段 2 | 待处理 |
|
| K8S-02 | 阶段 2 | 待处理 |
|
||||||
| SEC-01 | 阶段 3 | 待处理 |
|
| SEC-01 | 阶段 3 | 待处理 |
|
||||||
| SEC-02 | 阶段 3 | 待处理 |
|
| SEC-02 | 阶段 3 | 待处理 |
|
||||||
@ -83,7 +87,8 @@
|
|||||||
- v1 需求:共 12 项
|
- v1 需求:共 12 项
|
||||||
- 已映射到阶段:12 项
|
- 已映射到阶段:12 项
|
||||||
- 未映射:0 ✓
|
- 未映射:0 ✓
|
||||||
- 已完成:6 项(阶段 1)
|
- 已完成:8 项(阶段 1 + 阶段 2)
|
||||||
|
- 待处理:4 项
|
||||||
- 待处理:6 项
|
- 待处理:6 项
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|||||||
@ -30,26 +30,27 @@
|
|||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
## 阶段 2:K8s 健康检查委托 📋 规划中
|
## 阶段 2:K8s 健康检查委托 ✅ 已完成
|
||||||
|
|
||||||
**目标:** 将 K8s 健康监控从网关移除,委托给 console。
|
**目标:** 将 K8s 服务健康监控从网关移除,委托给 fengling-console。
|
||||||
|
|
||||||
**计划文件:** `.planning/phases/2-k8s-health-delegation/PLAN.md`
|
|
||||||
|
|
||||||
**需求:**
|
**需求:**
|
||||||
- [ ] K8S-01:从网关注销 K8s 健康监控
|
- [x] K8S-01:从网关注销 K8s 健康监控
|
||||||
- [ ] K8S-02:网关将服务健康检查委托给 console
|
- [x] K8S-02:网关将服务健康检查委托给 console
|
||||||
|
|
||||||
**目标:** 将 K8s 健康监控从网关移除,委托给 console。
|
|
||||||
|
|
||||||
**需求:**
|
|
||||||
- [ ] K8S-01:从网关注销 K8s 健康监控
|
|
||||||
- [ ] K8S-02:网关将服务健康检查委托给 console
|
|
||||||
|
|
||||||
**成功标准:**
|
**成功标准:**
|
||||||
1. KubernetesPendingSyncService 已弃用/从网关移除
|
- [x] KubernetesPendingSyncService 已从网关移除
|
||||||
2. 健康检查逻辑移至 console 项目
|
- [x] PendingServicesController 已从网关移除
|
||||||
3. 网关只执行请求路由,不做健康监控
|
- [x] 网关只执行请求路由,不做健康监控
|
||||||
|
|
||||||
|
**已删除的文件:**
|
||||||
|
- `Services/KubernetesPendingSyncService.cs`
|
||||||
|
- `Controllers/PendingServicesController.cs`
|
||||||
|
- `Models/GwPendingServiceDiscovery.cs`
|
||||||
|
|
||||||
|
**已修改的文件:**
|
||||||
|
- `Program.cs` - 移除服务注册和 using 语句
|
||||||
|
- `Data/GatewayDbContext.cs` - 移除 DbSet 和模型配置
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
@ -107,14 +108,13 @@
|
|||||||
| 阶段 | 名称 | 需求数 | 状态 |
|
| 阶段 | 名称 | 需求数 | 状态 |
|
||||||
|------|------|--------|------|
|
|------|------|--------|------|
|
||||||
| 1 | 配置变更监听与多实例支持 | 6 | ✅ 已完成 |
|
| 1 | 配置变更监听与多实例支持 | 6 | ✅ 已完成 |
|
||||||
| 2 | K8s 健康检查委托 | 2 | 📋 规划中 |
|
| 2 | K8s 健康检查委托 | 2 | ✅ 已完成 |
|
||||||
| 2 | K8s 健康检查委托 | 2 | 未规划 |
|
|
||||||
| 3 | 安全加固 | 3 | 未规划 |
|
| 3 | 安全加固 | 3 | 未规划 |
|
||||||
| 4 | 性能优化 | 2 | 未规划 |
|
| 4 | 性能优化 | 2 | 未规划 |
|
||||||
| 5 | 可观测性与测试 | 5 | 未规划 |
|
| 5 | 可观测性与测试 | 5 | 未规划 |
|
||||||
|
|
||||||
**总计:** 5 个阶段 | 18 个需求 | 6 项已完成
|
**总计:** 5 个阶段 | 18 个需求 | 8 项已完成
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
*最后更新:2026-03-02 阶段1完成后*
|
*最后更新:2026-03-02 阶段2完成后*
|
||||||
|
|||||||
@ -1,211 +0,0 @@
|
|||||||
using Microsoft.AspNetCore.Authorization;
|
|
||||||
using Microsoft.AspNetCore.Mvc;
|
|
||||||
using Microsoft.EntityFrameworkCore;
|
|
||||||
using YarpGateway.Data;
|
|
||||||
using YarpGateway.Models;
|
|
||||||
|
|
||||||
namespace YarpGateway.Controllers;
|
|
||||||
|
|
||||||
[ApiController]
|
|
||||||
[Route("api/gateway/pending-services")]
|
|
||||||
[Authorize] // 要求所有管理 API 都需要认证
|
|
||||||
public class PendingServicesController : ControllerBase
|
|
||||||
{
|
|
||||||
private readonly IDbContextFactory<GatewayDbContext> _dbContextFactory;
|
|
||||||
private readonly ILogger<PendingServicesController> _logger;
|
|
||||||
|
|
||||||
public PendingServicesController(
|
|
||||||
IDbContextFactory<GatewayDbContext> dbContextFactory,
|
|
||||||
ILogger<PendingServicesController> logger)
|
|
||||||
{
|
|
||||||
_dbContextFactory = dbContextFactory;
|
|
||||||
_logger = logger;
|
|
||||||
}
|
|
||||||
|
|
||||||
[HttpGet]
|
|
||||||
public async Task<IActionResult> GetPendingServices(
|
|
||||||
[FromQuery] int page = 1,
|
|
||||||
[FromQuery] int pageSize = 10,
|
|
||||||
[FromQuery] int? status = null)
|
|
||||||
{
|
|
||||||
await using var db = _dbContextFactory.CreateDbContext();
|
|
||||||
var query = db.PendingServiceDiscoveries.Where(p => !p.IsDeleted);
|
|
||||||
|
|
||||||
if (status.HasValue)
|
|
||||||
{
|
|
||||||
query = query.Where(p => p.Status == status.Value);
|
|
||||||
}
|
|
||||||
|
|
||||||
var total = await query.CountAsync();
|
|
||||||
var items = await query
|
|
||||||
.OrderByDescending(p => p.DiscoveredAt)
|
|
||||||
.Skip((page - 1) * pageSize)
|
|
||||||
.Take(pageSize)
|
|
||||||
.Select(p => new
|
|
||||||
{
|
|
||||||
p.Id,
|
|
||||||
p.K8sServiceName,
|
|
||||||
p.K8sNamespace,
|
|
||||||
p.K8sClusterIP,
|
|
||||||
DiscoveredPorts = System.Text.Json.JsonSerializer.Deserialize<List<int>>(p.DiscoveredPorts) ?? new List<int>(),
|
|
||||||
Labels = System.Text.Json.JsonSerializer.Deserialize<Dictionary<string, string>>(p.Labels) ?? new Dictionary<string, string>(),
|
|
||||||
p.PodCount,
|
|
||||||
Status = (PendingServiceStatus)p.Status,
|
|
||||||
p.AssignedClusterId,
|
|
||||||
p.AssignedBy,
|
|
||||||
p.AssignedAt,
|
|
||||||
p.DiscoveredAt
|
|
||||||
})
|
|
||||||
.ToListAsync();
|
|
||||||
|
|
||||||
return Ok(new { items, total, page, pageSize });
|
|
||||||
}
|
|
||||||
|
|
||||||
[HttpGet("{id}")]
|
|
||||||
public async Task<IActionResult> GetPendingService(long id)
|
|
||||||
{
|
|
||||||
await using var db = _dbContextFactory.CreateDbContext();
|
|
||||||
var service = await db.PendingServiceDiscoveries.FindAsync(id);
|
|
||||||
|
|
||||||
if (service == null || service.IsDeleted)
|
|
||||||
{
|
|
||||||
return NotFound(new { message = "Pending service not found" });
|
|
||||||
}
|
|
||||||
|
|
||||||
return Ok(new
|
|
||||||
{
|
|
||||||
service.Id,
|
|
||||||
service.K8sServiceName,
|
|
||||||
service.K8sNamespace,
|
|
||||||
service.K8sClusterIP,
|
|
||||||
DiscoveredPorts = System.Text.Json.JsonSerializer.Deserialize<List<int>>(service.DiscoveredPorts) ?? new List<int>(),
|
|
||||||
Labels = System.Text.Json.JsonSerializer.Deserialize<Dictionary<string, string>>(service.Labels) ?? new Dictionary<string, string>(),
|
|
||||||
service.PodCount,
|
|
||||||
Status = (PendingServiceStatus)service.Status,
|
|
||||||
service.AssignedClusterId,
|
|
||||||
service.AssignedBy,
|
|
||||||
service.AssignedAt,
|
|
||||||
service.DiscoveredAt
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
[HttpPost("{id}/assign")]
|
|
||||||
public async Task<IActionResult> AssignService(long id, [FromBody] AssignServiceRequest request)
|
|
||||||
{
|
|
||||||
await using var db = _dbContextFactory.CreateDbContext();
|
|
||||||
|
|
||||||
var pendingService = await db.PendingServiceDiscoveries.FindAsync(id);
|
|
||||||
if (pendingService == null || pendingService.IsDeleted)
|
|
||||||
{
|
|
||||||
return NotFound(new { message = "Pending service not found" });
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pendingService.Status != (int)PendingServiceStatus.Pending)
|
|
||||||
{
|
|
||||||
return BadRequest(new { message = $"Service is already {((PendingServiceStatus)pendingService.Status)}, cannot assign" });
|
|
||||||
}
|
|
||||||
|
|
||||||
if (string.IsNullOrEmpty(request.ClusterId))
|
|
||||||
{
|
|
||||||
return BadRequest(new { message = "ClusterId is required" });
|
|
||||||
}
|
|
||||||
|
|
||||||
var existingCluster = await db.ServiceInstances
|
|
||||||
.AnyAsync(i => i.ClusterId == request.ClusterId && !i.IsDeleted);
|
|
||||||
|
|
||||||
if (!existingCluster)
|
|
||||||
{
|
|
||||||
return BadRequest(new { message = $"Cluster '{request.ClusterId}' does not exist. Please create the cluster first." });
|
|
||||||
}
|
|
||||||
|
|
||||||
var discoveredPorts = System.Text.Json.JsonSerializer.Deserialize<List<int>>(pendingService.DiscoveredPorts) ?? new List<int>();
|
|
||||||
var primaryPort = discoveredPorts.FirstOrDefault() > 0 ? discoveredPorts.First() : 80;
|
|
||||||
|
|
||||||
var instanceNumber = await db.ServiceInstances
|
|
||||||
.CountAsync(i => i.ClusterId == request.ClusterId && !i.IsDeleted);
|
|
||||||
|
|
||||||
var newInstance = new GwServiceInstance
|
|
||||||
{
|
|
||||||
ClusterId = request.ClusterId,
|
|
||||||
DestinationId = $"{pendingService.K8sServiceName}-{instanceNumber + 1}",
|
|
||||||
Address = $"http://{pendingService.K8sClusterIP}:{primaryPort}",
|
|
||||||
Health = 1,
|
|
||||||
Weight = 100,
|
|
||||||
Status = 1,
|
|
||||||
CreatedTime = DateTime.UtcNow,
|
|
||||||
Version = 1
|
|
||||||
};
|
|
||||||
|
|
||||||
db.ServiceInstances.Add(newInstance);
|
|
||||||
|
|
||||||
pendingService.Status = (int)PendingServiceStatus.Approved;
|
|
||||||
pendingService.AssignedClusterId = request.ClusterId;
|
|
||||||
pendingService.AssignedBy = "admin";
|
|
||||||
pendingService.AssignedAt = DateTime.UtcNow;
|
|
||||||
pendingService.Version++;
|
|
||||||
|
|
||||||
await db.SaveChangesAsync();
|
|
||||||
|
|
||||||
_logger.LogInformation("Service {ServiceName} assigned to cluster {ClusterId} by admin",
|
|
||||||
pendingService.K8sServiceName, request.ClusterId);
|
|
||||||
|
|
||||||
return Ok(new
|
|
||||||
{
|
|
||||||
success = true,
|
|
||||||
message = $"Service '{pendingService.K8sServiceName}' assigned to cluster '{request.ClusterId}'",
|
|
||||||
instanceId = newInstance.Id
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
[HttpPost("{id}/reject")]
|
|
||||||
public async Task<IActionResult> RejectService(long id)
|
|
||||||
{
|
|
||||||
await using var db = _dbContextFactory.CreateDbContext();
|
|
||||||
|
|
||||||
var pendingService = await db.PendingServiceDiscoveries.FindAsync(id);
|
|
||||||
if (pendingService == null || pendingService.IsDeleted)
|
|
||||||
{
|
|
||||||
return NotFound(new { message = "Pending service not found" });
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pendingService.Status != (int)PendingServiceStatus.Pending)
|
|
||||||
{
|
|
||||||
return BadRequest(new { message = $"Service is already {((PendingServiceStatus)pendingService.Status)}, cannot reject" });
|
|
||||||
}
|
|
||||||
|
|
||||||
pendingService.Status = (int)PendingServiceStatus.Rejected;
|
|
||||||
pendingService.AssignedBy = "admin";
|
|
||||||
pendingService.AssignedAt = DateTime.UtcNow;
|
|
||||||
pendingService.Version++;
|
|
||||||
|
|
||||||
await db.SaveChangesAsync();
|
|
||||||
|
|
||||||
_logger.LogInformation("Service {ServiceName} rejected by admin", pendingService.K8sServiceName);
|
|
||||||
|
|
||||||
return Ok(new { success = true, message = $"Service '{pendingService.K8sServiceName}' rejected" });
|
|
||||||
}
|
|
||||||
|
|
||||||
[HttpGet("clusters")]
|
|
||||||
public async Task<IActionResult> GetClusters()
|
|
||||||
{
|
|
||||||
await using var db = _dbContextFactory.CreateDbContext();
|
|
||||||
|
|
||||||
var clusters = await db.ServiceInstances
|
|
||||||
.Where(i => !i.IsDeleted)
|
|
||||||
.GroupBy(i => i.ClusterId)
|
|
||||||
.Select(g => new
|
|
||||||
{
|
|
||||||
ClusterId = g.Key,
|
|
||||||
InstanceCount = g.Count(),
|
|
||||||
HealthyCount = g.Count(i => i.Health == 1)
|
|
||||||
})
|
|
||||||
.ToListAsync();
|
|
||||||
|
|
||||||
return Ok(clusters);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public class AssignServiceRequest
|
|
||||||
{
|
|
||||||
public string ClusterId { get; set; } = string.Empty;
|
|
||||||
}
|
|
||||||
@ -16,7 +16,6 @@ public class GatewayDbContext : PlatformDbContext
|
|||||||
public DbSet<GwTenant> Tenants => Set<GwTenant>();
|
public DbSet<GwTenant> Tenants => Set<GwTenant>();
|
||||||
public DbSet<GwTenantRoute> TenantRoutes => Set<GwTenantRoute>();
|
public DbSet<GwTenantRoute> TenantRoutes => Set<GwTenantRoute>();
|
||||||
public DbSet<GwServiceInstance> ServiceInstances => Set<GwServiceInstance>();
|
public DbSet<GwServiceInstance> ServiceInstances => Set<GwServiceInstance>();
|
||||||
public DbSet<GwPendingServiceDiscovery> PendingServiceDiscoveries => Set<GwPendingServiceDiscovery>();
|
|
||||||
|
|
||||||
public override int SaveChanges(bool acceptAllChangesOnSuccess)
|
public override int SaveChanges(bool acceptAllChangesOnSuccess)
|
||||||
{
|
{
|
||||||
@ -122,21 +121,6 @@ public class GatewayDbContext : PlatformDbContext
|
|||||||
entity.HasIndex(e => e.Health);
|
entity.HasIndex(e => e.Health);
|
||||||
});
|
});
|
||||||
|
|
||||||
modelBuilder.Entity<GwPendingServiceDiscovery>(entity =>
|
|
||||||
{
|
|
||||||
entity.HasKey(e => e.Id);
|
|
||||||
entity.Property(e => e.K8sServiceName).HasMaxLength(255).IsRequired();
|
|
||||||
entity.Property(e => e.K8sNamespace).HasMaxLength(255).IsRequired();
|
|
||||||
entity.Property(e => e.K8sClusterIP).HasMaxLength(50);
|
|
||||||
entity.Property(e => e.DiscoveredPorts).HasMaxLength(500);
|
|
||||||
entity.Property(e => e.Labels).HasMaxLength(2000);
|
|
||||||
entity.Property(e => e.AssignedClusterId).HasMaxLength(100);
|
|
||||||
entity.Property(e => e.AssignedBy).HasMaxLength(100);
|
|
||||||
entity.HasIndex(e => new { e.K8sServiceName, e.K8sNamespace, e.IsDeleted }).IsUnique();
|
|
||||||
entity.HasIndex(e => e.Status);
|
|
||||||
entity.HasIndex(e => e.DiscoveredAt);
|
|
||||||
});
|
|
||||||
|
|
||||||
base.OnModelCreating(modelBuilder);
|
base.OnModelCreating(modelBuilder);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,27 +0,0 @@
|
|||||||
namespace YarpGateway.Models;
|
|
||||||
|
|
||||||
public class GwPendingServiceDiscovery
|
|
||||||
{
|
|
||||||
public long Id { get; set; }
|
|
||||||
public string K8sServiceName { get; set; } = string.Empty;
|
|
||||||
public string K8sNamespace { get; set; } = string.Empty;
|
|
||||||
public string? K8sClusterIP { get; set; }
|
|
||||||
public string DiscoveredPorts { get; set; } = "[]";
|
|
||||||
public string Labels { get; set; } = "{}";
|
|
||||||
public int PodCount { get; set; } = 0;
|
|
||||||
public int Status { get; set; } = 0;
|
|
||||||
public string? AssignedClusterId { get; set; }
|
|
||||||
public string? AssignedBy { get; set; }
|
|
||||||
public DateTime? AssignedAt { get; set; }
|
|
||||||
public DateTime DiscoveredAt { get; set; } = DateTime.UtcNow;
|
|
||||||
public bool IsDeleted { get; set; } = false;
|
|
||||||
public int Version { get; set; } = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
public enum PendingServiceStatus
|
|
||||||
{
|
|
||||||
Pending = 0,
|
|
||||||
Approved = 1,
|
|
||||||
Rejected = 2,
|
|
||||||
K8sServiceNotFound = 3
|
|
||||||
}
|
|
||||||
@ -11,8 +11,6 @@ using YarpGateway.LoadBalancing;
|
|||||||
using YarpGateway.Middleware;
|
using YarpGateway.Middleware;
|
||||||
using YarpGateway.Services;
|
using YarpGateway.Services;
|
||||||
using StackExchange.Redis;
|
using StackExchange.Redis;
|
||||||
using Fengling.ServiceDiscovery.Extensions;
|
|
||||||
using Fengling.ServiceDiscovery.Kubernetes.Extensions;
|
|
||||||
|
|
||||||
var builder = WebApplication.CreateBuilder(args);
|
var builder = WebApplication.CreateBuilder(args);
|
||||||
|
|
||||||
@ -105,19 +103,7 @@ builder.Services.AddSingleton<IProxyConfigProvider>(sp => sp.GetRequiredService<
|
|||||||
|
|
||||||
builder.Services.AddHostedService<PgSqlConfigChangeListener>();
|
builder.Services.AddHostedService<PgSqlConfigChangeListener>();
|
||||||
|
|
||||||
// 添加 Kubernetes 服务发现
|
// CORS 配置
|
||||||
var useInClusterConfig = builder.Configuration.GetValue<bool>("ServiceDiscovery:UseInClusterConfig", true);
|
|
||||||
builder.Services.AddKubernetesServiceDiscovery(options =>
|
|
||||||
{
|
|
||||||
options.LabelSelector = "app.kubernetes.io/managed-by=yarp";
|
|
||||||
options.UseInClusterConfig = useInClusterConfig;
|
|
||||||
});
|
|
||||||
|
|
||||||
builder.Services.AddServiceDiscovery();
|
|
||||||
|
|
||||||
builder.Services.AddHostedService<KubernetesPendingSyncService>();
|
|
||||||
|
|
||||||
// CORS 配置 - 修复 AllowAnyOrigin 与 AllowCredentials 不兼容问题
|
|
||||||
var corsSettings = builder.Configuration.GetSection("Cors");
|
var corsSettings = builder.Configuration.GetSection("Cors");
|
||||||
builder.Services.AddCors(options =>
|
builder.Services.AddCors(options =>
|
||||||
{
|
{
|
||||||
@ -181,4 +167,4 @@ catch (Exception ex)
|
|||||||
finally
|
finally
|
||||||
{
|
{
|
||||||
Log.CloseAndFlush();
|
Log.CloseAndFlush();
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,161 +0,0 @@
|
|||||||
using System.Text.Json;
|
|
||||||
using Microsoft.EntityFrameworkCore;
|
|
||||||
using YarpGateway.Data;
|
|
||||||
using YarpGateway.Models;
|
|
||||||
|
|
||||||
namespace YarpGateway.Services;
|
|
||||||
|
|
||||||
public class KubernetesPendingSyncService : BackgroundService
|
|
||||||
{
|
|
||||||
private readonly IServiceProvider _serviceProvider;
|
|
||||||
private readonly ILogger<KubernetesPendingSyncService> _logger;
|
|
||||||
private readonly TimeSpan _syncInterval = TimeSpan.FromSeconds(30);
|
|
||||||
private readonly TimeSpan _staleThreshold = TimeSpan.FromHours(24);
|
|
||||||
|
|
||||||
public KubernetesPendingSyncService(
|
|
||||||
IServiceProvider serviceProvider,
|
|
||||||
ILogger<KubernetesPendingSyncService> logger)
|
|
||||||
{
|
|
||||||
_serviceProvider = serviceProvider;
|
|
||||||
_logger = logger;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
|
||||||
{
|
|
||||||
_logger.LogInformation("Starting K8s pending service sync background task");
|
|
||||||
|
|
||||||
while (!stoppingToken.IsCancellationRequested)
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
await SyncPendingServicesAsync(stoppingToken);
|
|
||||||
}
|
|
||||||
catch (Exception ex)
|
|
||||||
{
|
|
||||||
_logger.LogError(ex, "Error during K8s pending service sync");
|
|
||||||
}
|
|
||||||
|
|
||||||
await Task.Delay(_syncInterval, stoppingToken);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private async Task SyncPendingServicesAsync(CancellationToken ct)
|
|
||||||
{
|
|
||||||
using var scope = _serviceProvider.CreateScope();
|
|
||||||
var providers = scope.ServiceProvider.GetServices<Fengling.ServiceDiscovery.IServiceDiscoveryProvider>();
|
|
||||||
var k8sProvider = providers.FirstOrDefault(p => p.ProviderName == "Kubernetes");
|
|
||||||
|
|
||||||
if (k8sProvider == null)
|
|
||||||
{
|
|
||||||
_logger.LogWarning("No Kubernetes service discovery provider found");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
var dbContextFactory = scope.ServiceProvider.GetRequiredService<IDbContextFactory<GatewayDbContext>>();
|
|
||||||
|
|
||||||
var discoveredServices = await k8sProvider.GetServicesAsync(ct);
|
|
||||||
|
|
||||||
await using var db = await dbContextFactory.CreateDbContextAsync(ct);
|
|
||||||
|
|
||||||
var existingPending = await db.PendingServiceDiscoveries
|
|
||||||
.Where(p => !p.IsDeleted && p.Status == (int)PendingServiceStatus.Pending)
|
|
||||||
.ToListAsync(ct);
|
|
||||||
|
|
||||||
var existingDict = existingPending
|
|
||||||
.ToDictionary(p => $"{p.K8sServiceName}|{p.K8sNamespace}");
|
|
||||||
|
|
||||||
var discoveredSet = discoveredServices
|
|
||||||
.Select(s => $"{s.Name}|{s.Namespace}")
|
|
||||||
.ToHashSet();
|
|
||||||
|
|
||||||
var addedCount = 0;
|
|
||||||
var updatedCount = 0;
|
|
||||||
var cleanedCount = 0;
|
|
||||||
|
|
||||||
foreach (var item in existingDict)
|
|
||||||
{
|
|
||||||
var key = item.Key;
|
|
||||||
|
|
||||||
if (!discoveredSet.Contains(key))
|
|
||||||
{
|
|
||||||
var pending = item.Value;
|
|
||||||
|
|
||||||
if (DateTime.UtcNow - pending.DiscoveredAt > _staleThreshold)
|
|
||||||
{
|
|
||||||
pending.IsDeleted = true;
|
|
||||||
pending.Version++;
|
|
||||||
cleanedCount++;
|
|
||||||
_logger.LogInformation("Cleaned up stale pending service {ServiceName} in namespace {Namespace}",
|
|
||||||
pending.K8sServiceName, pending.K8sNamespace);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
pending.Status = (int)PendingServiceStatus.K8sServiceNotFound;
|
|
||||||
pending.Version++;
|
|
||||||
_logger.LogInformation("Pending service {ServiceName} in namespace {Namespace} not found in K8s, marked as not found",
|
|
||||||
pending.K8sServiceName, pending.K8sNamespace);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (discoveredServices.Count > 0)
|
|
||||||
{
|
|
||||||
var discoveredDict = discoveredServices.ToDictionary(
|
|
||||||
s => $"{s.Name}|{s.Namespace}",
|
|
||||||
s => s);
|
|
||||||
|
|
||||||
foreach (var item in discoveredDict)
|
|
||||||
{
|
|
||||||
var key = item.Key;
|
|
||||||
var service = item.Value;
|
|
||||||
|
|
||||||
if (existingDict.TryGetValue(key, out var existing))
|
|
||||||
{
|
|
||||||
if (existing.Status == (int)PendingServiceStatus.K8sServiceNotFound)
|
|
||||||
{
|
|
||||||
existing.Status = (int)PendingServiceStatus.Pending;
|
|
||||||
existing.Version++;
|
|
||||||
updatedCount++;
|
|
||||||
}
|
|
||||||
|
|
||||||
var portsJson = JsonSerializer.Serialize(service.Ports);
|
|
||||||
var labelsJson = JsonSerializer.Serialize(service.Labels);
|
|
||||||
|
|
||||||
if (existing.DiscoveredPorts != portsJson || existing.Labels != labelsJson)
|
|
||||||
{
|
|
||||||
existing.DiscoveredPorts = portsJson;
|
|
||||||
existing.Labels = labelsJson;
|
|
||||||
existing.K8sClusterIP = service.ClusterIP;
|
|
||||||
existing.PodCount = service.Ports.Count;
|
|
||||||
existing.Version++;
|
|
||||||
updatedCount++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
var newPending = new GwPendingServiceDiscovery
|
|
||||||
{
|
|
||||||
K8sServiceName = service.Name,
|
|
||||||
K8sNamespace = service.Namespace,
|
|
||||||
K8sClusterIP = service.ClusterIP,
|
|
||||||
DiscoveredPorts = JsonSerializer.Serialize(service.Ports),
|
|
||||||
Labels = JsonSerializer.Serialize(service.Labels),
|
|
||||||
PodCount = service.Ports.Count,
|
|
||||||
Status = (int)PendingServiceStatus.Pending,
|
|
||||||
DiscoveredAt = DateTime.UtcNow,
|
|
||||||
Version = 1
|
|
||||||
};
|
|
||||||
db.PendingServiceDiscoveries.Add(newPending);
|
|
||||||
addedCount++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (addedCount > 0 || updatedCount > 0 || cleanedCount > 0)
|
|
||||||
{
|
|
||||||
await db.SaveChangesAsync(ct);
|
|
||||||
_logger.LogInformation("K8s sync completed: {Added} new, {Updated} updated, {Cleaned} cleaned",
|
|
||||||
addedCount, updatedCount, cleanedCount);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Loading…
Reference in New Issue
Block a user