Compare commits

..

No commits in common. "abe3456ccb696025d10b5201aa06369d14320e07" and "a3be629bceb2e86138fb61f61dc17f47c908f765" have entirely different histories.

166 changed files with 5579 additions and 1632 deletions

View File

@ -1,6 +0,0 @@
namespace YarpGateway.Config;
public static class ConfigNotifyChannel
{
public const string GatewayConfigChanged = "gateway_config_changed";
}

View File

@ -2,7 +2,7 @@ namespace YarpGateway.Config;
public class RedisConfig
{
public string ConnectionString { get; set; } = "81.68.223.70:16379,password=sl52788542";
public string ConnectionString { get; set; } = "localhost:6379";
public int Database { get; set; } = 0;
public string InstanceName { get; set; } = "YarpGateway";
}

View File

@ -4,6 +4,7 @@ using YarpGateway.Data;
using YarpGateway.Config;
using YarpGateway.Models;
using YarpGateway.Services;
using Yarp.ReverseProxy.Configuration;
namespace YarpGateway.Controllers;
@ -28,63 +29,33 @@ public class GatewayConfigController : ControllerBase
_routeCache = routeCache;
}
#region Tenants
[HttpGet("tenants")]
public async Task<IActionResult> GetTenants([FromQuery] int page = 1, [FromQuery] int pageSize = 10, [FromQuery] string? keyword = null)
public async Task<IActionResult> GetTenants()
{
await using var db = _dbContextFactory.CreateDbContext();
var query = db.Tenants.Where(t => !t.IsDeleted);
if (!string.IsNullOrEmpty(keyword))
{
query = query.Where(t => t.TenantCode.Contains(keyword) || t.TenantName.Contains(keyword));
}
var total = await query.CountAsync();
var items = await query
.OrderByDescending(t => t.Id)
.Skip((page - 1) * pageSize)
.Take(pageSize)
.Select(t => new
{
t.Id,
t.TenantCode,
t.TenantName,
t.Status,
RouteCount = db.TenantRoutes.Count(r => r.TenantCode == t.TenantCode && !r.IsDeleted),
t.Version,
t.CreatedTime,
t.UpdatedTime
})
var tenants = await db.Tenants
.Where(t => !t.IsDeleted)
.ToListAsync();
return Ok(new { items, total, page, pageSize, totalPages = (int)Math.Ceiling(total / (double)pageSize) });
}
[HttpGet("tenants/{id}")]
public async Task<IActionResult> GetTenant(long id)
{
await using var db = _dbContextFactory.CreateDbContext();
var tenant = await db.Tenants.FindAsync(id);
if (tenant == null) return NotFound();
return Ok(tenant);
return Ok(tenants);
}
[HttpPost("tenants")]
public async Task<IActionResult> CreateTenant([FromBody] CreateTenantDto dto)
{
await using var db = _dbContextFactory.CreateDbContext();
var existing = await db.Tenants.FirstOrDefaultAsync(t => t.TenantCode == dto.TenantCode);
if (existing != null) return BadRequest($"Tenant code {dto.TenantCode} already exists");
var existing = await db.Tenants
.FirstOrDefaultAsync(t => t.TenantCode == dto.TenantCode);
if (existing != null)
{
return BadRequest($"Tenant code {dto.TenantCode} already exists");
}
var tenant = new GwTenant
{
Id = GenerateId(),
TenantCode = dto.TenantCode,
TenantName = dto.TenantName,
Status = 1,
Version = 1
Status = 1
};
await db.Tenants.AddAsync(tenant);
await db.SaveChangesAsync();
@ -92,110 +63,55 @@ public class GatewayConfigController : ControllerBase
return Ok(tenant);
}
[HttpPut("tenants/{id}")]
public async Task<IActionResult> UpdateTenant(long id, [FromBody] UpdateTenantDto dto)
{
await using var db = _dbContextFactory.CreateDbContext();
var tenant = await db.Tenants.FindAsync(id);
if (tenant == null) return NotFound();
if (!string.IsNullOrEmpty(dto.TenantName)) tenant.TenantName = dto.TenantName;
if (dto.Status != null) tenant.Status = dto.Status.Value;
tenant.Version++;
tenant.UpdatedTime = DateTime.UtcNow;
await db.SaveChangesAsync();
return Ok(tenant);
}
[HttpDelete("tenants/{id}")]
public async Task<IActionResult> DeleteTenant(long id)
{
await using var db = _dbContextFactory.CreateDbContext();
var tenant = await db.Tenants.FindAsync(id);
if (tenant == null) return NotFound();
if (tenant == null)
return NotFound();
tenant.IsDeleted = true;
tenant.UpdatedTime = DateTime.UtcNow;
await db.SaveChangesAsync();
return Ok();
}
#endregion
#region Routes
[HttpGet("routes")]
public async Task<IActionResult> GetRoutes([FromQuery] int page = 1, [FromQuery] int pageSize = 10, [FromQuery] string? tenantCode = null, [FromQuery] bool? isGlobal = null)
{
await using var db = _dbContextFactory.CreateDbContext();
var query = db.TenantRoutes.Where(r => !r.IsDeleted);
if (!string.IsNullOrEmpty(tenantCode))
query = query.Where(r => r.TenantCode == tenantCode);
if (isGlobal != null)
query = query.Where(r => r.IsGlobal == isGlobal.Value);
var total = await query.CountAsync();
var items = await query
.OrderBy(r => r.Priority)
.Skip((page - 1) * pageSize)
.Take(pageSize)
.ToListAsync();
return Ok(new { items, total, page, pageSize, totalPages = (int)Math.Ceiling(total / (double)pageSize) });
}
[HttpGet("routes/global")]
public async Task<IActionResult> GetGlobalRoutes()
{
await using var db = _dbContextFactory.CreateDbContext();
var routes = await db.TenantRoutes.Where(r => r.IsGlobal && !r.IsDeleted).ToListAsync();
return Ok(routes);
}
[HttpGet("routes/tenant/{tenantCode}")]
[HttpGet("tenants/{tenantCode}/routes")]
public async Task<IActionResult> GetTenantRoutes(string tenantCode)
{
await using var db = _dbContextFactory.CreateDbContext();
var routes = await db.TenantRoutes.Where(r => r.TenantCode == tenantCode && !r.IsDeleted).ToListAsync();
var routes = await db.TenantRoutes
.Where(r => r.TenantCode == tenantCode && !r.IsDeleted)
.ToListAsync();
return Ok(routes);
}
[HttpGet("routes/{id}")]
public async Task<IActionResult> GetRoute(long id)
[HttpPost("tenants/{tenantCode}/routes")]
public async Task<IActionResult> CreateTenantRoute(string tenantCode, [FromBody] CreateTenantRouteDto dto)
{
await using var db = _dbContextFactory.CreateDbContext();
var route = await db.TenantRoutes.FindAsync(id);
if (route == null) return NotFound();
return Ok(route);
}
var tenant = await db.Tenants
.FirstOrDefaultAsync(t => t.TenantCode == tenantCode);
if (tenant == null)
return BadRequest($"Tenant {tenantCode} not found");
[HttpPost("routes")]
public async Task<IActionResult> CreateRoute([FromBody] CreateRouteDto dto)
{
await using var db = _dbContextFactory.CreateDbContext();
if ((dto.IsGlobal != true) && !string.IsNullOrEmpty(dto.TenantCode))
{
var tenant = await db.Tenants.FirstOrDefaultAsync(t => t.TenantCode == dto.TenantCode);
if (tenant == null) return BadRequest($"Tenant {dto.TenantCode} not found");
}
var clusterId = $"{tenantCode}-{dto.ServiceName}";
var existing = await db.TenantRoutes
.FirstOrDefaultAsync(r => r.ClusterId == clusterId);
if (existing != null)
return BadRequest($"Route for {tenantCode}/{dto.ServiceName} already exists");
var route = new GwTenantRoute
{
Id = GenerateId(),
TenantCode = dto.TenantCode ?? string.Empty,
TenantCode = tenantCode,
ServiceName = dto.ServiceName,
ClusterId = dto.ClusterId,
ClusterId = clusterId,
PathPattern = dto.PathPattern,
Priority = dto.Priority ?? 10,
Priority = 10,
Status = 1,
IsGlobal = dto.IsGlobal ?? false,
Version = 1,
CreatedTime = DateTime.UtcNow
IsGlobal = false
};
await db.TenantRoutes.AddAsync(route);
await db.SaveChangesAsync();
@ -205,21 +121,41 @@ public class GatewayConfigController : ControllerBase
return Ok(route);
}
[HttpPut("routes/{id}")]
public async Task<IActionResult> UpdateRoute(long id, [FromBody] CreateRouteDto dto)
[HttpGet("routes/global")]
public async Task<IActionResult> GetGlobalRoutes()
{
await using var db = _dbContextFactory.CreateDbContext();
var route = await db.TenantRoutes.FindAsync(id);
if (route == null) return NotFound();
var routes = await db.TenantRoutes
.Where(r => r.IsGlobal && !r.IsDeleted)
.ToListAsync();
return Ok(routes);
}
route.ServiceName = dto.ServiceName;
route.ClusterId = dto.ClusterId;
route.PathPattern = dto.PathPattern;
if (dto.Priority != null) route.Priority = dto.Priority.Value;
route.Version++;
route.UpdatedTime = DateTime.UtcNow;
[HttpPost("routes/global")]
public async Task<IActionResult> CreateGlobalRoute([FromBody] CreateGlobalRouteDto dto)
{
await using var db = _dbContextFactory.CreateDbContext();
var existing = await db.TenantRoutes
.FirstOrDefaultAsync(r => r.ServiceName == dto.ServiceName && r.IsGlobal);
if (existing != null)
{
return BadRequest($"Global route for {dto.ServiceName} already exists");
}
var route = new GwTenantRoute
{
Id = GenerateId(),
TenantCode = string.Empty,
ServiceName = dto.ServiceName,
ClusterId = dto.ClusterId,
PathPattern = dto.PathPattern,
Priority = 0,
Status = 1,
IsGlobal = true
};
await db.TenantRoutes.AddAsync(route);
await db.SaveChangesAsync();
await _routeCache.ReloadAsync();
return Ok(route);
@ -230,10 +166,10 @@ public class GatewayConfigController : ControllerBase
{
await using var db = _dbContextFactory.CreateDbContext();
var route = await db.TenantRoutes.FindAsync(id);
if (route == null) return NotFound();
if (route == null)
return NotFound();
route.IsDeleted = true;
route.UpdatedTime = DateTime.UtcNow;
await db.SaveChangesAsync();
await _routeCache.ReloadAsync();
@ -241,94 +177,24 @@ public class GatewayConfigController : ControllerBase
return Ok();
}
#endregion
#region Clusters
[HttpGet("clusters")]
public async Task<IActionResult> GetClusters()
{
await using var db = _dbContextFactory.CreateDbContext();
var clusters = await db.ServiceInstances
.Where(i => !i.IsDeleted)
.GroupBy(i => i.ClusterId)
.Select(g => new
{
ClusterId = g.Key,
ClusterName = g.Key,
InstanceCount = g.Count(),
HealthyInstanceCount = g.Count(i => i.Health == 1),
Instances = g.ToList()
})
.ToListAsync();
return Ok(clusters);
}
[HttpGet("clusters/{clusterId}")]
public async Task<IActionResult> GetCluster(string clusterId)
{
await using var db = _dbContextFactory.CreateDbContext();
var instances = await db.ServiceInstances.Where(i => i.ClusterId == clusterId && !i.IsDeleted).ToListAsync();
if (!instances.Any()) return NotFound();
return Ok(new
{
ClusterId = clusterId,
ClusterName = clusterId,
InstanceCount = instances.Count,
HealthyInstanceCount = instances.Count(i => i.Health == 1),
Instances = instances
});
}
[HttpPost("clusters")]
public async Task<IActionResult> CreateCluster([FromBody] CreateClusterDto dto)
{
return Ok(new { message = "Cluster created", clusterId = dto.ClusterId });
}
[HttpDelete("clusters/{clusterId}")]
public async Task<IActionResult> DeleteCluster(string clusterId)
{
await using var db = _dbContextFactory.CreateDbContext();
var instances = await db.ServiceInstances.Where(i => i.ClusterId == clusterId).ToListAsync();
foreach (var instance in instances)
{
instance.IsDeleted = true;
}
await db.SaveChangesAsync();
await _clusterProvider.ReloadAsync();
return Ok();
}
#endregion
#region Instances
[HttpGet("clusters/{clusterId}/instances")]
public async Task<IActionResult> GetInstances(string clusterId)
{
await using var db = _dbContextFactory.CreateDbContext();
var instances = await db.ServiceInstances.Where(i => i.ClusterId == clusterId && !i.IsDeleted).ToListAsync();
var instances = await db.ServiceInstances
.Where(i => i.ClusterId == clusterId && !i.IsDeleted)
.ToListAsync();
return Ok(instances);
}
[HttpGet("instances/{id}")]
public async Task<IActionResult> GetInstance(long id)
{
await using var db = _dbContextFactory.CreateDbContext();
var instance = await db.ServiceInstances.FindAsync(id);
if (instance == null) return NotFound();
return Ok(instance);
}
[HttpPost("clusters/{clusterId}/instances")]
public async Task<IActionResult> CreateInstance(string clusterId, [FromBody] CreateInstanceDto dto)
public async Task<IActionResult> AddInstance(string clusterId, [FromBody] CreateInstanceDto dto)
{
await using var db = _dbContextFactory.CreateDbContext();
var existing = await db.ServiceInstances.FirstOrDefaultAsync(i => i.ClusterId == clusterId && i.DestinationId == dto.DestinationId);
if (existing != null) return BadRequest($"Instance {dto.DestinationId} already exists");
var existing = await db.ServiceInstances
.FirstOrDefaultAsync(i => i.ClusterId == clusterId && i.DestinationId == dto.DestinationId);
if (existing != null)
return BadRequest($"Instance {dto.DestinationId} already exists in cluster {clusterId}");
var instance = new GwServiceInstance
{
@ -336,11 +202,9 @@ public class GatewayConfigController : ControllerBase
ClusterId = clusterId,
DestinationId = dto.DestinationId,
Address = dto.Address,
Weight = dto.Weight ?? 1,
Health = dto.IsHealthy == true ? 1 : 0,
Status = 1,
Version = 1,
CreatedTime = DateTime.UtcNow
Weight = dto.Weight,
Health = 1,
Status = 1
};
await db.ServiceInstances.AddAsync(instance);
await db.SaveChangesAsync();
@ -355,10 +219,10 @@ public class GatewayConfigController : ControllerBase
{
await using var db = _dbContextFactory.CreateDbContext();
var instance = await db.ServiceInstances.FindAsync(id);
if (instance == null) return NotFound();
if (instance == null)
return NotFound();
instance.IsDeleted = true;
instance.UpdatedTime = DateTime.UtcNow;
await db.SaveChangesAsync();
await _clusterProvider.ReloadAsync();
@ -366,123 +230,43 @@ public class GatewayConfigController : ControllerBase
return Ok();
}
#endregion
#region Config & Stats
[HttpPost("config/reload")]
[HttpPost("reload")]
public async Task<IActionResult> ReloadConfig()
{
await _routeCache.ReloadAsync();
await _routeProvider.ReloadAsync();
await _clusterProvider.ReloadAsync();
return Ok(new { message = "Config reloaded successfully", timestamp = DateTime.UtcNow });
return Ok(new { message = "Config reloaded successfully" });
}
[HttpGet("config/status")]
public async Task<IActionResult> GetConfigStatus()
private long GenerateId()
{
await using var db = _dbContextFactory.CreateDbContext();
var routeCount = await db.TenantRoutes.CountAsync(r => r.Status == 1 && !r.IsDeleted);
var instanceCount = await db.ServiceInstances.CountAsync(i => i.Status == 1 && !i.IsDeleted);
var healthyCount = await db.ServiceInstances.CountAsync(i => i.Health == 1 && !i.IsDeleted);
return Ok(new
{
routeCount,
clusterCount = await db.ServiceInstances.Where(i => !i.IsDeleted).GroupBy(i => i.ClusterId).CountAsync(),
instanceCount,
healthyInstanceCount = healthyCount,
lastReloadTime = DateTime.UtcNow,
isListening = true,
listenerStatus = "Active"
});
return DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
}
[HttpGet("config/versions")]
public async Task<IActionResult> GetVersionInfo()
{
await using var db = _dbContextFactory.CreateDbContext();
var routeVersion = await db.TenantRoutes.OrderByDescending(r => r.Version).Select(r => r.Version).FirstOrDefaultAsync();
var clusterVersion = await db.ServiceInstances.OrderByDescending(i => i.Version).Select(i => i.Version).FirstOrDefaultAsync();
return Ok(new
{
routeVersion,
clusterVersion,
routeVersionUpdatedAt = DateTime.UtcNow,
clusterVersionUpdatedAt = DateTime.UtcNow
});
}
[HttpGet("stats/overview")]
public async Task<IActionResult> GetOverviewStats()
{
await using var db = _dbContextFactory.CreateDbContext();
var totalTenants = await db.Tenants.CountAsync(t => !t.IsDeleted);
var activeTenants = await db.Tenants.CountAsync(t => !t.IsDeleted && t.Status == 1);
var totalRoutes = await db.TenantRoutes.CountAsync(r => r.Status == 1 && !r.IsDeleted);
var totalInstances = await db.ServiceInstances.CountAsync(i => i.Status == 1 && !i.IsDeleted);
var healthyInstances = await db.ServiceInstances.CountAsync(i => i.Health == 1 && !i.IsDeleted);
return Ok(new
{
totalTenants,
activeTenants,
totalRoutes,
totalClusters = await db.ServiceInstances.Where(i => !i.IsDeleted).GroupBy(i => i.ClusterId).CountAsync(),
totalInstances,
healthyInstances,
lastUpdated = DateTime.UtcNow
});
}
#endregion
#region DTOs
public class CreateTenantDto
{
public string TenantCode { get; set; } = string.Empty;
public string TenantName { get; set; } = string.Empty;
}
public class UpdateTenantDto
public class CreateTenantRouteDto
{
public string? TenantName { get; set; }
public int? Status { get; set; }
public string ServiceName { get; set; } = string.Empty;
public string PathPattern { get; set; } = string.Empty;
}
public class CreateRouteDto
public class CreateGlobalRouteDto
{
public string? TenantCode { get; set; }
public string ServiceName { get; set; } = string.Empty;
public string ClusterId { get; set; } = string.Empty;
public string PathPattern { get; set; } = string.Empty;
public int? Priority { get; set; }
public bool? IsGlobal { get; set; }
}
public class CreateClusterDto
{
public string ClusterId { get; set; } = string.Empty;
public string ClusterName { get; set; } = string.Empty;
public string? Description { get; set; }
public string? LoadBalancingPolicy { get; set; }
}
public class CreateInstanceDto
{
public string DestinationId { get; set; } = string.Empty;
public string Address { get; set; } = string.Empty;
public int? Weight { get; set; }
public bool? IsHealthy { get; set; }
}
#endregion
private long GenerateId()
{
return DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
public int Weight { get; set; } = 1;
}
}

View File

@ -1,209 +0,0 @@
using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;
using YarpGateway.Data;
using YarpGateway.Models;
namespace YarpGateway.Controllers;
[ApiController]
[Route("api/gateway/pending-services")]
public class PendingServicesController : ControllerBase
{
private readonly IDbContextFactory<GatewayDbContext> _dbContextFactory;
private readonly ILogger<PendingServicesController> _logger;
public PendingServicesController(
IDbContextFactory<GatewayDbContext> dbContextFactory,
ILogger<PendingServicesController> logger)
{
_dbContextFactory = dbContextFactory;
_logger = logger;
}
[HttpGet]
public async Task<IActionResult> GetPendingServices(
[FromQuery] int page = 1,
[FromQuery] int pageSize = 10,
[FromQuery] int? status = null)
{
await using var db = _dbContextFactory.CreateDbContext();
var query = db.PendingServiceDiscoveries.Where(p => !p.IsDeleted);
if (status.HasValue)
{
query = query.Where(p => p.Status == status.Value);
}
var total = await query.CountAsync();
var items = await query
.OrderByDescending(p => p.DiscoveredAt)
.Skip((page - 1) * pageSize)
.Take(pageSize)
.Select(p => new
{
p.Id,
p.K8sServiceName,
p.K8sNamespace,
p.K8sClusterIP,
DiscoveredPorts = System.Text.Json.JsonSerializer.Deserialize<List<int>>(p.DiscoveredPorts) ?? new List<int>(),
Labels = System.Text.Json.JsonSerializer.Deserialize<Dictionary<string, string>>(p.Labels) ?? new Dictionary<string, string>(),
p.PodCount,
Status = (PendingServiceStatus)p.Status,
p.AssignedClusterId,
p.AssignedBy,
p.AssignedAt,
p.DiscoveredAt
})
.ToListAsync();
return Ok(new { items, total, page, pageSize });
}
[HttpGet("{id}")]
public async Task<IActionResult> GetPendingService(long id)
{
await using var db = _dbContextFactory.CreateDbContext();
var service = await db.PendingServiceDiscoveries.FindAsync(id);
if (service == null || service.IsDeleted)
{
return NotFound(new { message = "Pending service not found" });
}
return Ok(new
{
service.Id,
service.K8sServiceName,
service.K8sNamespace,
service.K8sClusterIP,
DiscoveredPorts = System.Text.Json.JsonSerializer.Deserialize<List<int>>(service.DiscoveredPorts) ?? new List<int>(),
Labels = System.Text.Json.JsonSerializer.Deserialize<Dictionary<string, string>>(service.Labels) ?? new Dictionary<string, string>(),
service.PodCount,
Status = (PendingServiceStatus)service.Status,
service.AssignedClusterId,
service.AssignedBy,
service.AssignedAt,
service.DiscoveredAt
});
}
[HttpPost("{id}/assign")]
public async Task<IActionResult> AssignService(long id, [FromBody] AssignServiceRequest request)
{
await using var db = _dbContextFactory.CreateDbContext();
var pendingService = await db.PendingServiceDiscoveries.FindAsync(id);
if (pendingService == null || pendingService.IsDeleted)
{
return NotFound(new { message = "Pending service not found" });
}
if (pendingService.Status != (int)PendingServiceStatus.Pending)
{
return BadRequest(new { message = $"Service is already {((PendingServiceStatus)pendingService.Status)}, cannot assign" });
}
if (string.IsNullOrEmpty(request.ClusterId))
{
return BadRequest(new { message = "ClusterId is required" });
}
var existingCluster = await db.ServiceInstances
.AnyAsync(i => i.ClusterId == request.ClusterId && !i.IsDeleted);
if (!existingCluster)
{
return BadRequest(new { message = $"Cluster '{request.ClusterId}' does not exist. Please create the cluster first." });
}
var discoveredPorts = System.Text.Json.JsonSerializer.Deserialize<List<int>>(pendingService.DiscoveredPorts) ?? new List<int>();
var primaryPort = discoveredPorts.FirstOrDefault() > 0 ? discoveredPorts.First() : 80;
var instanceNumber = await db.ServiceInstances
.CountAsync(i => i.ClusterId == request.ClusterId && !i.IsDeleted);
var newInstance = new GwServiceInstance
{
ClusterId = request.ClusterId,
DestinationId = $"{pendingService.K8sServiceName}-{instanceNumber + 1}",
Address = $"http://{pendingService.K8sClusterIP}:{primaryPort}",
Health = 1,
Weight = 100,
Status = 1,
CreatedTime = DateTime.UtcNow,
Version = 1
};
db.ServiceInstances.Add(newInstance);
pendingService.Status = (int)PendingServiceStatus.Approved;
pendingService.AssignedClusterId = request.ClusterId;
pendingService.AssignedBy = "admin";
pendingService.AssignedAt = DateTime.UtcNow;
pendingService.Version++;
await db.SaveChangesAsync();
_logger.LogInformation("Service {ServiceName} assigned to cluster {ClusterId} by admin",
pendingService.K8sServiceName, request.ClusterId);
return Ok(new
{
success = true,
message = $"Service '{pendingService.K8sServiceName}' assigned to cluster '{request.ClusterId}'",
instanceId = newInstance.Id
});
}
[HttpPost("{id}/reject")]
public async Task<IActionResult> RejectService(long id)
{
await using var db = _dbContextFactory.CreateDbContext();
var pendingService = await db.PendingServiceDiscoveries.FindAsync(id);
if (pendingService == null || pendingService.IsDeleted)
{
return NotFound(new { message = "Pending service not found" });
}
if (pendingService.Status != (int)PendingServiceStatus.Pending)
{
return BadRequest(new { message = $"Service is already {((PendingServiceStatus)pendingService.Status)}, cannot reject" });
}
pendingService.Status = (int)PendingServiceStatus.Rejected;
pendingService.AssignedBy = "admin";
pendingService.AssignedAt = DateTime.UtcNow;
pendingService.Version++;
await db.SaveChangesAsync();
_logger.LogInformation("Service {ServiceName} rejected by admin", pendingService.K8sServiceName);
return Ok(new { success = true, message = $"Service '{pendingService.K8sServiceName}' rejected" });
}
[HttpGet("clusters")]
public async Task<IActionResult> GetClusters()
{
await using var db = _dbContextFactory.CreateDbContext();
var clusters = await db.ServiceInstances
.Where(i => !i.IsDeleted)
.GroupBy(i => i.ClusterId)
.Select(g => new
{
ClusterId = g.Key,
InstanceCount = g.Count(),
HealthyCount = g.Count(i => i.Health == 1)
})
.ToListAsync();
return Ok(clusters);
}
}
public class AssignServiceRequest
{
public string ClusterId { get; set; } = string.Empty;
}

View File

@ -1,6 +1,4 @@
using Microsoft.EntityFrameworkCore;
using Npgsql;
using YarpGateway.Config;
using YarpGateway.Models;
namespace YarpGateway.Data;
@ -15,78 +13,6 @@ public class GatewayDbContext : DbContext
public DbSet<GwTenant> Tenants => Set<GwTenant>();
public DbSet<GwTenantRoute> TenantRoutes => Set<GwTenantRoute>();
public DbSet<GwServiceInstance> ServiceInstances => Set<GwServiceInstance>();
public DbSet<GwPendingServiceDiscovery> PendingServiceDiscoveries => Set<GwPendingServiceDiscovery>();
public override int SaveChanges(bool acceptAllChangesOnSuccess)
{
DetectConfigChanges();
var result = base.SaveChanges(acceptAllChangesOnSuccess);
if (_configChangeDetected)
{
NotifyConfigChangedSync();
}
return result;
}
public override async Task<int> SaveChangesAsync(bool acceptAllChangesOnSuccess, CancellationToken cancellationToken = default)
{
DetectConfigChanges();
var result = await base.SaveChangesAsync(acceptAllChangesOnSuccess, cancellationToken);
if (_configChangeDetected)
{
await NotifyConfigChangedAsync(cancellationToken);
}
return result;
}
private bool _configChangeDetected;
private void DetectConfigChanges()
{
var entries = ChangeTracker.Entries()
.Where(e => e.State is EntityState.Added or EntityState.Modified or EntityState.Deleted)
.Where(e => e.Entity is GwTenantRoute or GwServiceInstance or GwTenant);
_configChangeDetected = entries.Any();
}
private bool IsRelationalDatabase()
{
try
{
return Database.IsRelational();
}
catch
{
return false;
}
}
private void NotifyConfigChangedSync()
{
if (!IsRelationalDatabase()) return;
var connectionString = Database.GetConnectionString();
if (string.IsNullOrEmpty(connectionString)) return;
using var connection = new NpgsqlConnection(connectionString);
connection.Open();
using var cmd = new NpgsqlCommand($"NOTIFY {ConfigNotifyChannel.GatewayConfigChanged}", connection);
cmd.ExecuteNonQuery();
}
private async Task NotifyConfigChangedAsync(CancellationToken cancellationToken)
{
if (!IsRelationalDatabase()) return;
var connectionString = Database.GetConnectionString();
if (string.IsNullOrEmpty(connectionString)) return;
await using var connection = new NpgsqlConnection(connectionString);
await connection.OpenAsync(cancellationToken);
await using var cmd = new NpgsqlCommand($"NOTIFY {ConfigNotifyChannel.GatewayConfigChanged}", connection);
await cmd.ExecuteNonQueryAsync(cancellationToken);
}
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
@ -121,21 +47,6 @@ public class GatewayDbContext : DbContext
entity.HasIndex(e => e.Health);
});
modelBuilder.Entity<GwPendingServiceDiscovery>(entity =>
{
entity.HasKey(e => e.Id);
entity.Property(e => e.K8sServiceName).HasMaxLength(255).IsRequired();
entity.Property(e => e.K8sNamespace).HasMaxLength(255).IsRequired();
entity.Property(e => e.K8sClusterIP).HasMaxLength(50);
entity.Property(e => e.DiscoveredPorts).HasMaxLength(500);
entity.Property(e => e.Labels).HasMaxLength(2000);
entity.Property(e => e.AssignedClusterId).HasMaxLength(100);
entity.Property(e => e.AssignedBy).HasMaxLength(100);
entity.HasIndex(e => new { e.K8sServiceName, e.K8sNamespace, e.IsDeleted }).IsUnique();
entity.HasIndex(e => e.Status);
entity.HasIndex(e => e.DiscoveredAt);
});
base.OnModelCreating(modelBuilder);
}
}

