feat[gateway]: add K8s service discovery with pending approval workflow

- Add PendingServiceDiscovery model and database migration
- Add PendingServices API controller for service assignment
- Add KubernetesPendingSyncService for background sync
- Add RBAC configuration for K8s service discovery
- Update Dockerfile and K8s deployment configs
- Add service discovery design documentation

Workflow: K8s services with label managed-by=yarp are discovered
and stored in pending table. Admin approves before they become
active gateway downstream services.
This commit is contained in:
movingsam 2026-02-22 22:14:54 +08:00
parent a39824397c
commit abe3456ccb
10 changed files with 955 additions and 3 deletions

View File

@ -0,0 +1,209 @@
using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;
using YarpGateway.Data;
using YarpGateway.Models;
namespace YarpGateway.Controllers;
[ApiController]
[Route("api/gateway/pending-services")]
public class PendingServicesController : ControllerBase
{
private readonly IDbContextFactory<GatewayDbContext> _dbContextFactory;
private readonly ILogger<PendingServicesController> _logger;
public PendingServicesController(
IDbContextFactory<GatewayDbContext> dbContextFactory,
ILogger<PendingServicesController> logger)
{
_dbContextFactory = dbContextFactory;
_logger = logger;
}
[HttpGet]
public async Task<IActionResult> GetPendingServices(
[FromQuery] int page = 1,
[FromQuery] int pageSize = 10,
[FromQuery] int? status = null)
{
await using var db = _dbContextFactory.CreateDbContext();
var query = db.PendingServiceDiscoveries.Where(p => !p.IsDeleted);
if (status.HasValue)
{
query = query.Where(p => p.Status == status.Value);
}
var total = await query.CountAsync();
var items = await query
.OrderByDescending(p => p.DiscoveredAt)
.Skip((page - 1) * pageSize)
.Take(pageSize)
.Select(p => new
{
p.Id,
p.K8sServiceName,
p.K8sNamespace,
p.K8sClusterIP,
DiscoveredPorts = System.Text.Json.JsonSerializer.Deserialize<List<int>>(p.DiscoveredPorts) ?? new List<int>(),
Labels = System.Text.Json.JsonSerializer.Deserialize<Dictionary<string, string>>(p.Labels) ?? new Dictionary<string, string>(),
p.PodCount,
Status = (PendingServiceStatus)p.Status,
p.AssignedClusterId,
p.AssignedBy,
p.AssignedAt,
p.DiscoveredAt
})
.ToListAsync();
return Ok(new { items, total, page, pageSize });
}
[HttpGet("{id}")]
public async Task<IActionResult> GetPendingService(long id)
{
await using var db = _dbContextFactory.CreateDbContext();
var service = await db.PendingServiceDiscoveries.FindAsync(id);
if (service == null || service.IsDeleted)
{
return NotFound(new { message = "Pending service not found" });
}
return Ok(new
{
service.Id,
service.K8sServiceName,
service.K8sNamespace,
service.K8sClusterIP,
DiscoveredPorts = System.Text.Json.JsonSerializer.Deserialize<List<int>>(service.DiscoveredPorts) ?? new List<int>(),
Labels = System.Text.Json.JsonSerializer.Deserialize<Dictionary<string, string>>(service.Labels) ?? new Dictionary<string, string>(),
service.PodCount,
Status = (PendingServiceStatus)service.Status,
service.AssignedClusterId,
service.AssignedBy,
service.AssignedAt,
service.DiscoveredAt
});
}
[HttpPost("{id}/assign")]
public async Task<IActionResult> AssignService(long id, [FromBody] AssignServiceRequest request)
{
await using var db = _dbContextFactory.CreateDbContext();
var pendingService = await db.PendingServiceDiscoveries.FindAsync(id);
if (pendingService == null || pendingService.IsDeleted)
{
return NotFound(new { message = "Pending service not found" });
}
if (pendingService.Status != (int)PendingServiceStatus.Pending)
{
return BadRequest(new { message = $"Service is already {((PendingServiceStatus)pendingService.Status)}, cannot assign" });
}
if (string.IsNullOrEmpty(request.ClusterId))
{
return BadRequest(new { message = "ClusterId is required" });
}
var existingCluster = await db.ServiceInstances
.AnyAsync(i => i.ClusterId == request.ClusterId && !i.IsDeleted);
if (!existingCluster)
{
return BadRequest(new { message = $"Cluster '{request.ClusterId}' does not exist. Please create the cluster first." });
}
var discoveredPorts = System.Text.Json.JsonSerializer.Deserialize<List<int>>(pendingService.DiscoveredPorts) ?? new List<int>();
var primaryPort = discoveredPorts.FirstOrDefault() > 0 ? discoveredPorts.First() : 80;
var instanceNumber = await db.ServiceInstances
.CountAsync(i => i.ClusterId == request.ClusterId && !i.IsDeleted);
var newInstance = new GwServiceInstance
{
ClusterId = request.ClusterId,
DestinationId = $"{pendingService.K8sServiceName}-{instanceNumber + 1}",
Address = $"http://{pendingService.K8sClusterIP}:{primaryPort}",
Health = 1,
Weight = 100,
Status = 1,
CreatedTime = DateTime.UtcNow,
Version = 1
};
db.ServiceInstances.Add(newInstance);
pendingService.Status = (int)PendingServiceStatus.Approved;
pendingService.AssignedClusterId = request.ClusterId;
pendingService.AssignedBy = "admin";
pendingService.AssignedAt = DateTime.UtcNow;
pendingService.Version++;
await db.SaveChangesAsync();
_logger.LogInformation("Service {ServiceName} assigned to cluster {ClusterId} by admin",
pendingService.K8sServiceName, request.ClusterId);
return Ok(new
{
success = true,
message = $"Service '{pendingService.K8sServiceName}' assigned to cluster '{request.ClusterId}'",
instanceId = newInstance.Id
});
}
[HttpPost("{id}/reject")]
public async Task<IActionResult> RejectService(long id)
{
await using var db = _dbContextFactory.CreateDbContext();
var pendingService = await db.PendingServiceDiscoveries.FindAsync(id);
if (pendingService == null || pendingService.IsDeleted)
{
return NotFound(new { message = "Pending service not found" });
}
if (pendingService.Status != (int)PendingServiceStatus.Pending)
{
return BadRequest(new { message = $"Service is already {((PendingServiceStatus)pendingService.Status)}, cannot reject" });
}
pendingService.Status = (int)PendingServiceStatus.Rejected;
pendingService.AssignedBy = "admin";
pendingService.AssignedAt = DateTime.UtcNow;
pendingService.Version++;
await db.SaveChangesAsync();
_logger.LogInformation("Service {ServiceName} rejected by admin", pendingService.K8sServiceName);
return Ok(new { success = true, message = $"Service '{pendingService.K8sServiceName}' rejected" });
}
[HttpGet("clusters")]
public async Task<IActionResult> GetClusters()
{
await using var db = _dbContextFactory.CreateDbContext();
var clusters = await db.ServiceInstances
.Where(i => !i.IsDeleted)
.GroupBy(i => i.ClusterId)
.Select(g => new
{
ClusterId = g.Key,
InstanceCount = g.Count(),
HealthyCount = g.Count(i => i.Health == 1)
})
.ToListAsync();
return Ok(clusters);
}
}
public class AssignServiceRequest
{
public string ClusterId { get; set; } = string.Empty;
}

