using System.Text.Encodings.Web;
using System.Text.Json;
using MigrationTool.Models;
using Npgsql;
using YamlDotNet.Serialization;
using YamlDotNet.Serialization.NamingConventions;

namespace MigrationTool;

/// <summary>
/// Migration service: reads route and cluster configuration from the database
/// and generates one Kubernetes Service YAML document per route.
/// </summary>
public class MigrationService
{
    private readonly MigrationOptions _options;
    private readonly ILogger _logger;
    private readonly ISerializer _yamlSerializer;

    /// <summary>
    /// Creates the service and builds the YAML serializer
    /// (camelCase property names, null values omitted).
    /// </summary>
    /// <param name="options">Migration settings (connection string, output directory, flags).</param>
    /// <param name="logger">Logger that receives progress and error messages.</param>
    public MigrationService(MigrationOptions options, ILogger logger)
    {
        _options = options;
        _logger = logger;
        _yamlSerializer = new SerializerBuilder()
            .WithNamingConvention(CamelCaseNamingConvention.Instance)
            .ConfigureDefaultValuesHandling(DefaultValuesHandling.OmitNull)
            .Build();
    }

    /// <summary>
    /// Runs the migration: loads routes and clusters, optionally validates them,
    /// processes every route and optionally writes a JSON report.
    /// </summary>
    /// <param name="cancellationToken">Checked before each route; also passed to all I/O calls.</param>
    /// <returns>The report with one entry per processed route and the success/failed/skipped counters.</returns>
    /// <remarks>
    /// Any exception raised while loading or processing is logged and rethrown;
    /// in that case no report file is written.
    /// </remarks>
    public async Task<MigrationReport> MigrateAsync(CancellationToken cancellationToken = default)
    {
        var report = new MigrationReport
        {
            StartTime = DateTime.UtcNow,
            Entries = []
        };

        _logger.LogInformation("开始迁移任务...");
        _logger.LogInformation($"连接字符串: {MaskConnectionString(_options.ConnectionString)}");
        _logger.LogInformation($"输出目录: {Path.GetFullPath(_options.OutputDir)}");
        _logger.LogInformation($"Dry-Run 模式: {_options.DryRun}");
        _logger.LogInformation($"验证模式: {_options.Validate}");

        // Make sure the output directory exists (a dry run must not touch the file system).
        if (!_options.DryRun)
        {
            Directory.CreateDirectory(_options.OutputDir);
            _logger.LogInformation($"已创建输出目录: {Path.GetFullPath(_options.OutputDir)}");
        }

        try
        {
            // Load the configuration from the database.
            var routes = await LoadRoutesAsync(cancellationToken);
            var clusters = await LoadClustersAsync(cancellationToken);

            _logger.LogInformation($"从数据库加载了 {routes.Count} 条路由和 {clusters.Count} 个集群");
            report.TotalRoutes = routes.Count;

            // Validation only produces warnings; it never stops the migration.
            if (_options.Validate)
            {
                // Materialize once instead of re-filtering for Any/Count/Where.
                var failures = ValidateData(routes, clusters)
                    .Where(r => !r.IsValid)
                    .ToList();

                if (failures.Count > 0)
                {
                    _logger.LogWarning($"数据验证发现 {failures.Count} 个问题");
                    foreach (var failure in failures)
                    {
                        // Include the entity so the operator can tell which row failed.
                        _logger.LogWarning($"验证失败: {failure.Entity} - {failure.Message}");
                    }
                }
                else
                {
                    _logger.LogInformation("数据验证通过");
                }
            }

            // Process the routes one by one.
            foreach (var route in routes)
            {
                if (cancellationToken.IsCancellationRequested)
                {
                    _logger.LogWarning("迁移任务已取消");
                    break;
                }

                var entry = await ProcessRouteAsync(route, clusters, cancellationToken);
                report.Entries.Add(entry);

                switch (entry.Status)
                {
                    case MigrationStatus.Success:
                        report.SuccessCount++;
                        break;
                    case MigrationStatus.Failed:
                        report.FailedCount++;
                        break;
                    case MigrationStatus.Skipped:
                        report.SkippedCount++;
                        break;
                }
            }
        }
        catch (Exception ex)
        {
            _logger.LogError($"迁移过程中发生错误: {ex.Message}");
            throw;
        }
        finally
        {
            // Stamp the end time even when the migration fails.
            report.EndTime = DateTime.UtcNow;
        }

        if (_options.GenerateReport)
        {
            await SaveReportAsync(report, cancellationToken);
        }

        return report;
    }