View File

@ -1,6 +0,0 @@
<Project>
<PropertyGroup>
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
</PropertyGroup>
</Project>

View File

@ -1,29 +1,14 @@
FROM mcr.microsoft.com/dotnet/aspnet:10.0 AS base
USER $APP_UID
WORKDIR /app
EXPOSE 8080
EXPOSE 8081
FROM mcr.microsoft.com/dotnet/sdk:10.0 AS build
ARG BUILD_CONFIGURATION=Release
WORKDIR /src
# Copy Directory.Packages.props for centralized version management
COPY ["Directory.Packages.props", "./"]
COPY ["src/Directory.Packages.props", "src/"]
COPY ["src/YarpGateway/YarpGateway.csproj", "src/YarpGateway/"]
COPY ["src/Fengling.ServiceDiscovery/Fengling.ServiceDiscovery.Core/Fengling.ServiceDiscovery.Core.csproj", "src/Fengling.ServiceDiscovery/Fengling.ServiceDiscovery.Core/"]
COPY ["src/Fengling.ServiceDiscovery/Fengling.ServiceDiscovery.Kubernetes/Fengling.ServiceDiscovery.Kubernetes.csproj", "src/Fengling.ServiceDiscovery/Fengling.ServiceDiscovery.Kubernetes/"]
COPY ["src/Fengling.ServiceDiscovery/Fengling.ServiceDiscovery.Static/Fengling.ServiceDiscovery.Static.csproj", "src/Fengling.ServiceDiscovery/Fengling.ServiceDiscovery.Static/"]
RUN dotnet restore "src/YarpGateway/YarpGateway.csproj"
COPY . .
WORKDIR "/src/src/YarpGateway"
RUN dotnet build "./YarpGateway.csproj" -c $BUILD_CONFIGURATION -o /app/build
FROM build AS publish
ARG BUILD_CONFIGURATION=Release
RUN dotnet publish "./YarpGateway.csproj" -c $BUILD_CONFIGURATION -o /app/publish /p:UseAppHost=false
RUN dotnet restore
RUN dotnet publish -c Release -o /app/publish
FROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .
COPY --from=build /app/publish .
ENTRYPOINT ["dotnet", "YarpGateway.dll"]