View File

@ -15,6 +15,7 @@ public class GatewayDbContext : DbContext
public DbSet<GwTenant> Tenants => Set<GwTenant>(); public DbSet<GwTenant> Tenants => Set<GwTenant>();
public DbSet<GwTenantRoute> TenantRoutes => Set<GwTenantRoute>(); public DbSet<GwTenantRoute> TenantRoutes => Set<GwTenantRoute>();
public DbSet<GwServiceInstance> ServiceInstances => Set<GwServiceInstance>(); public DbSet<GwServiceInstance> ServiceInstances => Set<GwServiceInstance>();
public DbSet<GwPendingServiceDiscovery> PendingServiceDiscoveries => Set<GwPendingServiceDiscovery>();
public override int SaveChanges(bool acceptAllChangesOnSuccess) public override int SaveChanges(bool acceptAllChangesOnSuccess)
{ {
@ -120,6 +121,21 @@ public class GatewayDbContext : DbContext
entity.HasIndex(e => e.Health); entity.HasIndex(e => e.Health);
}); });
modelBuilder.Entity<GwPendingServiceDiscovery>(entity =>
{
entity.HasKey(e => e.Id);
entity.Property(e => e.K8sServiceName).HasMaxLength(255).IsRequired();
entity.Property(e => e.K8sNamespace).HasMaxLength(255).IsRequired();
entity.Property(e => e.K8sClusterIP).HasMaxLength(50);
entity.Property(e => e.DiscoveredPorts).HasMaxLength(500);
entity.Property(e => e.Labels).HasMaxLength(2000);
entity.Property(e => e.AssignedClusterId).HasMaxLength(100);
entity.Property(e => e.AssignedBy).HasMaxLength(100);
entity.HasIndex(e => new { e.K8sServiceName, e.K8sNamespace, e.IsDeleted }).IsUnique();
entity.HasIndex(e => e.Status);
entity.HasIndex(e => e.DiscoveredAt);
});
base.OnModelCreating(modelBuilder); base.OnModelCreating(modelBuilder);
} }
} }

