diff --git a/Controllers/PendingServicesController.cs b/Controllers/PendingServicesController.cs new file mode 100644 index 0000000..4226cf8 --- /dev/null +++ b/Controllers/PendingServicesController.cs @@ -0,0 +1,209 @@ +using Microsoft.AspNetCore.Mvc; +using Microsoft.EntityFrameworkCore; +using YarpGateway.Data; +using YarpGateway.Models; + +namespace YarpGateway.Controllers; + +[ApiController] +[Route("api/gateway/pending-services")] +public class PendingServicesController : ControllerBase +{ + private readonly IDbContextFactory _dbContextFactory; + private readonly ILogger _logger; + + public PendingServicesController( + IDbContextFactory dbContextFactory, + ILogger logger) + { + _dbContextFactory = dbContextFactory; + _logger = logger; + } + + [HttpGet] + public async Task GetPendingServices( + [FromQuery] int page = 1, + [FromQuery] int pageSize = 10, + [FromQuery] int? status = null) + { + await using var db = _dbContextFactory.CreateDbContext(); + var query = db.PendingServiceDiscoveries.Where(p => !p.IsDeleted); + + if (status.HasValue) + { + query = query.Where(p => p.Status == status.Value); + } + + var total = await query.CountAsync(); + var items = await query + .OrderByDescending(p => p.DiscoveredAt) + .Skip((page - 1) * pageSize) + .Take(pageSize) + .Select(p => new + { + p.Id, + p.K8sServiceName, + p.K8sNamespace, + p.K8sClusterIP, + DiscoveredPorts = System.Text.Json.JsonSerializer.Deserialize>(p.DiscoveredPorts) ?? new List(), + Labels = System.Text.Json.JsonSerializer.Deserialize>(p.Labels) ?? new Dictionary(), + p.PodCount, + Status = (PendingServiceStatus)p.Status, + p.AssignedClusterId, + p.AssignedBy, + p.AssignedAt, + p.DiscoveredAt + }) + .ToListAsync(); + + return Ok(new { items, total, page, pageSize }); + } + + [HttpGet("{id}")] + public async Task GetPendingService(long id) + { + await using var db = _dbContextFactory.CreateDbContext(); + var service = await db.PendingServiceDiscoveries.FindAsync(id); + + if (service == null || service.IsDeleted) + { + return NotFound(new { message = "Pending service not found" }); + } + + return Ok(new + { + service.Id, + service.K8sServiceName, + service.K8sNamespace, + service.K8sClusterIP, + DiscoveredPorts = System.Text.Json.JsonSerializer.Deserialize>(service.DiscoveredPorts) ?? new List(), + Labels = System.Text.Json.JsonSerializer.Deserialize>(service.Labels) ?? new Dictionary(), + service.PodCount, + Status = (PendingServiceStatus)service.Status, + service.AssignedClusterId, + service.AssignedBy, + service.AssignedAt, + service.DiscoveredAt + }); + } + + [HttpPost("{id}/assign")] + public async Task AssignService(long id, [FromBody] AssignServiceRequest request) + { + await using var db = _dbContextFactory.CreateDbContext(); + + var pendingService = await db.PendingServiceDiscoveries.FindAsync(id); + if (pendingService == null || pendingService.IsDeleted) + { + return NotFound(new { message = "Pending service not found" }); + } + + if (pendingService.Status != (int)PendingServiceStatus.Pending) + { + return BadRequest(new { message = $"Service is already {((PendingServiceStatus)pendingService.Status)}, cannot assign" }); + } + + if (string.IsNullOrEmpty(request.ClusterId)) + { + return BadRequest(new { message = "ClusterId is required" }); + } + + var existingCluster = await db.ServiceInstances + .AnyAsync(i => i.ClusterId == request.ClusterId && !i.IsDeleted); + + if (!existingCluster) + { + return BadRequest(new { message = $"Cluster '{request.ClusterId}' does not exist. Please create the cluster first." }); + } + + var discoveredPorts = System.Text.Json.JsonSerializer.Deserialize>(pendingService.DiscoveredPorts) ?? new List(); + var primaryPort = discoveredPorts.FirstOrDefault() > 0 ? discoveredPorts.First() : 80; + + var instanceNumber = await db.ServiceInstances + .CountAsync(i => i.ClusterId == request.ClusterId && !i.IsDeleted); + + var newInstance = new GwServiceInstance + { + ClusterId = request.ClusterId, + DestinationId = $"{pendingService.K8sServiceName}-{instanceNumber + 1}", + Address = $"http://{pendingService.K8sClusterIP}:{primaryPort}", + Health = 1, + Weight = 100, + Status = 1, + CreatedTime = DateTime.UtcNow, + Version = 1 + }; + + db.ServiceInstances.Add(newInstance); + + pendingService.Status = (int)PendingServiceStatus.Approved; + pendingService.AssignedClusterId = request.ClusterId; + pendingService.AssignedBy = "admin"; + pendingService.AssignedAt = DateTime.UtcNow; + pendingService.Version++; + + await db.SaveChangesAsync(); + + _logger.LogInformation("Service {ServiceName} assigned to cluster {ClusterId} by admin", + pendingService.K8sServiceName, request.ClusterId); + + return Ok(new + { + success = true, + message = $"Service '{pendingService.K8sServiceName}' assigned to cluster '{request.ClusterId}'", + instanceId = newInstance.Id + }); + } + + [HttpPost("{id}/reject")] + public async Task RejectService(long id) + { + await using var db = _dbContextFactory.CreateDbContext(); + + var pendingService = await db.PendingServiceDiscoveries.FindAsync(id); + if (pendingService == null || pendingService.IsDeleted) + { + return NotFound(new { message = "Pending service not found" }); + } + + if (pendingService.Status != (int)PendingServiceStatus.Pending) + { + return BadRequest(new { message = $"Service is already {((PendingServiceStatus)pendingService.Status)}, cannot reject" }); + } + + pendingService.Status = (int)PendingServiceStatus.Rejected; + pendingService.AssignedBy = "admin"; + pendingService.AssignedAt = DateTime.UtcNow; + pendingService.Version++; + + await db.SaveChangesAsync(); + + _logger.LogInformation("Service {ServiceName} rejected by admin", pendingService.K8sServiceName); + + return Ok(new { success = true, message = $"Service '{pendingService.K8sServiceName}' rejected" }); + } + + [HttpGet("clusters")] + public async Task GetClusters() + { + await using var db = _dbContextFactory.CreateDbContext(); + + var clusters = await db.ServiceInstances + .Where(i => !i.IsDeleted) + .GroupBy(i => i.ClusterId) + .Select(g => new + { + ClusterId = g.Key, + InstanceCount = g.Count(), + HealthyCount = g.Count(i => i.Health == 1) + }) + .ToListAsync(); + + return Ok(clusters); + } +} + +public class AssignServiceRequest +{ + public string ClusterId { get; set; } = string.Empty; +} diff --git a/Data/GatewayDbContext.cs b/Data/GatewayDbContext.cs index 9ebe833..6b0cb1e 100644 --- a/Data/GatewayDbContext.cs +++ b/Data/GatewayDbContext.cs @@ -15,6 +15,7 @@ public class GatewayDbContext : DbContext public DbSet Tenants => Set(); public DbSet TenantRoutes => Set(); public DbSet ServiceInstances => Set(); + public DbSet PendingServiceDiscoveries => Set(); public override int SaveChanges(bool acceptAllChangesOnSuccess) { @@ -120,6 +121,21 @@ public class GatewayDbContext : DbContext entity.HasIndex(e => e.Health); }); + modelBuilder.Entity(entity => + { + entity.HasKey(e => e.Id); + entity.Property(e => e.K8sServiceName).HasMaxLength(255).IsRequired(); + entity.Property(e => e.K8sNamespace).HasMaxLength(255).IsRequired(); + entity.Property(e => e.K8sClusterIP).HasMaxLength(50); + entity.Property(e => e.DiscoveredPorts).HasMaxLength(500); + entity.Property(e => e.Labels).HasMaxLength(2000); + entity.Property(e => e.AssignedClusterId).HasMaxLength(100); + entity.Property(e => e.AssignedBy).HasMaxLength(100); + entity.HasIndex(e => new { e.K8sServiceName, e.K8sNamespace, e.IsDeleted }).IsUnique(); + entity.HasIndex(e => e.Status); + entity.HasIndex(e => e.DiscoveredAt); + }); + base.OnModelCreating(modelBuilder); } } diff --git a/Dockerfile b/Dockerfile index 6880cd5..3442a56 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM mcr.microsoft.com/dotnet/aspnet:10.0 AS base +FROM mcr.microsoft.com/dotnet/aspnet:10.0 AS base USER $APP_UID WORKDIR /app EXPOSE 8080 @@ -7,6 +7,9 @@ EXPOSE 8081 FROM mcr.microsoft.com/dotnet/sdk:10.0 AS build ARG BUILD_CONFIGURATION=Release WORKDIR /src +# Copy Directory.Packages.props for centralized version management +COPY ["Directory.Packages.props", "./"] +COPY ["src/Directory.Packages.props", "src/"] COPY ["src/YarpGateway/YarpGateway.csproj", "src/YarpGateway/"] COPY ["src/Fengling.ServiceDiscovery/Fengling.ServiceDiscovery.Core/Fengling.ServiceDiscovery.Core.csproj", "src/Fengling.ServiceDiscovery/Fengling.ServiceDiscovery.Core/"] COPY ["src/Fengling.ServiceDiscovery/Fengling.ServiceDiscovery.Kubernetes/Fengling.ServiceDiscovery.Kubernetes.csproj", "src/Fengling.ServiceDiscovery/Fengling.ServiceDiscovery.Kubernetes/"] diff --git a/Migrations/20260222134342_AddPendingServiceDiscovery.Designer.cs b/Migrations/20260222134342_AddPendingServiceDiscovery.Designer.cs new file mode 100644 index 0000000..da40030 --- /dev/null +++ b/Migrations/20260222134342_AddPendingServiceDiscovery.Designer.cs @@ -0,0 +1,275 @@ +// +using System; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Infrastructure; +using Microsoft.EntityFrameworkCore.Migrations; +using Microsoft.EntityFrameworkCore.Storage.ValueConversion; +using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata; +using YarpGateway.Data; + +#nullable disable + +namespace YarpGateway.Migrations +{ + [DbContext(typeof(GatewayDbContext))] + [Migration("20260222134342_AddPendingServiceDiscovery")] + partial class AddPendingServiceDiscovery + { + /// + protected override void BuildTargetModel(ModelBuilder modelBuilder) + { +#pragma warning disable 612, 618 + modelBuilder + .HasAnnotation("ProductVersion", "10.0.2") + .HasAnnotation("Relational:MaxIdentifierLength", 63); + + NpgsqlModelBuilderExtensions.UseIdentityByDefaultColumns(modelBuilder); + + modelBuilder.Entity("YarpGateway.Models.GwPendingServiceDiscovery", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("bigint"); + + NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property("Id")); + + b.Property("AssignedAt") + .HasColumnType("timestamp with time zone"); + + b.Property("AssignedBy") + .HasMaxLength(100) + .HasColumnType("character varying(100)"); + + b.Property("AssignedClusterId") + .HasMaxLength(100) + .HasColumnType("character varying(100)"); + + b.Property("DiscoveredAt") + .HasColumnType("timestamp with time zone"); + + b.Property("DiscoveredPorts") + .IsRequired() + .HasMaxLength(500) + .HasColumnType("character varying(500)"); + + b.Property("IsDeleted") + .HasColumnType("boolean"); + + b.Property("K8sClusterIP") + .HasMaxLength(50) + .HasColumnType("character varying(50)"); + + b.Property("K8sNamespace") + .IsRequired() + .HasMaxLength(255) + .HasColumnType("character varying(255)"); + + b.Property("K8sServiceName") + .IsRequired() + .HasMaxLength(255) + .HasColumnType("character varying(255)"); + + b.Property("Labels") + .IsRequired() + .HasMaxLength(2000) + .HasColumnType("character varying(2000)"); + + b.Property("PodCount") + .HasColumnType("integer"); + + b.Property("Status") + .HasColumnType("integer"); + + b.Property("Version") + .HasColumnType("integer"); + + b.HasKey("Id"); + + b.HasIndex("DiscoveredAt"); + + b.HasIndex("Status"); + + b.HasIndex("K8sServiceName", "K8sNamespace", "IsDeleted") + .IsUnique(); + + b.ToTable("PendingServiceDiscoveries"); + }); + + modelBuilder.Entity("YarpGateway.Models.GwServiceInstance", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("bigint"); + + NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property("Id")); + + b.Property("Address") + .IsRequired() + .HasMaxLength(200) + .HasColumnType("character varying(200)"); + + b.Property("ClusterId") + .IsRequired() + .HasMaxLength(100) + .HasColumnType("character varying(100)"); + + b.Property("CreatedBy") + .HasColumnType("bigint"); + + b.Property("CreatedTime") + .HasColumnType("timestamp with time zone"); + + b.Property("DestinationId") + .IsRequired() + .HasMaxLength(100) + .HasColumnType("character varying(100)"); + + b.Property("Health") + .HasColumnType("integer"); + + b.Property("IsDeleted") + .HasColumnType("boolean"); + + b.Property("Status") + .HasColumnType("integer"); + + b.Property("UpdatedBy") + .HasColumnType("bigint"); + + b.Property("UpdatedTime") + .HasColumnType("timestamp with time zone"); + + b.Property("Version") + .HasColumnType("integer"); + + b.Property("Weight") + .HasColumnType("integer"); + + b.HasKey("Id"); + + b.HasIndex("Health"); + + b.HasIndex("ClusterId", "DestinationId") + .IsUnique(); + + b.ToTable("ServiceInstances"); + }); + + modelBuilder.Entity("YarpGateway.Models.GwTenant", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("bigint"); + + NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property("Id")); + + b.Property("CreatedBy") + .HasColumnType("bigint"); + + b.Property("CreatedTime") + .HasColumnType("timestamp with time zone"); + + b.Property("IsDeleted") + .HasColumnType("boolean"); + + b.Property("Status") + .HasColumnType("integer"); + + b.Property("TenantCode") + .IsRequired() + .HasMaxLength(50) + .HasColumnType("character varying(50)"); + + b.Property("TenantName") + .IsRequired() + .HasMaxLength(100) + .HasColumnType("character varying(100)"); + + b.Property("UpdatedBy") + .HasColumnType("bigint"); + + b.Property("UpdatedTime") + .HasColumnType("timestamp with time zone"); + + b.Property("Version") + .HasColumnType("integer"); + + b.HasKey("Id"); + + b.HasIndex("TenantCode") + .IsUnique(); + + b.ToTable("Tenants"); + }); + + modelBuilder.Entity("YarpGateway.Models.GwTenantRoute", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("bigint"); + + NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property("Id")); + + b.Property("ClusterId") + .IsRequired() + .HasMaxLength(100) + .HasColumnType("character varying(100)"); + + b.Property("CreatedBy") + .HasColumnType("bigint"); + + b.Property("CreatedTime") + .HasColumnType("timestamp with time zone"); + + b.Property("IsDeleted") + .HasColumnType("boolean"); + + b.Property("IsGlobal") + .HasColumnType("boolean"); + + b.Property("PathPattern") + .IsRequired() + .HasMaxLength(200) + .HasColumnType("character varying(200)"); + + b.Property("Priority") + .HasColumnType("integer"); + + b.Property("ServiceName") + .IsRequired() + .HasMaxLength(100) + .HasColumnType("character varying(100)"); + + b.Property("Status") + .HasColumnType("integer"); + + b.Property("TenantCode") + .IsRequired() + .HasMaxLength(50) + .HasColumnType("character varying(50)"); + + b.Property("UpdatedBy") + .HasColumnType("bigint"); + + b.Property("UpdatedTime") + .HasColumnType("timestamp with time zone"); + + b.Property("Version") + .HasColumnType("integer"); + + b.HasKey("Id"); + + b.HasIndex("ClusterId"); + + b.HasIndex("ServiceName"); + + b.HasIndex("TenantCode"); + + b.HasIndex("ServiceName", "IsGlobal", "Status"); + + b.ToTable("TenantRoutes"); + }); +#pragma warning restore 612, 618 + } + } +} diff --git a/Migrations/20260222134342_AddPendingServiceDiscovery.cs b/Migrations/20260222134342_AddPendingServiceDiscovery.cs new file mode 100644 index 0000000..d891dea --- /dev/null +++ b/Migrations/20260222134342_AddPendingServiceDiscovery.cs @@ -0,0 +1,64 @@ +using System; +using Microsoft.EntityFrameworkCore.Migrations; +using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata; + +#nullable disable + +namespace YarpGateway.Migrations +{ + /// + public partial class AddPendingServiceDiscovery : Migration + { + /// + protected override void Up(MigrationBuilder migrationBuilder) + { + migrationBuilder.CreateTable( + name: "PendingServiceDiscoveries", + columns: table => new + { + Id = table.Column(type: "bigint", nullable: false) + .Annotation("Npgsql:ValueGenerationStrategy", NpgsqlValueGenerationStrategy.IdentityByDefaultColumn), + K8sServiceName = table.Column(type: "character varying(255)", maxLength: 255, nullable: false), + K8sNamespace = table.Column(type: "character varying(255)", maxLength: 255, nullable: false), + K8sClusterIP = table.Column(type: "character varying(50)", maxLength: 50, nullable: true), + DiscoveredPorts = table.Column(type: "character varying(500)", maxLength: 500, nullable: false), + Labels = table.Column(type: "character varying(2000)", maxLength: 2000, nullable: false), + PodCount = table.Column(type: "integer", nullable: false), + Status = table.Column(type: "integer", nullable: false), + AssignedClusterId = table.Column(type: "character varying(100)", maxLength: 100, nullable: true), + AssignedBy = table.Column(type: "character varying(100)", maxLength: 100, nullable: true), + AssignedAt = table.Column(type: "timestamp with time zone", nullable: true), + DiscoveredAt = table.Column(type: "timestamp with time zone", nullable: false), + IsDeleted = table.Column(type: "boolean", nullable: false), + Version = table.Column(type: "integer", nullable: false) + }, + constraints: table => + { + table.PrimaryKey("PK_PendingServiceDiscoveries", x => x.Id); + }); + + migrationBuilder.CreateIndex( + name: "IX_PendingServiceDiscoveries_DiscoveredAt", + table: "PendingServiceDiscoveries", + column: "DiscoveredAt"); + + migrationBuilder.CreateIndex( + name: "IX_PendingServiceDiscoveries_K8sServiceName_K8sNamespace_IsDel~", + table: "PendingServiceDiscoveries", + columns: new[] { "K8sServiceName", "K8sNamespace", "IsDeleted" }, + unique: true); + + migrationBuilder.CreateIndex( + name: "IX_PendingServiceDiscoveries_Status", + table: "PendingServiceDiscoveries", + column: "Status"); + } + + /// + protected override void Down(MigrationBuilder migrationBuilder) + { + migrationBuilder.DropTable( + name: "PendingServiceDiscoveries"); + } + } +} diff --git a/Migrations/GatewayDbContextModelSnapshot.cs b/Migrations/GatewayDbContextModelSnapshot.cs index 9ff7b99..623cbaa 100644 --- a/Migrations/GatewayDbContextModelSnapshot.cs +++ b/Migrations/GatewayDbContextModelSnapshot.cs @@ -17,11 +17,81 @@ namespace YarpGateway.Migrations { #pragma warning disable 612, 618 modelBuilder - .HasAnnotation("ProductVersion", "9.0.0") + .HasAnnotation("ProductVersion", "10.0.2") .HasAnnotation("Relational:MaxIdentifierLength", 63); NpgsqlModelBuilderExtensions.UseIdentityByDefaultColumns(modelBuilder); + modelBuilder.Entity("YarpGateway.Models.GwPendingServiceDiscovery", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("bigint"); + + NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property("Id")); + + b.Property("AssignedAt") + .HasColumnType("timestamp with time zone"); + + b.Property("AssignedBy") + .HasMaxLength(100) + .HasColumnType("character varying(100)"); + + b.Property("AssignedClusterId") + .HasMaxLength(100) + .HasColumnType("character varying(100)"); + + b.Property("DiscoveredAt") + .HasColumnType("timestamp with time zone"); + + b.Property("DiscoveredPorts") + .IsRequired() + .HasMaxLength(500) + .HasColumnType("character varying(500)"); + + b.Property("IsDeleted") + .HasColumnType("boolean"); + + b.Property("K8sClusterIP") + .HasMaxLength(50) + .HasColumnType("character varying(50)"); + + b.Property("K8sNamespace") + .IsRequired() + .HasMaxLength(255) + .HasColumnType("character varying(255)"); + + b.Property("K8sServiceName") + .IsRequired() + .HasMaxLength(255) + .HasColumnType("character varying(255)"); + + b.Property("Labels") + .IsRequired() + .HasMaxLength(2000) + .HasColumnType("character varying(2000)"); + + b.Property("PodCount") + .HasColumnType("integer"); + + b.Property("Status") + .HasColumnType("integer"); + + b.Property("Version") + .HasColumnType("integer"); + + b.HasKey("Id"); + + b.HasIndex("DiscoveredAt"); + + b.HasIndex("Status"); + + b.HasIndex("K8sServiceName", "K8sNamespace", "IsDeleted") + .IsUnique(); + + b.ToTable("PendingServiceDiscoveries"); + }); + modelBuilder.Entity("YarpGateway.Models.GwServiceInstance", b => { b.Property("Id") diff --git a/Migrations/pending_service_migration.sql b/Migrations/pending_service_migration.sql new file mode 100644 index 0000000..b5097ad --- /dev/null +++ b/Migrations/pending_service_migration.sql @@ -0,0 +1,122 @@ +CREATE TABLE IF NOT EXISTS "__EFMigrationsHistory" ( + "MigrationId" character varying(150) NOT NULL, + "ProductVersion" character varying(32) NOT NULL, + CONSTRAINT "PK___EFMigrationsHistory" PRIMARY KEY ("MigrationId") +); + +START TRANSACTION; +CREATE TABLE "ServiceInstances" ( + "Id" bigint GENERATED BY DEFAULT AS IDENTITY, + "ClusterId" character varying(100) NOT NULL, + "DestinationId" character varying(100) NOT NULL, + "Address" character varying(200) NOT NULL, + "Health" integer NOT NULL, + "Weight" integer NOT NULL, + "Status" integer NOT NULL, + "CreatedBy" bigint, + "CreatedTime" timestamp with time zone NOT NULL, + "UpdatedBy" bigint, + "UpdatedTime" timestamp with time zone, + "IsDeleted" boolean NOT NULL, + "Version" integer NOT NULL, + CONSTRAINT "PK_ServiceInstances" PRIMARY KEY ("Id") +); + +CREATE TABLE "Tenants" ( + "Id" bigint GENERATED BY DEFAULT AS IDENTITY, + "TenantCode" character varying(50) NOT NULL, + "TenantName" character varying(100) NOT NULL, + "Status" integer NOT NULL, + "CreatedBy" bigint, + "CreatedTime" timestamp with time zone NOT NULL, + "UpdatedBy" bigint, + "UpdatedTime" timestamp with time zone, + "IsDeleted" boolean NOT NULL, + "Version" integer NOT NULL, + CONSTRAINT "PK_Tenants" PRIMARY KEY ("Id"), + CONSTRAINT "AK_Tenants_TenantCode" UNIQUE ("TenantCode") +); + +CREATE TABLE "TenantRoutes" ( + "Id" bigint GENERATED BY DEFAULT AS IDENTITY, + "TenantCode" character varying(50) NOT NULL, + "ServiceName" character varying(100) NOT NULL, + "ClusterId" character varying(100) NOT NULL, + "PathPattern" character varying(200) NOT NULL, + "Priority" integer NOT NULL, + "Status" integer NOT NULL, + "CreatedBy" bigint, + "CreatedTime" timestamp with time zone NOT NULL, + "UpdatedBy" bigint, + "UpdatedTime" timestamp with time zone, + "IsDeleted" boolean NOT NULL, + "Version" integer NOT NULL, + CONSTRAINT "PK_TenantRoutes" PRIMARY KEY ("Id"), + CONSTRAINT "FK_TenantRoutes_Tenants_TenantCode" FOREIGN KEY ("TenantCode") REFERENCES "Tenants" ("TenantCode") ON DELETE RESTRICT +); + +CREATE UNIQUE INDEX "IX_ServiceInstances_ClusterId_DestinationId" ON "ServiceInstances" ("ClusterId", "DestinationId"); + +CREATE INDEX "IX_ServiceInstances_Health" ON "ServiceInstances" ("Health"); + +CREATE INDEX "IX_TenantRoutes_ClusterId" ON "TenantRoutes" ("ClusterId"); + +CREATE UNIQUE INDEX "IX_TenantRoutes_TenantCode_ServiceName" ON "TenantRoutes" ("TenantCode", "ServiceName"); + +CREATE UNIQUE INDEX "IX_Tenants_TenantCode" ON "Tenants" ("TenantCode"); + +INSERT INTO "__EFMigrationsHistory" ("MigrationId", "ProductVersion") +VALUES ('20260201120312_InitialCreate', '10.0.2'); + +COMMIT; + +START TRANSACTION; +ALTER TABLE "TenantRoutes" DROP CONSTRAINT "FK_TenantRoutes_Tenants_TenantCode"; + +ALTER TABLE "Tenants" DROP CONSTRAINT "AK_Tenants_TenantCode"; + +DROP INDEX "IX_TenantRoutes_TenantCode_ServiceName"; + +ALTER TABLE "TenantRoutes" ADD "IsGlobal" boolean NOT NULL DEFAULT FALSE; + +CREATE INDEX "IX_TenantRoutes_ServiceName" ON "TenantRoutes" ("ServiceName"); + +CREATE INDEX "IX_TenantRoutes_ServiceName_IsGlobal_Status" ON "TenantRoutes" ("ServiceName", "IsGlobal", "Status"); + +CREATE INDEX "IX_TenantRoutes_TenantCode" ON "TenantRoutes" ("TenantCode"); + +INSERT INTO "__EFMigrationsHistory" ("MigrationId", "ProductVersion") +VALUES ('20260201133826_AddIsGlobalToTenantRoute', '10.0.2'); + +COMMIT; + +START TRANSACTION; +CREATE TABLE "PendingServiceDiscoveries" ( + "Id" bigint GENERATED BY DEFAULT AS IDENTITY, + "K8sServiceName" character varying(255) NOT NULL, + "K8sNamespace" character varying(255) NOT NULL, + "K8sClusterIP" character varying(50), + "DiscoveredPorts" character varying(500) NOT NULL, + "Labels" character varying(2000) NOT NULL, + "PodCount" integer NOT NULL, + "Status" integer NOT NULL, + "AssignedClusterId" character varying(100), + "AssignedBy" character varying(100), + "AssignedAt" timestamp with time zone, + "DiscoveredAt" timestamp with time zone NOT NULL, + "IsDeleted" boolean NOT NULL, + "Version" integer NOT NULL, + CONSTRAINT "PK_PendingServiceDiscoveries" PRIMARY KEY ("Id") +); + +CREATE INDEX "IX_PendingServiceDiscoveries_DiscoveredAt" ON "PendingServiceDiscoveries" ("DiscoveredAt"); + +CREATE UNIQUE INDEX "IX_PendingServiceDiscoveries_K8sServiceName_K8sNamespace_IsDel~" ON "PendingServiceDiscoveries" ("K8sServiceName", "K8sNamespace", "IsDeleted"); + +CREATE INDEX "IX_PendingServiceDiscoveries_Status" ON "PendingServiceDiscoveries" ("Status"); + +INSERT INTO "__EFMigrationsHistory" ("MigrationId", "ProductVersion") +VALUES ('20260222134342_AddPendingServiceDiscovery', '10.0.2'); + +COMMIT; + diff --git a/Models/GwPendingServiceDiscovery.cs b/Models/GwPendingServiceDiscovery.cs new file mode 100644 index 0000000..e9da692 --- /dev/null +++ b/Models/GwPendingServiceDiscovery.cs @@ -0,0 +1,27 @@ +namespace YarpGateway.Models; + +public class GwPendingServiceDiscovery +{ + public long Id { get; set; } + public string K8sServiceName { get; set; } = string.Empty; + public string K8sNamespace { get; set; } = string.Empty; + public string? K8sClusterIP { get; set; } + public string DiscoveredPorts { get; set; } = "[]"; + public string Labels { get; set; } = "{}"; + public int PodCount { get; set; } = 0; + public int Status { get; set; } = 0; + public string? AssignedClusterId { get; set; } + public string? AssignedBy { get; set; } + public DateTime? AssignedAt { get; set; } + public DateTime DiscoveredAt { get; set; } = DateTime.UtcNow; + public bool IsDeleted { get; set; } = false; + public int Version { get; set; } = 0; +} + +public enum PendingServiceStatus +{ + Pending = 0, + Approved = 1, + Rejected = 2, + K8sServiceNotFound = 3 +} diff --git a/Program.cs b/Program.cs index 861e0d9..974adfa 100644 --- a/Program.cs +++ b/Program.cs @@ -67,14 +67,17 @@ builder.Services.AddSingleton(sp => sp.GetRequiredService< builder.Services.AddHostedService(); // 添加 Kubernetes 服务发现 +var useInClusterConfig = builder.Configuration.GetValue("ServiceDiscovery:UseInClusterConfig", true); builder.Services.AddKubernetesServiceDiscovery(options => { options.LabelSelector = "app.kubernetes.io/managed-by=yarp"; - options.UseInClusterConfig = false; // 本地调试设为 false,生产环境设为 true + options.UseInClusterConfig = useInClusterConfig; }); builder.Services.AddServiceDiscovery(); +builder.Services.AddHostedService(); + var corsSettings = builder.Configuration.GetSection("Cors"); builder.Services.AddCors(options => { @@ -109,6 +112,8 @@ app.UseCors("AllowFrontend"); app.UseMiddleware(); app.UseMiddleware(); +app.MapGet("/health", () => Results.Ok(new { status = "healthy", timestamp = DateTime.UtcNow })); + app.MapControllers(); app.MapReverseProxy(); diff --git a/Services/KubernetesPendingSyncService.cs b/Services/KubernetesPendingSyncService.cs new file mode 100644 index 0000000..60fa135 --- /dev/null +++ b/Services/KubernetesPendingSyncService.cs @@ -0,0 +1,161 @@ +using System.Text.Json; +using Microsoft.EntityFrameworkCore; +using YarpGateway.Data; +using YarpGateway.Models; + +namespace YarpGateway.Services; + +public class KubernetesPendingSyncService : BackgroundService +{ + private readonly IServiceProvider _serviceProvider; + private readonly ILogger _logger; + private readonly TimeSpan _syncInterval = TimeSpan.FromSeconds(30); + private readonly TimeSpan _staleThreshold = TimeSpan.FromHours(24); + + public KubernetesPendingSyncService( + IServiceProvider serviceProvider, + ILogger logger) + { + _serviceProvider = serviceProvider; + _logger = logger; + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + _logger.LogInformation("Starting K8s pending service sync background task"); + + while (!stoppingToken.IsCancellationRequested) + { + try + { + await SyncPendingServicesAsync(stoppingToken); + } + catch (Exception ex) + { + _logger.LogError(ex, "Error during K8s pending service sync"); + } + + await Task.Delay(_syncInterval, stoppingToken); + } + } + + private async Task SyncPendingServicesAsync(CancellationToken ct) + { + using var scope = _serviceProvider.CreateScope(); + var providers = scope.ServiceProvider.GetServices(); + var k8sProvider = providers.FirstOrDefault(p => p.ProviderName == "Kubernetes"); + + if (k8sProvider == null) + { + _logger.LogWarning("No Kubernetes service discovery provider found"); + return; + } + + var dbContextFactory = scope.ServiceProvider.GetRequiredService>(); + + var discoveredServices = await k8sProvider.GetServicesAsync(ct); + + await using var db = await dbContextFactory.CreateDbContextAsync(ct); + + var existingPending = await db.PendingServiceDiscoveries + .Where(p => !p.IsDeleted && p.Status == (int)PendingServiceStatus.Pending) + .ToListAsync(ct); + + var existingDict = existingPending + .ToDictionary(p => $"{p.K8sServiceName}|{p.K8sNamespace}"); + + var discoveredSet = discoveredServices + .Select(s => $"{s.Name}|{s.Namespace}") + .ToHashSet(); + + var addedCount = 0; + var updatedCount = 0; + var cleanedCount = 0; + + foreach (var item in existingDict) + { + var key = item.Key; + + if (!discoveredSet.Contains(key)) + { + var pending = item.Value; + + if (DateTime.UtcNow - pending.DiscoveredAt > _staleThreshold) + { + pending.IsDeleted = true; + pending.Version++; + cleanedCount++; + _logger.LogInformation("Cleaned up stale pending service {ServiceName} in namespace {Namespace}", + pending.K8sServiceName, pending.K8sNamespace); + } + else + { + pending.Status = (int)PendingServiceStatus.K8sServiceNotFound; + pending.Version++; + _logger.LogInformation("Pending service {ServiceName} in namespace {Namespace} not found in K8s, marked as not found", + pending.K8sServiceName, pending.K8sNamespace); + } + } + } + + if (discoveredServices.Count > 0) + { + var discoveredDict = discoveredServices.ToDictionary( + s => $"{s.Name}|{s.Namespace}", + s => s); + + foreach (var item in discoveredDict) + { + var key = item.Key; + var service = item.Value; + + if (existingDict.TryGetValue(key, out var existing)) + { + if (existing.Status == (int)PendingServiceStatus.K8sServiceNotFound) + { + existing.Status = (int)PendingServiceStatus.Pending; + existing.Version++; + updatedCount++; + } + + var portsJson = JsonSerializer.Serialize(service.Ports); + var labelsJson = JsonSerializer.Serialize(service.Labels); + + if (existing.DiscoveredPorts != portsJson || existing.Labels != labelsJson) + { + existing.DiscoveredPorts = portsJson; + existing.Labels = labelsJson; + existing.K8sClusterIP = service.ClusterIP; + existing.PodCount = service.Ports.Count; + existing.Version++; + updatedCount++; + } + } + else + { + var newPending = new GwPendingServiceDiscovery + { + K8sServiceName = service.Name, + K8sNamespace = service.Namespace, + K8sClusterIP = service.ClusterIP, + DiscoveredPorts = JsonSerializer.Serialize(service.Ports), + Labels = JsonSerializer.Serialize(service.Labels), + PodCount = service.Ports.Count, + Status = (int)PendingServiceStatus.Pending, + DiscoveredAt = DateTime.UtcNow, + Version = 1 + }; + db.PendingServiceDiscoveries.Add(newPending); + addedCount++; + } + } + } + + if (addedCount > 0 || updatedCount > 0 || cleanedCount > 0) + { + await db.SaveChangesAsync(ct); + _logger.LogInformation("K8s sync completed: {Added} new, {Updated} updated, {Cleaned} cleaned", + addedCount, updatedCount, cleanedCount); + } + } +}