View File

@ -10,7 +10,6 @@ public class DynamicProxyConfigProvider : IProxyConfigProvider
private readonly DatabaseRouteConfigProvider _routeProvider;
private readonly DatabaseClusterConfigProvider _clusterProvider;
private readonly object _lock = new();
private CancellationTokenSource? _cts;
public DynamicProxyConfigProvider(
DatabaseRouteConfigProvider routeProvider,
@ -18,8 +17,6 @@ public class DynamicProxyConfigProvider : IProxyConfigProvider
{
_routeProvider = routeProvider;
_clusterProvider = clusterProvider;
_cts = new CancellationTokenSource();
_config = null!;
UpdateConfig();
}
@ -32,17 +29,13 @@ public class DynamicProxyConfigProvider : IProxyConfigProvider
{
lock (_lock)
{
_cts?.Cancel();
_cts = new CancellationTokenSource();
var routes = _routeProvider.GetRoutes();
var clusters = _clusterProvider.GetClusters();
_config = new InMemoryProxyConfig(
routes,
clusters,
Array.Empty<IReadOnlyDictionary<string, string>>(),
_cts.Token
Array.Empty<IReadOnlyDictionary<string, string>>()
);
}
}
@ -56,23 +49,21 @@ public class DynamicProxyConfigProvider : IProxyConfigProvider
private class InMemoryProxyConfig : IProxyConfig
{
private readonly CancellationChangeToken _changeToken;
private static readonly CancellationChangeToken _nullChangeToken = new(new CancellationToken());
public InMemoryProxyConfig(
IReadOnlyList<RouteConfig> routes,
IReadOnlyList<ClusterConfig> clusters,
IReadOnlyList<IReadOnlyDictionary<string, string>> transforms,
CancellationToken token)
IReadOnlyList<IReadOnlyDictionary<string, string>> transforms)
{
Routes = routes;
Clusters = clusters;
Transforms = transforms;
_changeToken = new CancellationChangeToken(token);
}
public IReadOnlyList<RouteConfig> Routes { get; }
public IReadOnlyList<ClusterConfig> Clusters { get; }
public IReadOnlyList<IReadOnlyDictionary<string, string>> Transforms { get; }
public IChangeToken ChangeToken => _changeToken;
public IChangeToken ChangeToken => _nullChangeToken;
}
}