    /// <summary>
    /// Loads the active (status = 1), non-deleted routes from <c>tenant_routes</c>,
    /// optionally restricted to the tenant configured in the options.
    /// </summary>
    /// <param name="cancellationToken">Passed to the connection, command and reader calls.</param>
    /// <returns>The routes ordered by tenant code and service name.</returns>
    private async Task<List<GwTenantRouteModel>> LoadRoutesAsync(CancellationToken cancellationToken)
    {
        var routes = new List<GwTenantRouteModel>();

        await using var connection = new NpgsqlConnection(_options.ConnectionString);
        await connection.OpenAsync(cancellationToken);

        var filterByTenant = !string.IsNullOrEmpty(_options.TenantCode);

        var sql = @"
            SELECT id, tenant_code, service_name, cluster_id, path_pattern, match, priority, status, is_global, is_deleted
            FROM tenant_routes
            WHERE is_deleted = false AND status = 1";
        if (filterByTenant)
        {
            sql += " AND tenant_code = @tenantCode";
        }
        sql += " ORDER BY tenant_code, service_name";

        await using var command = new NpgsqlCommand(sql, connection);
        if (filterByTenant)
        {
            // The tenant filter is passed as a parameter, never concatenated into the SQL text.
            command.Parameters.AddWithValue("@tenantCode", _options.TenantCode);
        }

        await using var reader = await command.ExecuteReaderAsync(cancellationToken);
        while (await reader.ReadAsync(cancellationToken))
        {
            // Ordinals follow the SELECT list above; only "match" is read as nullable.
            routes.Add(new GwTenantRouteModel
            {
                Id = reader.GetInt64(0),
                TenantCode = reader.GetString(1),
                ServiceName = reader.GetString(2),
                ClusterId = reader.GetString(3),
                PathPattern = reader.GetString(4),
                MatchJson = reader.IsDBNull(5) ? null : reader.GetString(5),
                Priority = reader.GetInt32(6),
                Status = reader.GetInt32(7),
                IsGlobal = reader.GetBoolean(8),
                IsDeleted = reader.GetBoolean(9)
            });
        }

        return routes;
    }

    /// <summary>
    /// Loads the active (status = 1), non-deleted clusters from <c>clusters</c>.
    /// </summary>
    /// <param name="cancellationToken">Passed to the connection, command and reader calls.</param>
    /// <returns>All matching clusters, in the order the database returns them.</returns>
    private async Task<List<GwClusterModel>> LoadClustersAsync(CancellationToken cancellationToken)
    {
        var clusters = new List<GwClusterModel>();

        await using var connection = new NpgsqlConnection(_options.ConnectionString);
        await connection.OpenAsync(cancellationToken);

        const string sql = @"
            SELECT id, cluster_id, name, description, destinations, status, is_deleted
            FROM clusters
            WHERE is_deleted = false AND status = 1";

        await using var command = new NpgsqlCommand(sql, connection);
        await using var reader = await command.ExecuteReaderAsync(cancellationToken);
        while (await reader.ReadAsync(cancellationToken))
        {
            // Ordinals follow the SELECT list above; description and destinations are read as nullable.
            clusters.Add(new GwClusterModel
            {
                Id = reader.GetString(0),
                ClusterId = reader.GetString(1),
                Name = reader.GetString(2),
                Description = reader.IsDBNull(3) ? null : reader.GetString(3),
                DestinationsJson = reader.IsDBNull(4) ? null : reader.GetString(4),
                Status = reader.GetInt32(5),
                IsDeleted = reader.GetBoolean(6)
            });
        }

        return clusters;
    }