View File

@ -1,4 +1,4 @@
FROM mcr.microsoft.com/dotnet/aspnet:10.0 AS base FROM mcr.microsoft.com/dotnet/aspnet:10.0 AS base
USER $APP_UID USER $APP_UID
WORKDIR /app WORKDIR /app
EXPOSE 8080 EXPOSE 8080
@ -7,6 +7,9 @@ EXPOSE 8081
FROM mcr.microsoft.com/dotnet/sdk:10.0 AS build FROM mcr.microsoft.com/dotnet/sdk:10.0 AS build
ARG BUILD_CONFIGURATION=Release ARG BUILD_CONFIGURATION=Release
WORKDIR /src WORKDIR /src
# Copy Directory.Packages.props for centralized version management
COPY ["Directory.Packages.props", "./"]
COPY ["src/Directory.Packages.props", "src/"]
COPY ["src/YarpGateway/YarpGateway.csproj", "src/YarpGateway/"] COPY ["src/YarpGateway/YarpGateway.csproj", "src/YarpGateway/"]
COPY ["src/Fengling.ServiceDiscovery/Fengling.ServiceDiscovery.Core/Fengling.ServiceDiscovery.Core.csproj", "src/Fengling.ServiceDiscovery/Fengling.ServiceDiscovery.Core/"] COPY ["src/Fengling.ServiceDiscovery/Fengling.ServiceDiscovery.Core/Fengling.ServiceDiscovery.Core.csproj", "src/Fengling.ServiceDiscovery/Fengling.ServiceDiscovery.Core/"]
COPY ["src/Fengling.ServiceDiscovery/Fengling.ServiceDiscovery.Kubernetes/Fengling.ServiceDiscovery.Kubernetes.csproj", "src/Fengling.ServiceDiscovery/Fengling.ServiceDiscovery.Kubernetes/"] COPY ["src/Fengling.ServiceDiscovery/Fengling.ServiceDiscovery.Kubernetes/Fengling.ServiceDiscovery.Kubernetes.csproj", "src/Fengling.ServiceDiscovery/Fengling.ServiceDiscovery.Kubernetes/"]

View File