View File

@ -1,275 +0,0 @@
// <auto-generated />
using System;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Migrations;
using Microsoft.EntityFrameworkCore.Storage.ValueConversion;
using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata;
using YarpGateway.Data;
#nullable disable
namespace YarpGateway.Migrations
{
[DbContext(typeof(GatewayDbContext))]
[Migration("20260222134342_AddPendingServiceDiscovery")]
partial class AddPendingServiceDiscovery
{
/// <inheritdoc />
protected override void BuildTargetModel(ModelBuilder modelBuilder)
{
#pragma warning disable 612, 618
modelBuilder
.HasAnnotation("ProductVersion", "10.0.2")
.HasAnnotation("Relational:MaxIdentifierLength", 63);
NpgsqlModelBuilderExtensions.UseIdentityByDefaultColumns(modelBuilder);
modelBuilder.Entity("YarpGateway.Models.GwPendingServiceDiscovery", b =>
{
b.Property<long>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("bigint");
NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property<long>("Id"));
b.Property<DateTime?>("AssignedAt")
.HasColumnType("timestamp with time zone");
b.Property<string>("AssignedBy")
.HasMaxLength(100)
.HasColumnType("character varying(100)");
b.Property<string>("AssignedClusterId")
.HasMaxLength(100)
.HasColumnType("character varying(100)");
b.Property<DateTime>("DiscoveredAt")
.HasColumnType("timestamp with time zone");
b.Property<string>("DiscoveredPorts")
.IsRequired()
.HasMaxLength(500)
.HasColumnType("character varying(500)");
b.Property<bool>("IsDeleted")
.HasColumnType("boolean");
b.Property<string>("K8sClusterIP")
.HasMaxLength(50)
.HasColumnType("character varying(50)");
b.Property<string>("K8sNamespace")
.IsRequired()
.HasMaxLength(255)
.HasColumnType("character varying(255)");
b.Property<string>("K8sServiceName")
.IsRequired()
.HasMaxLength(255)
.HasColumnType("character varying(255)");
b.Property<string>("Labels")
.IsRequired()
.HasMaxLength(2000)
.HasColumnType("character varying(2000)");
b.Property<int>("PodCount")
.HasColumnType("integer");
b.Property<int>("Status")
.HasColumnType("integer");
b.Property<int>("Version")
.HasColumnType("integer");
b.HasKey("Id");
b.HasIndex("DiscoveredAt");
b.HasIndex("Status");
b.HasIndex("K8sServiceName", "K8sNamespace", "IsDeleted")
.IsUnique();
b.ToTable("PendingServiceDiscoveries");
});
modelBuilder.Entity("YarpGateway.Models.GwServiceInstance", b =>
{
b.Property<long>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("bigint");
NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property<long>("Id"));
b.Property<string>("Address")
.IsRequired()
.HasMaxLength(200)
.HasColumnType("character varying(200)");
b.Property<string>("ClusterId")
.IsRequired()
.HasMaxLength(100)
.HasColumnType("character varying(100)");
b.Property<long?>("CreatedBy")
.HasColumnType("bigint");
b.Property<DateTime>("CreatedTime")
.HasColumnType("timestamp with time zone");
b.Property<string>("DestinationId")
.IsRequired()
.HasMaxLength(100)
.HasColumnType("character varying(100)");
b.Property<int>("Health")
.HasColumnType("integer");
b.Property<bool>("IsDeleted")
.HasColumnType("boolean");
b.Property<int>("Status")
.HasColumnType("integer");
b.Property<long?>("UpdatedBy")
.HasColumnType("bigint");
b.Property<DateTime?>("UpdatedTime")
.HasColumnType("timestamp with time zone");
b.Property<int>("Version")
.HasColumnType("integer");
b.Property<int>("Weight")
.HasColumnType("integer");
b.HasKey("Id");
b.HasIndex("Health");
b.HasIndex("ClusterId", "DestinationId")
.IsUnique();
b.ToTable("ServiceInstances");
});
modelBuilder.Entity("YarpGateway.Models.GwTenant", b =>
{
b.Property<long>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("bigint");
NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property<long>("Id"));
b.Property<long?>("CreatedBy")
.HasColumnType("bigint");
b.Property<DateTime>("CreatedTime")
.HasColumnType("timestamp with time zone");
b.Property<bool>("IsDeleted")
.HasColumnType("boolean");
b.Property<int>("Status")
.HasColumnType("integer");
b.Property<string>("TenantCode")
.IsRequired()
.HasMaxLength(50)
.HasColumnType("character varying(50)");
b.Property<string>("TenantName")
.IsRequired()
.HasMaxLength(100)
.HasColumnType("character varying(100)");
b.Property<long?>("UpdatedBy")
.HasColumnType("bigint");
b.Property<DateTime?>("UpdatedTime")
.HasColumnType("timestamp with time zone");
b.Property<int>("Version")
.HasColumnType("integer");
b.HasKey("Id");
b.HasIndex("TenantCode")
.IsUnique();
b.ToTable("Tenants");
});
modelBuilder.Entity("YarpGateway.Models.GwTenantRoute", b =>
{
b.Property<long>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("bigint");
NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property<long>("Id"));
b.Property<string>("ClusterId")
.IsRequired()
.HasMaxLength(100)
.HasColumnType("character varying(100)");
b.Property<long?>("CreatedBy")
.HasColumnType("bigint");
b.Property<DateTime>("CreatedTime")
.HasColumnType("timestamp with time zone");
b.Property<bool>("IsDeleted")
.HasColumnType("boolean");
b.Property<bool>("IsGlobal")
.HasColumnType("boolean");
b.Property<string>("PathPattern")
.IsRequired()
.HasMaxLength(200)
.HasColumnType("character varying(200)");
b.Property<int>("Priority")
.HasColumnType("integer");
b.Property<string>("ServiceName")
.IsRequired()
.HasMaxLength(100)
.HasColumnType("character varying(100)");
b.Property<int>("Status")
.HasColumnType("integer");
b.Property<string>("TenantCode")
.IsRequired()
.HasMaxLength(50)
.HasColumnType("character varying(50)");
b.Property<long?>("UpdatedBy")
.HasColumnType("bigint");
b.Property<DateTime?>("UpdatedTime")
.HasColumnType("timestamp with time zone");
b.Property<int>("Version")
.HasColumnType("integer");
b.HasKey("Id");
b.HasIndex("ClusterId");
b.HasIndex("ServiceName");
b.HasIndex("TenantCode");
b.HasIndex("ServiceName", "IsGlobal", "Status");
b.ToTable("TenantRoutes");
});
#pragma warning restore 612, 618
}
}
}