    /// <summary>
    /// Processes a single route: resolves its cluster, generates the Service YAML
    /// and writes it to the output directory (or only logs it in dry-run mode).
    /// </summary>
    /// <param name="route">The route to migrate.</param>
    /// <param name="clusters">All loaded clusters; the first one with a matching ClusterId is used.</param>
    /// <param name="cancellationToken">Passed to the file write.</param>
    /// <returns>
    /// An entry with status Success, Skipped (cluster not found) or Failed
    /// (any exception; its message is stored in ErrorMessage). This method does not throw.
    /// </returns>
    private async Task<MigrationEntry> ProcessRouteAsync(
        GwTenantRouteModel route,
        List<GwClusterModel> clusters,
        CancellationToken cancellationToken)
    {
        var entry = new MigrationEntry
        {
            ServiceName = route.ServiceName,
            TenantCode = route.TenantCode,
            ClusterId = route.ClusterId,
            Status = MigrationStatus.Success
        };

        _logger.LogDebug($"处理路由: {route.ServiceName} (租户: {route.TenantCode})");

        try
        {
            // Find the cluster the route points to.
            var cluster = clusters.FirstOrDefault(c => c.ClusterId == route.ClusterId);
            if (cluster == null)
            {
                _logger.LogWarning($"路由 {route.ServiceName} 引用的集群 {route.ClusterId} 不存在,跳过");
                entry.Status = MigrationStatus.Skipped;
                entry.ErrorMessage = $"集群 {route.ClusterId} 不存在";
                return entry;
            }

            // Prefer a destination with Status == 1; otherwise fall back to the first one.
            var destinations = cluster.GetDestinations();
            var destination = destinations.FirstOrDefault(d => d.Status == 1)
                              ?? destinations.FirstOrDefault();
            if (destination == null)
            {
                // Not fatal: the YAML is still generated with the "default" destination label.
                _logger.LogWarning($"集群 {cluster.ClusterId} 没有可用的目标端点");
            }

            var serviceYaml = GenerateServiceYaml(route, cluster, destination);

            // Output file name: "<tenant>-<service>.yaml", lower-cased.
            var fileName = $"{route.TenantCode}-{route.ServiceName}.yaml".ToLowerInvariant();
            var outputPath = Path.Combine(_options.OutputDir, fileName);
            entry.OutputPath = outputPath;

            if (_options.DryRun)
            {
                _logger.LogInformation($"[DRY-RUN] 将生成: {fileName}");
                _logger.LogDebug($"YAML 内容:\n{serviceYaml}");
            }
            else
            {
                await File.WriteAllTextAsync(outputPath, serviceYaml, cancellationToken);
                _logger.LogInformation($"已生成: {outputPath}");
            }
        }
        catch (Exception ex)
        {
            // A failing route must not abort the whole migration; record it and continue.
            _logger.LogError($"处理路由 {route.ServiceName} 时出错: {ex.Message}");
            entry.Status = MigrationStatus.Failed;
            entry.ErrorMessage = ex.Message;
        }

        return entry;
    }

    /// <summary>
    /// Builds the Kubernetes Service manifest for a route and serializes it to YAML,
    /// prefixed with the <c>---</c> document separator.
    /// </summary>
    /// <param name="route">Route supplying the name, tenant, path, host and priority.</param>
    /// <param name="cluster">Cluster the route points to.</param>
    /// <param name="destination">Chosen destination, or null; null yields the label value "default".</param>
    /// <returns>The YAML text of the Service document.</returns>
    private string GenerateServiceYaml(
        GwTenantRouteModel route,
        GwClusterModel cluster,
        GwDestinationModel? destination)
    {
        var serviceName = $"{route.TenantCode}-{route.ServiceName}".ToLowerInvariant();
        var path = route.GetPath();
        var host = route.GetHost() ?? _options.DefaultHost;
        var destinationId = destination?.DestinationId ?? "default";

        // NOTE(review): Kubernetes label values may not contain '/', and annotation keys
        // allow a single '/' (prefix/name). Confirm that GetPath() output and the
        // "migration-tool/fengling.gateway/..." keys are accepted by the consumer of these files.
        var model = new K8sServiceModel
        {
            Metadata = new K8sMetadata
            {
                Name = serviceName,
                Labels = new Dictionary<string, string>
                {
                    ["app-router-host"] = host,
                    ["app-router-name"] = route.ServiceName,
                    ["app-router-prefix"] = path,
                    ["app-cluster-name"] = cluster.ClusterId,
                    ["app-cluster-destination"] = destinationId,
                    ["app-tenant"] = route.TenantCode,
                    ["app-managed-by"] = "migration-tool"
                },
                Annotations = new Dictionary<string, string>
                {
                    ["migration-tool/fengling.gateway/route-id"] = route.Id.ToString(),
                    ["migration-tool/fengling.gateway/cluster-id"] = cluster.Id,
                    ["migration-tool/fengling.gateway/priority"] = route.Priority.ToString(),
                    ["migration-tool/timestamp"] = DateTime.UtcNow.ToString("O")
                }
            },
            Spec = new K8sSpec
            {
                Type = "ClusterIP",
                Selector = new Dictionary<string, string>
                {
                    ["app"] = route.ServiceName.ToLowerInvariant()
                },
                Ports =
                [
                    new K8sPort
                    {
                        Port = _options.ServicePort,
                        TargetPort = _options.TargetPort,
                        Name = "http",
                        Protocol = "TCP"
                    }
                ]
            }
        };

        var yaml = _yamlSerializer.Serialize(model);

        // Prepend the YAML document separator.
        return $"---\n{yaml}";
    }