@ -0,0 +1,275 @@
// <auto-generated />
using System;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Migrations;
using Microsoft.EntityFrameworkCore.Storage.ValueConversion;
using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata;
using YarpGateway.Data;
#nullable disable
namespace YarpGateway.Migrations
{
[DbContext(typeof(GatewayDbContext))]
[Migration("20260222134342_AddPendingServiceDiscovery")]
partial class AddPendingServiceDiscovery
{
/// <inheritdoc />
protected override void BuildTargetModel(ModelBuilder modelBuilder)
{
#pragma warning disable 612, 618
modelBuilder
.HasAnnotation("ProductVersion", "10.0.2")
.HasAnnotation("Relational:MaxIdentifierLength", 63);
NpgsqlModelBuilderExtensions.UseIdentityByDefaultColumns(modelBuilder);
modelBuilder.Entity("YarpGateway.Models.GwPendingServiceDiscovery", b =>
{
b.Property<long>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("bigint");
NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property<long>("Id"));
b.Property<DateTime?>("AssignedAt")
.HasColumnType("timestamp with time zone");
b.Property<string>("AssignedBy")
.HasMaxLength(100)
.HasColumnType("character varying(100)");
b.Property<string>("AssignedClusterId")
.HasMaxLength(100)
.HasColumnType("character varying(100)");
b.Property<DateTime>("DiscoveredAt")
.HasColumnType("timestamp with time zone");
b.Property<string>("DiscoveredPorts")
.IsRequired()
.HasMaxLength(500)
.HasColumnType("character varying(500)");
b.Property<bool>("IsDeleted")
.HasColumnType("boolean");
b.Property<string>("K8sClusterIP")
.HasMaxLength(50)
.HasColumnType("character varying(50)");
b.Property<string>("K8sNamespace")
.IsRequired()
.HasMaxLength(255)
.HasColumnType("character varying(255)");
b.Property<string>("K8sServiceName")
.IsRequired()
.HasMaxLength(255)
.HasColumnType("character varying(255)");
b.Property<string>("Labels")
.IsRequired()
.HasMaxLength(2000)
.HasColumnType("character varying(2000)");
b.Property<int>("PodCount")
.HasColumnType("integer");
b.Property<int>("Status")
.HasColumnType("integer");
b.Property<int>("Version")
.HasColumnType("integer");
b.HasKey("Id");
b.HasIndex("DiscoveredAt");
b.HasIndex("Status");
b.HasIndex("K8sServiceName", "K8sNamespace", "IsDeleted")
.IsUnique();
b.ToTable("PendingServiceDiscoveries");
});
modelBuilder.Entity("YarpGateway.Models.GwServiceInstance", b =>
{
b.Property<long>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("bigint");
NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property<long>("Id"));
b.Property<string>("Address")
.IsRequired()
.HasMaxLength(200)
.HasColumnType("character varying(200)");
b.Property<string>("ClusterId")
.IsRequired()
.HasMaxLength(100)
.HasColumnType("character varying(100)");
b.Property<long?>("CreatedBy")
.HasColumnType("bigint");
b.Property<DateTime>("CreatedTime")
.HasColumnType("timestamp with time zone");
b.Property<string>("DestinationId")
.IsRequired()
.HasMaxLength(100)
.HasColumnType("character varying(100)");
b.Property<int>("Health")
.HasColumnType("integer");
b.Property<bool>("IsDeleted")
.HasColumnType("boolean");
b.Property<int>("Status")
.HasColumnType("integer");
b.Property<long?>("UpdatedBy")
.HasColumnType("bigint");
b.Property<DateTime?>("UpdatedTime")
.HasColumnType("timestamp with time zone");
b.Property<int>("Version")
.HasColumnType("integer");
b.Property<int>("Weight")
.HasColumnType("integer");
b.HasKey("Id");
b.HasIndex("Health");
b.HasIndex("ClusterId", "DestinationId")
.IsUnique();
b.ToTable("ServiceInstances");
});
modelBuilder.Entity("YarpGateway.Models.GwTenant", b =>
{
b.Property<long>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("bigint");
NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property<long>("Id"));
b.Property<long?>("CreatedBy")
.HasColumnType("bigint");
b.Property<DateTime>("CreatedTime")
.HasColumnType("timestamp with time zone");
b.Property<bool>("IsDeleted")
.HasColumnType("boolean");
b.Property<int>("Status")
.HasColumnType("integer");
b.Property<string>("TenantCode")
.IsRequired()
.HasMaxLength(50)
.HasColumnType("character varying(50)");
b.Property<string>("TenantName")
.IsRequired()
.HasMaxLength(100)
.HasColumnType("character varying(100)");
b.Property<long?>("UpdatedBy")
.HasColumnType("bigint");
b.Property<DateTime?>("UpdatedTime")
.HasColumnType("timestamp with time zone");
b.Property<int>("Version")
.HasColumnType("integer");
b.HasKey("Id");
b.HasIndex("TenantCode")
.IsUnique();
b.ToTable("Tenants");
});
modelBuilder.Entity("YarpGateway.Models.GwTenantRoute", b =>
{
b.Property<long>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("bigint");
NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property<long>("Id"));
b.Property<string>("ClusterId")
.IsRequired()
.HasMaxLength(100)
.HasColumnType("character varying(100)");
b.Property<long?>("CreatedBy")
.HasColumnType("bigint");
b.Property<DateTime>("CreatedTime")
.HasColumnType("timestamp with time zone");
b.Property<bool>("IsDeleted")
.HasColumnType("boolean");
b.Property<bool>("IsGlobal")
.HasColumnType("boolean");
b.Property<string>("PathPattern")
.IsRequired()
.HasMaxLength(200)
.HasColumnType("character varying(200)");
b.Property<int>("Priority")
.HasColumnType("integer");
b.Property<string>("ServiceName")
.IsRequired()
.HasMaxLength(100)
.HasColumnType("character varying(100)");
b.Property<int>("Status")
.HasColumnType("integer");
b.Property<string>("TenantCode")
.IsRequired()
.HasMaxLength(50)
.HasColumnType("character varying(50)");
b.Property<long?>("UpdatedBy")
.HasColumnType("bigint");
b.Property<DateTime?>("UpdatedTime")
.HasColumnType("timestamp with time zone");
b.Property<int>("Version")
.HasColumnType("integer");
b.HasKey("Id");
b.HasIndex("ClusterId");
b.HasIndex("ServiceName");
b.HasIndex("TenantCode");
b.HasIndex("ServiceName", "IsGlobal", "Status");
b.ToTable("TenantRoutes");
});
#pragma warning restore 612, 618
}
}
}

View File