View File

@ -1,64 +0,0 @@
using System;
using Microsoft.EntityFrameworkCore.Migrations;
using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata;
#nullable disable
namespace YarpGateway.Migrations
{
/// <inheritdoc />
public partial class AddPendingServiceDiscovery : Migration
{
/// <inheritdoc />
protected override void Up(MigrationBuilder migrationBuilder)
{
migrationBuilder.CreateTable(
name: "PendingServiceDiscoveries",
columns: table => new
{
Id = table.Column<long>(type: "bigint", nullable: false)
.Annotation("Npgsql:ValueGenerationStrategy", NpgsqlValueGenerationStrategy.IdentityByDefaultColumn),
K8sServiceName = table.Column<string>(type: "character varying(255)", maxLength: 255, nullable: false),
K8sNamespace = table.Column<string>(type: "character varying(255)", maxLength: 255, nullable: false),
K8sClusterIP = table.Column<string>(type: "character varying(50)", maxLength: 50, nullable: true),
DiscoveredPorts = table.Column<string>(type: "character varying(500)", maxLength: 500, nullable: false),
Labels = table.Column<string>(type: "character varying(2000)", maxLength: 2000, nullable: false),
PodCount = table.Column<int>(type: "integer", nullable: false),
Status = table.Column<int>(type: "integer", nullable: false),
AssignedClusterId = table.Column<string>(type: "character varying(100)", maxLength: 100, nullable: true),
AssignedBy = table.Column<string>(type: "character varying(100)", maxLength: 100, nullable: true),
AssignedAt = table.Column<DateTime>(type: "timestamp with time zone", nullable: true),
DiscoveredAt = table.Column<DateTime>(type: "timestamp with time zone", nullable: false),
IsDeleted = table.Column<bool>(type: "boolean", nullable: false),
Version = table.Column<int>(type: "integer", nullable: false)
},
constraints: table =>
{
table.PrimaryKey("PK_PendingServiceDiscoveries", x => x.Id);
});
migrationBuilder.CreateIndex(
name: "IX_PendingServiceDiscoveries_DiscoveredAt",
table: "PendingServiceDiscoveries",
column: "DiscoveredAt");
migrationBuilder.CreateIndex(
name: "IX_PendingServiceDiscoveries_K8sServiceName_K8sNamespace_IsDel~",
table: "PendingServiceDiscoveries",
columns: new[] { "K8sServiceName", "K8sNamespace", "IsDeleted" },
unique: true);
migrationBuilder.CreateIndex(
name: "IX_PendingServiceDiscoveries_Status",
table: "PendingServiceDiscoveries",
column: "Status");
}
/// <inheritdoc />
protected override void Down(MigrationBuilder migrationBuilder)
{
migrationBuilder.DropTable(
name: "PendingServiceDiscoveries");
}
}
}

View File