    /// <summary>
    /// Checks the loaded routes and clusters for missing required fields,
    /// dangling cluster references and missing or incomplete destinations.
    /// </summary>
    /// <param name="routes">Routes to check.</param>
    /// <param name="clusters">Clusters to check; also the lookup set for route references.</param>
    /// <returns>One result per problem found (all with IsValid = false); empty when nothing is wrong.</returns>
    private List<ValidationResult> ValidateData(
        List<GwTenantRouteModel> routes,
        List<GwClusterModel> clusters)
    {
        var results = new List<ValidationResult>();
        var clusterIds = clusters.Select(c => c.ClusterId).ToHashSet();

        // Records one validation failure.
        void Fail(string entity, string message) =>
            results.Add(new ValidationResult { IsValid = false, Entity = entity, Message = message });

        foreach (var route in routes)
        {
            var entity = $"Route[{route.Id}]";

            // Required fields.
            if (string.IsNullOrWhiteSpace(route.ServiceName))
            {
                Fail(entity, "ServiceName 不能为空");
            }

            if (string.IsNullOrWhiteSpace(route.TenantCode))
            {
                Fail(entity, "TenantCode 不能为空");
            }

            if (string.IsNullOrWhiteSpace(route.ClusterId))
            {
                Fail(entity, "ClusterId 不能为空");
            }
            else if (!clusterIds.Contains(route.ClusterId))
            {
                // The cluster reference is only checked when a ClusterId is present.
                Fail(entity, $"引用的集群 '{route.ClusterId}' 不存在");
            }

            // Path.
            if (string.IsNullOrWhiteSpace(route.GetPath()))
            {
                Fail(entity, "Path 不能为空");
            }
        }

        foreach (var cluster in clusters)
        {
            if (string.IsNullOrWhiteSpace(cluster.ClusterId))
            {
                Fail($"Cluster[{cluster.Id}]", "ClusterId 不能为空");
            }

            var destinations = cluster.GetDestinations();
            if (destinations.Count == 0)
            {
                Fail($"Cluster[{cluster.ClusterId}]", "没有配置目标端点");
            }

            foreach (var dest in destinations)
            {
                if (string.IsNullOrWhiteSpace(dest.DestinationId))
                {
                    Fail($"Cluster[{cluster.ClusterId}]/Destination", "DestinationId 不能为空");
                }

                if (string.IsNullOrWhiteSpace(dest.Address))
                {
                    Fail($"Cluster[{cluster.ClusterId}]/Destination[{dest.DestinationId}]", "Address 不能为空");
                }
            }
        }

        return results;
    }

    /// <summary>
    /// Serializes the report to indented JSON and writes it to the configured report path,
    /// or to a timestamped file in the output directory when no path is configured.
    /// In dry-run mode the report is only logged.
    /// </summary>
    /// <param name="report">The report to save.</param>
    /// <param name="cancellationToken">Passed to the file write.</param>
    private async Task SaveReportAsync(MigrationReport report, CancellationToken cancellationToken)
    {
        var reportPath = _options.ReportPath
            ?? Path.Combine(_options.OutputDir, $"migration-report-{DateTime.UtcNow:yyyyMMdd-HHmmss}.json");

        var options = new JsonSerializerOptions
        {
            WriteIndented = true,
            // Relaxed escaping keeps non-ASCII text (e.g. the Chinese messages) readable in the file.
            Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping
        };
        var json = JsonSerializer.Serialize(report, options);

        if (_options.DryRun)
        {
            _logger.LogInformation($"[DRY-RUN] 将生成报告: {reportPath}");
            _logger.LogDebug($"报告内容:\n{json}");
            return;
        }

        // A custom ReportPath may point outside OutputDir; make sure its directory exists.
        var reportDirectory = Path.GetDirectoryName(reportPath);
        if (!string.IsNullOrEmpty(reportDirectory))
        {
            Directory.CreateDirectory(reportDirectory);
        }

        await File.WriteAllTextAsync(reportPath, json, cancellationToken);
        _logger.LogInformation($"已生成报告: {reportPath}");
    }

    /// <summary>
    /// Returns the connection string with its password replaced by <c>***</c>, for logging.
    /// </summary>
    /// <param name="connectionString">The raw connection string.</param>
    /// <returns>
    /// The masked string, <c>[empty]</c> for a null or empty input, or
    /// <c>[invalid connection string]</c> when it cannot be parsed.
    /// </returns>
    private static string MaskConnectionString(string connectionString)
    {
        if (string.IsNullOrEmpty(connectionString))
        {
            return "[empty]";
        }

        try
        {
            var builder = new NpgsqlConnectionStringBuilder(connectionString);
            if (!string.IsNullOrEmpty(builder.Password))
            {
                builder.Password = "***";
            }

            return builder.ToString();
        }
        catch
        {
            // Deliberately best-effort: never echo an unparsable string, it may contain the secret.
            return "[invalid connection string]";
        }
    }
}

/// <summary>
/// Result of a single validation check.
/// </summary>
public class ValidationResult
{
    /// <summary>True when the check passed.</summary>
    public bool IsValid { get; set; }

    /// <summary>Identifier of the checked entity, e.g. <c>Route[12]</c>.</summary>
    public string Entity { get; set; } = string.Empty;

    /// <summary>Human-readable description of the problem.</summary>
    public string Message { get; set; } = string.Empty;
}