@ -0,0 +1,64 @@
using System;
using Microsoft.EntityFrameworkCore.Migrations;
using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata;
#nullable disable
namespace YarpGateway.Migrations
{
/// <inheritdoc />
public partial class AddPendingServiceDiscovery : Migration
{
/// <inheritdoc />
protected override void Up(MigrationBuilder migrationBuilder)
{
migrationBuilder.CreateTable(
name: "PendingServiceDiscoveries",
columns: table => new
{
Id = table.Column<long>(type: "bigint", nullable: false)
.Annotation("Npgsql:ValueGenerationStrategy", NpgsqlValueGenerationStrategy.IdentityByDefaultColumn),
K8sServiceName = table.Column<string>(type: "character varying(255)", maxLength: 255, nullable: false),
K8sNamespace = table.Column<string>(type: "character varying(255)", maxLength: 255, nullable: false),
K8sClusterIP = table.Column<string>(type: "character varying(50)", maxLength: 50, nullable: true),
DiscoveredPorts = table.Column<string>(type: "character varying(500)", maxLength: 500, nullable: false),
Labels = table.Column<string>(type: "character varying(2000)", maxLength: 2000, nullable: false),
PodCount = table.Column<int>(type: "integer", nullable: false),
Status = table.Column<int>(type: "integer", nullable: false),
AssignedClusterId = table.Column<string>(type: "character varying(100)", maxLength: 100, nullable: true),
AssignedBy = table.Column<string>(type: "character varying(100)", maxLength: 100, nullable: true),
AssignedAt = table.Column<DateTime>(type: "timestamp with time zone", nullable: true),
DiscoveredAt = table.Column<DateTime>(type: "timestamp with time zone", nullable: false),
IsDeleted = table.Column<bool>(type: "boolean", nullable: false),
Version = table.Column<int>(type: "integer", nullable: false)
},
constraints: table =>
{
table.PrimaryKey("PK_PendingServiceDiscoveries", x => x.Id);
});
migrationBuilder.CreateIndex(
name: "IX_PendingServiceDiscoveries_DiscoveredAt",
table: "PendingServiceDiscoveries",
column: "DiscoveredAt");
migrationBuilder.CreateIndex(
name: "IX_PendingServiceDiscoveries_K8sServiceName_K8sNamespace_IsDel~",
table: "PendingServiceDiscoveries",
columns: new[] { "K8sServiceName", "K8sNamespace", "IsDeleted" },
unique: true);
migrationBuilder.CreateIndex(
name: "IX_PendingServiceDiscoveries_Status",
table: "PendingServiceDiscoveries",
column: "Status");
}
/// <inheritdoc />
protected override void Down(MigrationBuilder migrationBuilder)
{
migrationBuilder.DropTable(
name: "PendingServiceDiscoveries");
}
}
}

View File