@ -17,81 +17,11 @@ namespace YarpGateway.Migrations
{
#pragma warning disable 612, 618
modelBuilder
.HasAnnotation("ProductVersion", "10.0.2")
.HasAnnotation("ProductVersion", "9.0.0")
.HasAnnotation("Relational:MaxIdentifierLength", 63);
NpgsqlModelBuilderExtensions.UseIdentityByDefaultColumns(modelBuilder);
modelBuilder.Entity("YarpGateway.Models.GwPendingServiceDiscovery", b =>
{
b.Property<long>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("bigint");
NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property<long>("Id"));
b.Property<DateTime?>("AssignedAt")
.HasColumnType("timestamp with time zone");
b.Property<string>("AssignedBy")
.HasMaxLength(100)
.HasColumnType("character varying(100)");
b.Property<string>("AssignedClusterId")
.HasMaxLength(100)
.HasColumnType("character varying(100)");
b.Property<DateTime>("DiscoveredAt")
.HasColumnType("timestamp with time zone");
b.Property<string>("DiscoveredPorts")
.IsRequired()
.HasMaxLength(500)
.HasColumnType("character varying(500)");
b.Property<bool>("IsDeleted")
.HasColumnType("boolean");
b.Property<string>("K8sClusterIP")
.HasMaxLength(50)
.HasColumnType("character varying(50)");
b.Property<string>("K8sNamespace")
.IsRequired()
.HasMaxLength(255)
.HasColumnType("character varying(255)");
b.Property<string>("K8sServiceName")
.IsRequired()
.HasMaxLength(255)
.HasColumnType("character varying(255)");
b.Property<string>("Labels")
.IsRequired()
.HasMaxLength(2000)
.HasColumnType("character varying(2000)");
b.Property<int>("PodCount")
.HasColumnType("integer");
b.Property<int>("Status")
.HasColumnType("integer");
b.Property<int>("Version")
.HasColumnType("integer");
b.HasKey("Id");
b.HasIndex("DiscoveredAt");
b.HasIndex("Status");
b.HasIndex("K8sServiceName", "K8sNamespace", "IsDeleted")
.IsUnique();
b.ToTable("PendingServiceDiscoveries");
});
modelBuilder.Entity("YarpGateway.Models.GwServiceInstance", b =>
{
b.Property<long>("Id")

View File

@ -1,122 +0,0 @@
CREATE TABLE IF NOT EXISTS "__EFMigrationsHistory" (
"MigrationId" character varying(150) NOT NULL,
"ProductVersion" character varying(32) NOT NULL,
CONSTRAINT "PK___EFMigrationsHistory" PRIMARY KEY ("MigrationId")
);
START TRANSACTION;
CREATE TABLE "ServiceInstances" (
"Id" bigint GENERATED BY DEFAULT AS IDENTITY,
"ClusterId" character varying(100) NOT NULL,
"DestinationId" character varying(100) NOT NULL,
"Address" character varying(200) NOT NULL,
"Health" integer NOT NULL,
"Weight" integer NOT NULL,
"Status" integer NOT NULL,
"CreatedBy" bigint,
"CreatedTime" timestamp with time zone NOT NULL,
"UpdatedBy" bigint,
"UpdatedTime" timestamp with time zone,
"IsDeleted" boolean NOT NULL,
"Version" integer NOT NULL,
CONSTRAINT "PK_ServiceInstances" PRIMARY KEY ("Id")
);
CREATE TABLE "Tenants" (
"Id" bigint GENERATED BY DEFAULT AS IDENTITY,
"TenantCode" character varying(50) NOT NULL,
"TenantName" character varying(100) NOT NULL,
"Status" integer NOT NULL,
"CreatedBy" bigint,
"CreatedTime" timestamp with time zone NOT NULL,
"UpdatedBy" bigint,
"UpdatedTime" timestamp with time zone,
"IsDeleted" boolean NOT NULL,
"Version" integer NOT NULL,
CONSTRAINT "PK_Tenants" PRIMARY KEY ("Id"),
CONSTRAINT "AK_Tenants_TenantCode" UNIQUE ("TenantCode")
);
CREATE TABLE "TenantRoutes" (
"Id" bigint GENERATED BY DEFAULT AS IDENTITY,
"TenantCode" character varying(50) NOT NULL,
"ServiceName" character varying(100) NOT NULL,
"ClusterId" character varying(100) NOT NULL,
"PathPattern" character varying(200) NOT NULL,
"Priority" integer NOT NULL,
"Status" integer NOT NULL,
"CreatedBy" bigint,
"CreatedTime" timestamp with time zone NOT NULL,
"UpdatedBy" bigint,
"UpdatedTime" timestamp with time zone,
"IsDeleted" boolean NOT NULL,
"Version" integer NOT NULL,
CONSTRAINT "PK_TenantRoutes" PRIMARY KEY ("Id"),
CONSTRAINT "FK_TenantRoutes_Tenants_TenantCode" FOREIGN KEY ("TenantCode") REFERENCES "Tenants" ("TenantCode") ON DELETE RESTRICT
);
CREATE UNIQUE INDEX "IX_ServiceInstances_ClusterId_DestinationId" ON "ServiceInstances" ("ClusterId", "DestinationId");
CREATE INDEX "IX_ServiceInstances_Health" ON "ServiceInstances" ("Health");
CREATE INDEX "IX_TenantRoutes_ClusterId" ON "TenantRoutes" ("ClusterId");
CREATE UNIQUE INDEX "IX_TenantRoutes_TenantCode_ServiceName" ON "TenantRoutes" ("TenantCode", "ServiceName");
CREATE UNIQUE INDEX "IX_Tenants_TenantCode" ON "Tenants" ("TenantCode");
INSERT INTO "__EFMigrationsHistory" ("MigrationId", "ProductVersion")
VALUES ('20260201120312_InitialCreate', '10.0.2');
COMMIT;
START TRANSACTION;
ALTER TABLE "TenantRoutes" DROP CONSTRAINT "FK_TenantRoutes_Tenants_TenantCode";
ALTER TABLE "Tenants" DROP CONSTRAINT "AK_Tenants_TenantCode";
DROP INDEX "IX_TenantRoutes_TenantCode_ServiceName";
ALTER TABLE "TenantRoutes" ADD "IsGlobal" boolean NOT NULL DEFAULT FALSE;
CREATE INDEX "IX_TenantRoutes_ServiceName" ON "TenantRoutes" ("ServiceName");
CREATE INDEX "IX_TenantRoutes_ServiceName_IsGlobal_Status" ON "TenantRoutes" ("ServiceName", "IsGlobal", "Status");
CREATE INDEX "IX_TenantRoutes_TenantCode" ON "TenantRoutes" ("TenantCode");
INSERT INTO "__EFMigrationsHistory" ("MigrationId", "ProductVersion")
VALUES ('20260201133826_AddIsGlobalToTenantRoute', '10.0.2');
COMMIT;
START TRANSACTION;
CREATE TABLE "PendingServiceDiscoveries" (
"Id" bigint GENERATED BY DEFAULT AS IDENTITY,
"K8sServiceName" character varying(255) NOT NULL,
"K8sNamespace" character varying(255) NOT NULL,
"K8sClusterIP" character varying(50),
"DiscoveredPorts" character varying(500) NOT NULL,
"Labels" character varying(2000) NOT NULL,
"PodCount" integer NOT NULL,
"Status" integer NOT NULL,
"AssignedClusterId" character varying(100),
"AssignedBy" character varying(100),
"AssignedAt" timestamp with time zone,
"DiscoveredAt" timestamp with time zone NOT NULL,
"IsDeleted" boolean NOT NULL,
"Version" integer NOT NULL,
CONSTRAINT "PK_PendingServiceDiscoveries" PRIMARY KEY ("Id")
);
CREATE INDEX "IX_PendingServiceDiscoveries_DiscoveredAt" ON "PendingServiceDiscoveries" ("DiscoveredAt");
CREATE UNIQUE INDEX "IX_PendingServiceDiscoveries_K8sServiceName_K8sNamespace_IsDel~" ON "PendingServiceDiscoveries" ("K8sServiceName", "K8sNamespace", "IsDeleted");
CREATE INDEX "IX_PendingServiceDiscoveries_Status" ON "PendingServiceDiscoveries" ("Status");
INSERT INTO "__EFMigrationsHistory" ("MigrationId", "ProductVersion")
VALUES ('20260222134342_AddPendingServiceDiscovery', '10.0.2');
COMMIT;

View File

@ -1,27 +0,0 @@
namespace YarpGateway.Models;
public class GwPendingServiceDiscovery
{
public long Id { get; set; }
public string K8sServiceName { get; set; } = string.Empty;
public string K8sNamespace { get; set; } = string.Empty;
public string? K8sClusterIP { get; set; }
public string DiscoveredPorts { get; set; } = "[]";
public string Labels { get; set; } = "{}";
public int PodCount { get; set; } = 0;
public int Status { get; set; } = 0;
public string? AssignedClusterId { get; set; }
public string? AssignedBy { get; set; }
public DateTime? AssignedAt { get; set; }
public DateTime DiscoveredAt { get; set; } = DateTime.UtcNow;
public bool IsDeleted { get; set; } = false;
public int Version { get; set; } = 0;
}
public enum PendingServiceStatus
{
Pending = 0,
Approved = 1,
Rejected = 2,
K8sServiceNotFound = 3
}

View File

@ -10,8 +10,6 @@ using YarpGateway.LoadBalancing;
using YarpGateway.Middleware;
using YarpGateway.Services;
using StackExchange.Redis;
using Fengling.ServiceDiscovery.Extensions;
using Fengling.ServiceDiscovery.Kubernetes.Extensions;
var builder = WebApplication.CreateBuilder(args);
@ -64,20 +62,6 @@ builder.Services.AddSingleton<ILoadBalancingPolicy, DistributedWeightedRoundRobi
builder.Services.AddSingleton<DynamicProxyConfigProvider>();
builder.Services.AddSingleton<IProxyConfigProvider>(sp => sp.GetRequiredService<DynamicProxyConfigProvider>());
builder.Services.AddHostedService<PgSqlConfigChangeListener>();
// 添加 Kubernetes 服务发现
var useInClusterConfig = builder.Configuration.GetValue<bool>("ServiceDiscovery:UseInClusterConfig", true);
builder.Services.AddKubernetesServiceDiscovery(options =>
{
options.LabelSelector = "app.kubernetes.io/managed-by=yarp";
options.UseInClusterConfig = useInClusterConfig;
});
builder.Services.AddServiceDiscovery();
builder.Services.AddHostedService<KubernetesPendingSyncService>();
var corsSettings = builder.Configuration.GetSection("Cors");
builder.Services.AddCors(options =>
{
@ -102,9 +86,6 @@ builder.Services.AddCors(options =>
});
builder.Services.AddControllers();
builder.Services.AddHttpForwarder();
builder.Services.AddRouting();
builder.Services.AddReverseProxy();
var app = builder.Build();
@ -112,8 +93,6 @@ app.UseCors("AllowFrontend");
app.UseMiddleware<JwtTransformMiddleware>();
app.UseMiddleware<TenantRoutingMiddleware>();
app.MapGet("/health", () => Results.Ok(new { status = "healthy", timestamp = DateTime.UtcNow }));
app.MapControllers();
app.MapReverseProxy();

View File

@ -1,161 +0,0 @@
using System.Text.Json;
using Microsoft.EntityFrameworkCore;
using YarpGateway.Data;
using YarpGateway.Models;
namespace YarpGateway.Services;
public class KubernetesPendingSyncService : BackgroundService
{
private readonly IServiceProvider _serviceProvider;
private readonly ILogger<KubernetesPendingSyncService> _logger;
private readonly TimeSpan _syncInterval = TimeSpan.FromSeconds(30);
private readonly TimeSpan _staleThreshold = TimeSpan.FromHours(24);
public KubernetesPendingSyncService(
IServiceProvider serviceProvider,
ILogger<KubernetesPendingSyncService> logger)
{
_serviceProvider = serviceProvider;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Starting K8s pending service sync background task");
while (!stoppingToken.IsCancellationRequested)
{
try
{
await SyncPendingServicesAsync(stoppingToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error during K8s pending service sync");
}
await Task.Delay(_syncInterval, stoppingToken);
}
}
private async Task SyncPendingServicesAsync(CancellationToken ct)
{
using var scope = _serviceProvider.CreateScope();
var providers = scope.ServiceProvider.GetServices<Fengling.ServiceDiscovery.IServiceDiscoveryProvider>();
var k8sProvider = providers.FirstOrDefault(p => p.ProviderName == "Kubernetes");
if (k8sProvider == null)
{
_logger.LogWarning("No Kubernetes service discovery provider found");
return;
}
var dbContextFactory = scope.ServiceProvider.GetRequiredService<IDbContextFactory<GatewayDbContext>>();
var discoveredServices = await k8sProvider.GetServicesAsync(ct);
await using var db = await dbContextFactory.CreateDbContextAsync(ct);
var existingPending = await db.PendingServiceDiscoveries
.Where(p => !p.IsDeleted && p.Status == (int)PendingServiceStatus.Pending)
.ToListAsync(ct);
var existingDict = existingPending
.ToDictionary(p => $"{p.K8sServiceName}|{p.K8sNamespace}");
var discoveredSet = discoveredServices
.Select(s => $"{s.Name}|{s.Namespace}")
.ToHashSet();
var addedCount = 0;
var updatedCount = 0;
var cleanedCount = 0;
foreach (var item in existingDict)
{
var key = item.Key;
if (!discoveredSet.Contains(key))
{
var pending = item.Value;
if (DateTime.UtcNow - pending.DiscoveredAt > _staleThreshold)
{
pending.IsDeleted = true;
pending.Version++;
cleanedCount++;
_logger.LogInformation("Cleaned up stale pending service {ServiceName} in namespace {Namespace}",
pending.K8sServiceName, pending.K8sNamespace);
}
else
{
pending.Status = (int)PendingServiceStatus.K8sServiceNotFound;
pending.Version++;
_logger.LogInformation("Pending service {ServiceName} in namespace {Namespace} not found in K8s, marked as not found",
pending.K8sServiceName, pending.K8sNamespace);
}
}
}
if (discoveredServices.Count > 0)
{
var discoveredDict = discoveredServices.ToDictionary(
s => $"{s.Name}|{s.Namespace}",
s => s);
foreach (var item in discoveredDict)
{
var key = item.Key;
var service = item.Value;
if (existingDict.TryGetValue(key, out var existing))
{
if (existing.Status == (int)PendingServiceStatus.K8sServiceNotFound)
{
existing.Status = (int)PendingServiceStatus.Pending;
existing.Version++;
updatedCount++;
}
var portsJson = JsonSerializer.Serialize(service.Ports);
var labelsJson = JsonSerializer.Serialize(service.Labels);
if (existing.DiscoveredPorts != portsJson || existing.Labels != labelsJson)
{
existing.DiscoveredPorts = portsJson;
existing.Labels = labelsJson;
existing.K8sClusterIP = service.ClusterIP;
existing.PodCount = service.Ports.Count;
existing.Version++;
updatedCount++;
}
}
else
{
var newPending = new GwPendingServiceDiscovery
{
K8sServiceName = service.Name,
K8sNamespace = service.Namespace,
K8sClusterIP = service.ClusterIP,
DiscoveredPorts = JsonSerializer.Serialize(service.Ports),
Labels = JsonSerializer.Serialize(service.Labels),
PodCount = service.Ports.Count,
Status = (int)PendingServiceStatus.Pending,
DiscoveredAt = DateTime.UtcNow,
Version = 1
};
db.PendingServiceDiscoveries.Add(newPending);
addedCount++;
}
}
}
if (addedCount > 0 || updatedCount > 0 || cleanedCount > 0)
{
await db.SaveChangesAsync(ct);
_logger.LogInformation("K8s sync completed: {Added} new, {Updated} updated, {Cleaned} cleaned",
addedCount, updatedCount, cleanedCount);
}
}
}

View File

@ -1,222 +0,0 @@
using System.Threading.Channels;
using Microsoft.EntityFrameworkCore;
using Npgsql;
using YarpGateway.Config;
using YarpGateway.Data;
using YarpGateway.DynamicProxy;
namespace YarpGateway.Services;
public class PgSqlConfigChangeListener : BackgroundService
{
private readonly IServiceProvider _serviceProvider;
private readonly ILogger<PgSqlConfigChangeListener> _logger;
private readonly TimeSpan _fallbackInterval = TimeSpan.FromMinutes(5);
private readonly string _connectionString;
private NpgsqlConnection? _connection;
private int _lastRouteVersion;
private int _lastClusterVersion;
private readonly Channel<bool> _reloadChannel = Channel.CreateBounded<bool>(1);
public PgSqlConfigChangeListener(
IServiceProvider serviceProvider,
ILogger<PgSqlConfigChangeListener> logger,
IConfiguration configuration)
{
_serviceProvider = serviceProvider;
_logger = logger;
_connectionString = configuration.GetConnectionString("DefaultConnection")
?? throw new InvalidOperationException("DefaultConnection is not configured");
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Starting PgSql config change listener");
await InitializeListenerAsync(stoppingToken);
_ = FallbackPollingAsync(stoppingToken);
await ListenAsync(stoppingToken);
}
private async Task InitializeListenerAsync(CancellationToken stoppingToken)
{
try
{
_connection = new NpgsqlConnection(_connectionString);
_connection.Notification += OnNotification;
await _connection.OpenAsync(stoppingToken);
await using var cmd = _connection.CreateCommand();
cmd.CommandText = $"LISTEN {ConfigNotifyChannel.GatewayConfigChanged}";
await cmd.ExecuteNonQueryAsync(stoppingToken);
_logger.LogInformation("Listening on {Channel}", ConfigNotifyChannel.GatewayConfigChanged);
await UpdateVersionAsync(stoppingToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to initialize PgSql listener");
}
}
private void OnNotification(object sender, NpgsqlNotificationEventArgs e)
{
_logger.LogInformation("Received config change notification: {Payload}", e.Payload);
_ = _reloadChannel.Writer.WriteAsync(true);
}
private async Task ListenAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
if (_connection == null || _connection.State != System.Data.ConnectionState.Open)
{
await ReconnectAsync(stoppingToken);
}
await Task.Delay(Timeout.Infinite, stoppingToken);
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error in listener loop, reconnecting...");
await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
await ReconnectAsync(stoppingToken);
}
}
}
private async Task ReconnectAsync(CancellationToken stoppingToken)
{
try
{
_connection?.Dispose();
_connection = new NpgsqlConnection(_connectionString);
_connection.Notification += OnNotification;
await _connection.OpenAsync(stoppingToken);
await using var cmd = _connection.CreateCommand();
cmd.CommandText = $"LISTEN {ConfigNotifyChannel.GatewayConfigChanged}";
await cmd.ExecuteNonQueryAsync(stoppingToken);
_logger.LogInformation("Reconnected to PostgreSQL and listening");
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to reconnect");
}
}
private async Task FallbackPollingAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
await Task.Delay(_fallbackInterval, stoppingToken);
await CheckAndReloadAsync(stoppingToken);
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error in fallback polling");
}
}
}
private async Task CheckAndReloadAsync(CancellationToken stoppingToken)
{
try
{
await using var scope = _serviceProvider.CreateAsyncScope();
await using var db = scope.ServiceProvider.GetRequiredService<GatewayDbContext>();
var currentRouteVersion = await db.TenantRoutes
.OrderByDescending(r => r.Version)
.Select(r => r.Version)
.FirstOrDefaultAsync(stoppingToken);
var currentClusterVersion = await db.ServiceInstances
.OrderByDescending(i => i.Version)
.Select(i => i.Version)
.FirstOrDefaultAsync(stoppingToken);
if (currentRouteVersion != _lastRouteVersion || currentClusterVersion != _lastClusterVersion)
{
_logger.LogInformation("Version change detected via fallback: route {RouteVer}->{NewRouteVer}, cluster {ClusterVer}->{NewClusterVer}",
_lastRouteVersion, currentRouteVersion, _lastClusterVersion, currentClusterVersion);
await ReloadConfigAsync(stoppingToken);
_lastRouteVersion = currentRouteVersion;
_lastClusterVersion = currentClusterVersion;
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error checking version for fallback");
}
}
private async Task UpdateVersionAsync(CancellationToken stoppingToken)
{
try
{
await using var scope = _serviceProvider.CreateAsyncScope();
await using var db = scope.ServiceProvider.GetRequiredService<GatewayDbContext>();
_lastRouteVersion = await db.TenantRoutes
.OrderByDescending(r => r.Version)
.Select(r => r.Version)
.FirstOrDefaultAsync(stoppingToken);
_lastClusterVersion = await db.ServiceInstances
.OrderByDescending(i => i.Version)
.Select(i => i.Version)
.FirstOrDefaultAsync(stoppingToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error updating initial version");
}
}
private async Task ReloadConfigAsync(CancellationToken stoppingToken)
{
try
{
await using var scope = _serviceProvider.CreateAsyncScope();
var routeCache = scope.ServiceProvider.GetRequiredService<IRouteCache>();
await routeCache.ReloadAsync();
var configProvider = scope.ServiceProvider.GetRequiredService<DynamicProxyConfigProvider>();
configProvider.UpdateConfig();
_logger.LogInformation("Configuration reloaded successfully");
}
catch (Exception ex)
{
_logger.LogError(ex, "Error reloading configuration");
}
}
public override async Task StopAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("Stopping PgSql config change listener");
_reloadChannel.Writer.Complete();
_connection?.Dispose();
await base.StopAsync(cancellationToken);
}
}

View File

@ -4,33 +4,20 @@
<TargetFramework>net10.0</TargetFramework>
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
<DockerDefaultTargetOS>Linux</DockerDefaultTargetOS>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.Authentication.JwtBearer" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Design">
<PackageReference Include="Microsoft.AspNetCore.Authentication.JwtBearer" Version="10.0.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="10.0.2">
<IncludeAssets>runtime; build; native; contentfiles, analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" />
<PackageReference Include="Serilog.AspNetCore" />
<PackageReference Include="Serilog.Sinks.Console" />
<PackageReference Include="Serilog.Sinks.File" />
<PackageReference Include="StackExchange.Redis" />
<PackageReference Include="Yarp.ReverseProxy" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Fengling.ServiceDiscovery\Fengling.ServiceDiscovery.Core\Fengling.ServiceDiscovery.Core.csproj" />
<ProjectReference Include="..\Fengling.ServiceDiscovery\Fengling.ServiceDiscovery.Kubernetes\Fengling.ServiceDiscovery.Kubernetes.csproj" />
<ProjectReference Include="..\Fengling.ServiceDiscovery\Fengling.ServiceDiscovery.Static\Fengling.ServiceDiscovery.Static.csproj" />
</ItemGroup>
<ItemGroup>
<Content Include="..\..\.dockerignore">
<Link>.dockerignore</Link>
</Content>
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="10.0.0" />
<PackageReference Include="Serilog.AspNetCore" Version="9.0.0" />
<PackageReference Include="Serilog.Sinks.Console" Version="6.0.0" />
<PackageReference Include="Serilog.Sinks.File" Version="6.0.0" />
<PackageReference Include="StackExchange.Redis" Version="2.8.24" />
<PackageReference Include="Yarp.ReverseProxy" Version="2.2.0" />
</ItemGroup>
</Project>