@ -17,11 +17,81 @@ namespace YarpGateway.Migrations
{ {
#pragma warning disable 612, 618 #pragma warning disable 612, 618
modelBuilder modelBuilder
.HasAnnotation("ProductVersion", "9.0.0") .HasAnnotation("ProductVersion", "10.0.2")
.HasAnnotation("Relational:MaxIdentifierLength", 63); .HasAnnotation("Relational:MaxIdentifierLength", 63);
NpgsqlModelBuilderExtensions.UseIdentityByDefaultColumns(modelBuilder); NpgsqlModelBuilderExtensions.UseIdentityByDefaultColumns(modelBuilder);
modelBuilder.Entity("YarpGateway.Models.GwPendingServiceDiscovery", b =>
{
b.Property<long>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("bigint");
NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property<long>("Id"));
b.Property<DateTime?>("AssignedAt")
.HasColumnType("timestamp with time zone");
b.Property<string>("AssignedBy")
.HasMaxLength(100)
.HasColumnType("character varying(100)");
b.Property<string>("AssignedClusterId")
.HasMaxLength(100)
.HasColumnType("character varying(100)");
b.Property<DateTime>("DiscoveredAt")
.HasColumnType("timestamp with time zone");
b.Property<string>("DiscoveredPorts")
.IsRequired()
.HasMaxLength(500)
.HasColumnType("character varying(500)");
b.Property<bool>("IsDeleted")
.HasColumnType("boolean");
b.Property<string>("K8sClusterIP")
.HasMaxLength(50)
.HasColumnType("character varying(50)");
b.Property<string>("K8sNamespace")
.IsRequired()
.HasMaxLength(255)
.HasColumnType("character varying(255)");
b.Property<string>("K8sServiceName")
.IsRequired()
.HasMaxLength(255)
.HasColumnType("character varying(255)");
b.Property<string>("Labels")
.IsRequired()
.HasMaxLength(2000)
.HasColumnType("character varying(2000)");
b.Property<int>("PodCount")
.HasColumnType("integer");
b.Property<int>("Status")
.HasColumnType("integer");
b.Property<int>("Version")
.HasColumnType("integer");
b.HasKey("Id");
b.HasIndex("DiscoveredAt");
b.HasIndex("Status");
b.HasIndex("K8sServiceName", "K8sNamespace", "IsDeleted")
.IsUnique();
b.ToTable("PendingServiceDiscoveries");
});
modelBuilder.Entity("YarpGateway.Models.GwServiceInstance", b => modelBuilder.Entity("YarpGateway.Models.GwServiceInstance", b =>
{ {
b.Property<long>("Id") b.Property<long>("Id")

View File

@ -0,0 +1,122 @@
CREATE TABLE IF NOT EXISTS "__EFMigrationsHistory" (
"MigrationId" character varying(150) NOT NULL,
"ProductVersion" character varying(32) NOT NULL,
CONSTRAINT "PK___EFMigrationsHistory" PRIMARY KEY ("MigrationId")
);
START TRANSACTION;
CREATE TABLE "ServiceInstances" (
"Id" bigint GENERATED BY DEFAULT AS IDENTITY,
"ClusterId" character varying(100) NOT NULL,
"DestinationId" character varying(100) NOT NULL,
"Address" character varying(200) NOT NULL,
"Health" integer NOT NULL,
"Weight" integer NOT NULL,
"Status" integer NOT NULL,
"CreatedBy" bigint,
"CreatedTime" timestamp with time zone NOT NULL,
"UpdatedBy" bigint,
"UpdatedTime" timestamp with time zone,
"IsDeleted" boolean NOT NULL,
"Version" integer NOT NULL,
CONSTRAINT "PK_ServiceInstances" PRIMARY KEY ("Id")
);
CREATE TABLE "Tenants" (
"Id" bigint GENERATED BY DEFAULT AS IDENTITY,
"TenantCode" character varying(50) NOT NULL,
"TenantName" character varying(100) NOT NULL,
"Status" integer NOT NULL,
"CreatedBy" bigint,
"CreatedTime" timestamp with time zone NOT NULL,
"UpdatedBy" bigint,
"UpdatedTime" timestamp with time zone,
"IsDeleted" boolean NOT NULL,
"Version" integer NOT NULL,
CONSTRAINT "PK_Tenants" PRIMARY KEY ("Id"),
CONSTRAINT "AK_Tenants_TenantCode" UNIQUE ("TenantCode")
);
CREATE TABLE "TenantRoutes" (
"Id" bigint GENERATED BY DEFAULT AS IDENTITY,
"TenantCode" character varying(50) NOT NULL,
"ServiceName" character varying(100) NOT NULL,
"ClusterId" character varying(100) NOT NULL,
"PathPattern" character varying(200) NOT NULL,
"Priority" integer NOT NULL,
"Status" integer NOT NULL,
"CreatedBy" bigint,
"CreatedTime" timestamp with time zone NOT NULL,
"UpdatedBy" bigint,
"UpdatedTime" timestamp with time zone,
"IsDeleted" boolean NOT NULL,
"Version" integer NOT NULL,
CONSTRAINT "PK_TenantRoutes" PRIMARY KEY ("Id"),
CONSTRAINT "FK_TenantRoutes_Tenants_TenantCode" FOREIGN KEY ("TenantCode") REFERENCES "Tenants" ("TenantCode") ON DELETE RESTRICT
);
CREATE UNIQUE INDEX "IX_ServiceInstances_ClusterId_DestinationId" ON "ServiceInstances" ("ClusterId", "DestinationId");
CREATE INDEX "IX_ServiceInstances_Health" ON "ServiceInstances" ("Health");
CREATE INDEX "IX_TenantRoutes_ClusterId" ON "TenantRoutes" ("ClusterId");
CREATE UNIQUE INDEX "IX_TenantRoutes_TenantCode_ServiceName" ON "TenantRoutes" ("TenantCode", "ServiceName");
CREATE UNIQUE INDEX "IX_Tenants_TenantCode" ON "Tenants" ("TenantCode");
INSERT INTO "__EFMigrationsHistory" ("MigrationId", "ProductVersion")
VALUES ('20260201120312_InitialCreate', '10.0.2');
COMMIT;
START TRANSACTION;
ALTER TABLE "TenantRoutes" DROP CONSTRAINT "FK_TenantRoutes_Tenants_TenantCode";
ALTER TABLE "Tenants" DROP CONSTRAINT "AK_Tenants_TenantCode";
DROP INDEX "IX_TenantRoutes_TenantCode_ServiceName";
ALTER TABLE "TenantRoutes" ADD "IsGlobal" boolean NOT NULL DEFAULT FALSE;
CREATE INDEX "IX_TenantRoutes_ServiceName" ON "TenantRoutes" ("ServiceName");
CREATE INDEX "IX_TenantRoutes_ServiceName_IsGlobal_Status" ON "TenantRoutes" ("ServiceName", "IsGlobal", "Status");
CREATE INDEX "IX_TenantRoutes_TenantCode" ON "TenantRoutes" ("TenantCode");
INSERT INTO "__EFMigrationsHistory" ("MigrationId", "ProductVersion")
VALUES ('20260201133826_AddIsGlobalToTenantRoute', '10.0.2');
COMMIT;
START TRANSACTION;
CREATE TABLE "PendingServiceDiscoveries" (
"Id" bigint GENERATED BY DEFAULT AS IDENTITY,
"K8sServiceName" character varying(255) NOT NULL,
"K8sNamespace" character varying(255) NOT NULL,
"K8sClusterIP" character varying(50),
"DiscoveredPorts" character varying(500) NOT NULL,
"Labels" character varying(2000) NOT NULL,
"PodCount" integer NOT NULL,
"Status" integer NOT NULL,
"AssignedClusterId" character varying(100),
"AssignedBy" character varying(100),
"AssignedAt" timestamp with time zone,
"DiscoveredAt" timestamp with time zone NOT NULL,
"IsDeleted" boolean NOT NULL,
"Version" integer NOT NULL,
CONSTRAINT "PK_PendingServiceDiscoveries" PRIMARY KEY ("Id")
);
CREATE INDEX "IX_PendingServiceDiscoveries_DiscoveredAt" ON "PendingServiceDiscoveries" ("DiscoveredAt");
CREATE UNIQUE INDEX "IX_PendingServiceDiscoveries_K8sServiceName_K8sNamespace_IsDel~" ON "PendingServiceDiscoveries" ("K8sServiceName", "K8sNamespace", "IsDeleted");
CREATE INDEX "IX_PendingServiceDiscoveries_Status" ON "PendingServiceDiscoveries" ("Status");
INSERT INTO "__EFMigrationsHistory" ("MigrationId", "ProductVersion")
VALUES ('20260222134342_AddPendingServiceDiscovery', '10.0.2');
COMMIT;

View File

@ -0,0 +1,27 @@
namespace YarpGateway.Models;
public class GwPendingServiceDiscovery
{
public long Id { get; set; }
public string K8sServiceName { get; set; } = string.Empty;
public string K8sNamespace { get; set; } = string.Empty;
public string? K8sClusterIP { get; set; }
public string DiscoveredPorts { get; set; } = "[]";
public string Labels { get; set; } = "{}";
public int PodCount { get; set; } = 0;
public int Status { get; set; } = 0;
public string? AssignedClusterId { get; set; }
public string? AssignedBy { get; set; }
public DateTime? AssignedAt { get; set; }
public DateTime DiscoveredAt { get; set; } = DateTime.UtcNow;
public bool IsDeleted { get; set; } = false;
public int Version { get; set; } = 0;
}
public enum PendingServiceStatus
{
Pending = 0,
Approved = 1,
Rejected = 2,
K8sServiceNotFound = 3
}

View File

@ -67,14 +67,17 @@ builder.Services.AddSingleton<IProxyConfigProvider>(sp => sp.GetRequiredService<
builder.Services.AddHostedService<PgSqlConfigChangeListener>(); builder.Services.AddHostedService<PgSqlConfigChangeListener>();
// 添加 Kubernetes 服务发现 // 添加 Kubernetes 服务发现
var useInClusterConfig = builder.Configuration.GetValue<bool>("ServiceDiscovery:UseInClusterConfig", true);
builder.Services.AddKubernetesServiceDiscovery(options => builder.Services.AddKubernetesServiceDiscovery(options =>
{ {
options.LabelSelector = "app.kubernetes.io/managed-by=yarp"; options.LabelSelector = "app.kubernetes.io/managed-by=yarp";
options.UseInClusterConfig = false; // 本地调试设为 false生产环境设为 true options.UseInClusterConfig = useInClusterConfig;
}); });
builder.Services.AddServiceDiscovery(); builder.Services.AddServiceDiscovery();
builder.Services.AddHostedService<KubernetesPendingSyncService>();
var corsSettings = builder.Configuration.GetSection("Cors"); var corsSettings = builder.Configuration.GetSection("Cors");
builder.Services.AddCors(options => builder.Services.AddCors(options =>
{ {
@ -109,6 +112,8 @@ app.UseCors("AllowFrontend");
app.UseMiddleware<JwtTransformMiddleware>(); app.UseMiddleware<JwtTransformMiddleware>();
app.UseMiddleware<TenantRoutingMiddleware>(); app.UseMiddleware<TenantRoutingMiddleware>();
app.MapGet("/health", () => Results.Ok(new { status = "healthy", timestamp = DateTime.UtcNow }));
app.MapControllers(); app.MapControllers();
app.MapReverseProxy(); app.MapReverseProxy();

View File

@ -0,0 +1,161 @@
using System.Text.Json;
using Microsoft.EntityFrameworkCore;
using YarpGateway.Data;
using YarpGateway.Models;
namespace YarpGateway.Services;
public class KubernetesPendingSyncService : BackgroundService
{
private readonly IServiceProvider _serviceProvider;
private readonly ILogger<KubernetesPendingSyncService> _logger;
private readonly TimeSpan _syncInterval = TimeSpan.FromSeconds(30);
private readonly TimeSpan _staleThreshold = TimeSpan.FromHours(24);
public KubernetesPendingSyncService(
IServiceProvider serviceProvider,
ILogger<KubernetesPendingSyncService> logger)
{
_serviceProvider = serviceProvider;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Starting K8s pending service sync background task");
while (!stoppingToken.IsCancellationRequested)
{
try
{
await SyncPendingServicesAsync(stoppingToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error during K8s pending service sync");
}
await Task.Delay(_syncInterval, stoppingToken);
}
}
private async Task SyncPendingServicesAsync(CancellationToken ct)
{
using var scope = _serviceProvider.CreateScope();
var providers = scope.ServiceProvider.GetServices<Fengling.ServiceDiscovery.IServiceDiscoveryProvider>();
var k8sProvider = providers.FirstOrDefault(p => p.ProviderName == "Kubernetes");
if (k8sProvider == null)
{
_logger.LogWarning("No Kubernetes service discovery provider found");
return;
}
var dbContextFactory = scope.ServiceProvider.GetRequiredService<IDbContextFactory<GatewayDbContext>>();
var discoveredServices = await k8sProvider.GetServicesAsync(ct);
await using var db = await dbContextFactory.CreateDbContextAsync(ct);
var existingPending = await db.PendingServiceDiscoveries
.Where(p => !p.IsDeleted && p.Status == (int)PendingServiceStatus.Pending)
.ToListAsync(ct);
var existingDict = existingPending
.ToDictionary(p => $"{p.K8sServiceName}|{p.K8sNamespace}");
var discoveredSet = discoveredServices
.Select(s => $"{s.Name}|{s.Namespace}")
.ToHashSet();
var addedCount = 0;
var updatedCount = 0;
var cleanedCount = 0;
foreach (var item in existingDict)
{
var key = item.Key;
if (!discoveredSet.Contains(key))
{
var pending = item.Value;
if (DateTime.UtcNow - pending.DiscoveredAt > _staleThreshold)
{
pending.IsDeleted = true;
pending.Version++;
cleanedCount++;
_logger.LogInformation("Cleaned up stale pending service {ServiceName} in namespace {Namespace}",
pending.K8sServiceName, pending.K8sNamespace);
}
else
{
pending.Status = (int)PendingServiceStatus.K8sServiceNotFound;
pending.Version++;
_logger.LogInformation("Pending service {ServiceName} in namespace {Namespace} not found in K8s, marked as not found",
pending.K8sServiceName, pending.K8sNamespace);
}
}
}
if (discoveredServices.Count > 0)
{
var discoveredDict = discoveredServices.ToDictionary(
s => $"{s.Name}|{s.Namespace}",
s => s);
foreach (var item in discoveredDict)
{
var key = item.Key;
var service = item.Value;
if (existingDict.TryGetValue(key, out var existing))
{
if (existing.Status == (int)PendingServiceStatus.K8sServiceNotFound)
{
existing.Status = (int)PendingServiceStatus.Pending;
existing.Version++;
updatedCount++;
}
var portsJson = JsonSerializer.Serialize(service.Ports);
var labelsJson = JsonSerializer.Serialize(service.Labels);
if (existing.DiscoveredPorts != portsJson || existing.Labels != labelsJson)
{
existing.DiscoveredPorts = portsJson;
existing.Labels = labelsJson;
existing.K8sClusterIP = service.ClusterIP;
existing.PodCount = service.Ports.Count;
existing.Version++;
updatedCount++;
}
}
else
{
var newPending = new GwPendingServiceDiscovery
{
K8sServiceName = service.Name,
K8sNamespace = service.Namespace,
K8sClusterIP = service.ClusterIP,
DiscoveredPorts = JsonSerializer.Serialize(service.Ports),
Labels = JsonSerializer.Serialize(service.Labels),
PodCount = service.Ports.Count,
Status = (int)PendingServiceStatus.Pending,
DiscoveredAt = DateTime.UtcNow,
Version = 1
};
db.PendingServiceDiscoveries.Add(newPending);
addedCount++;
}
}
}
if (addedCount > 0 || updatedCount > 0 || cleanedCount > 0)
{
await db.SaveChangesAsync(ct);
_logger.LogInformation("K8s sync completed: {Added} new, {Updated} updated, {Cleaned} cleaned",
addedCount, updatedCount, cleanedCount);
}
}
}