View File

@ -16,7 +16,7 @@
"AllowAnyOrigin": false
},
"ConnectionStrings": {
"DefaultConnection": "Host=81.68.223.70;Port=15432;Database=fengling_gateway;Username=movingsam;Password=sl52788542"
"DefaultConnection": "Host=192.168.100.10;Port=5432;Database=fengling_gateway;Username=movingsam;Password=sl52788542"
},
"Jwt": {
"Authority": "https://your-auth-server.com",
@ -25,7 +25,7 @@
"ValidateAudience": true
},
"Redis": {
"ConnectionString": "81.68.223.70:6379",
"ConnectionString": "192.168.100.10:6379",
"Database": 0,
"InstanceName": "YarpGateway"
},

BIN
bin/Debug/net10.0/Humanizer.dll Executable file

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

BIN
bin/Debug/net10.0/Npgsql.dll Executable file

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

BIN
bin/Debug/net10.0/Serilog.dll Executable file

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

BIN
bin/Debug/net10.0/YarpGateway Executable file

Binary file not shown.

File diff suppressed because it is too large Load Diff

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,20 @@
{
"runtimeOptions": {
"tfm": "net10.0",
"frameworks": [
{
"name": "Microsoft.NETCore.App",
"version": "10.0.0"
},
{
"name": "Microsoft.AspNetCore.App",
"version": "10.0.0"
}
],
"configProperties": {
"System.GC.Server": true,
"System.Reflection.NullabilityInfoContext.IsSupported": true,
"System.Runtime.Serialization.EnableUnsafeBinaryFormatterSerialization": false
}
}
}

View File

@ -0,0 +1 @@
{"Version":1,"ManifestType":"Build","Endpoints":[]}

View File

@ -0,0 +1,8 @@
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
}
}
}

View File

@ -0,0 +1,64 @@
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning",
"Yarp.ReverseProxy": "Information"
}
},
"AllowedHosts": "*",
"Cors": {
"AllowedOrigins": [
"http://localhost:5173",
"http://127.0.0.1:5173",
"http://localhost:5174"
],
"AllowAnyOrigin": false
},
"ConnectionStrings": {
"DefaultConnection": "Host=192.168.100.10;Port=5432;Database=fengling_gateway;Username=movingsam;Password=sl52788542"
},
"Jwt": {
"Authority": "https://your-auth-server.com",
"Audience": "fengling-gateway",
"ValidateIssuer": true,
"ValidateAudience": true
},
"Redis": {
"ConnectionString": "192.168.100.10:6379",
"Database": 0,
"InstanceName": "YarpGateway"
},
"ReverseProxy": {
"Routes": {},
"Clusters": {}
},
"Serilog": {
"Using": ["Serilog.Sinks.Console", "Serilog.Sinks.File"],
"MinimumLevel": "Information",
"WriteTo": [
{
"Name": "Console",
"Args": {
"outputTemplate": "[{Timestamp:HH:mm:ss} {Level:u3}] {Message:lj}{NewLine}{Exception}"
}
},
{
"Name": "File",
"Args": {
"path": "logs/gateway-.log",
"rollingInterval": "Day",
"outputTemplate": "{Timestamp:yyyy-MM-dd HH:mm:ss.fff zzz} [{Level:u3}] {Message:lj}{NewLine}{Exception}"
}
}
],
"Enrich": ["FromLogContext", "WithMachineName", "WithThreadId"]
},
"Kestrel": {
"Endpoints": {
"Http": {
"Url": "http://0.0.0.0:8080"
}
}
}
}

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Some files were not shown because too many files have changed in this diff Show More