using System.Text.Encodings.Web;
using System.Text.Json;
using MigrationTool.Models;
using Npgsql;
using YamlDotNet.Serialization;
using YamlDotNet.Serialization.NamingConventions;
namespace MigrationTool;
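/// <summary>
/// Migration service: reads route and cluster configuration from the database and generates Kubernetes Service YAML files.
/// </summary>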
public class MigrationService
{
// Settings that drive a run (connection string, output directory, dry-run/validate flags, ...).
private readonly MigrationOptions _options;

// Receives progress, warning and error messages.
private readonly ILogger _logger;

// YAML serializer reused for every generated document.
private readonly ISerializer _yamlSerializer;

/// <summary>
/// Creates a migration service bound to the given options and logger.
/// </summary>
/// <param name="options">Settings that drive the migration run.</param>
/// <param name="logger">Logger that receives progress and error messages.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="options"/> or <paramref name="logger"/> is <c>null</c>.
/// </exception>
public MigrationService(MigrationOptions options, ILogger logger)
{
    // Fail fast here rather than with a NullReferenceException in the middle of a run.
    ArgumentNullException.ThrowIfNull(options);
    ArgumentNullException.ThrowIfNull(logger);

    _options = options;
    _logger = logger;

    // camelCase keys; members whose value is null are omitted from the output.
    _yamlSerializer = new SerializerBuilder()
        .WithNamingConvention(CamelCaseNamingConvention.Instance)
        .ConfigureDefaultValuesHandling(DefaultValuesHandling.OmitNull)
        .Build();
}
/// <summary>
/// Runs the migration: loads routes and clusters from the database, optionally validates
/// them, processes every route and optionally saves a report.
/// </summary>
/// <param name="cancellationToken">
/// Checked before each route; when signalled the loop stops and the entries collected so far
/// are kept in the report.
/// </param>
/// <returns>The report with one entry per processed route and the success/failed/skipped counters.</returns>
/// <remarks>
/// Exceptions thrown while loading or processing are logged and rethrown; in that case no
/// report is saved.
/// </remarks>
public async Task<MigrationReport> MigrateAsync(CancellationToken cancellationToken = default)
{
    var report = new MigrationReport
    {
        StartTime = DateTime.UtcNow,
        Entries = []
    };

    _logger.LogInformation("开始迁移任务...");
    _logger.LogInformation($"连接字符串: {MaskConnectionString(_options.ConnectionString)}");
    _logger.LogInformation($"输出目录: {Path.GetFullPath(_options.OutputDir)}");
    _logger.LogInformation($"Dry-Run 模式: {_options.DryRun}");
    _logger.LogInformation($"验证模式: {_options.Validate}");

    // Make sure the output directory exists (a dry run must not touch the file system).
    if (!_options.DryRun)
    {
        Directory.CreateDirectory(_options.OutputDir);
        _logger.LogInformation($"已创建输出目录: {Path.GetFullPath(_options.OutputDir)}");
    }

    try
    {
        // Load the configuration from the database.
        var routes = await LoadRoutesAsync(cancellationToken);
        var clusters = await LoadClustersAsync(cancellationToken);
        _logger.LogInformation($"从数据库加载了 {routes.Count} 条路由和 {clusters.Count} 个集群");
        report.TotalRoutes = routes.Count;

        // Validation only reports problems; it never stops the run.
        if (_options.Validate)
        {
            // Filter once instead of re-scanning the result list for Any/Count/Where.
            var failures = ValidateData(routes, clusters).Where(r => !r.IsValid).ToList();
            if (failures.Count > 0)
            {
                _logger.LogWarning($"数据验证发现 {failures.Count} 个问题");
                foreach (var failure in failures)
                {
                    _logger.LogWarning($"验证失败: {failure.Message}");
                }
            }
            else
            {
                _logger.LogInformation("数据验证通过");
            }
        }

        // Process each route and tally the outcome.
        foreach (var route in routes)
        {
            if (cancellationToken.IsCancellationRequested)
            {
                _logger.LogWarning("迁移任务已取消");
                break;
            }

            var entry = await ProcessRouteAsync(route, clusters, cancellationToken);
            report.Entries.Add(entry);
            switch (entry.Status)
            {
                case MigrationStatus.Success:
                    report.SuccessCount++;
                    break;
                case MigrationStatus.Failed:
                    report.FailedCount++;
                    break;
                case MigrationStatus.Skipped:
                    report.SkippedCount++;
                    break;
            }
        }
    }
    catch (Exception ex)
    {
        _logger.LogError($"迁移过程中发生错误: {ex.Message}");
        throw;
    }
    finally
    {
        // Stamp the end time whether the run completed, was cancelled or failed.
        report.EndTime = DateTime.UtcNow;
    }

    // Save the report.
    if (_options.GenerateReport)
    {
        // After a cancelled run the token is already signalled. Passing it on would make the
        // report file write fail with a cancellation exception and lose the partial results
        // that the loop above deliberately preserved, so the write uses None in that case.
        var reportToken = cancellationToken.IsCancellationRequested
            ? CancellationToken.None
            : cancellationToken;
        await SaveReportAsync(report, reportToken);
    }

    return report;
}
/// <summary>
/// Loads the rows of <c>tenant_routes</c> that have <c>is_deleted = false</c> and
/// <c>status = 1</c>, ordered by tenant code and service name.
/// </summary>
/// <param name="cancellationToken">Token passed to the connection, command and reader calls.</param>
/// <returns>One <see cref="GwTenantRouteModel"/> per matching row.</returns>
/// <remarks>
/// When <c>TenantCode</c> is set in the options, only that tenant's rows are returned.
/// </remarks>
private async Task<List<GwTenantRouteModel>> LoadRoutesAsync(CancellationToken cancellationToken)
{
    var routes = new List<GwTenantRouteModel>();

    // Decide once whether the tenant filter applies, so the SQL text and the parameter
    // can never get out of sync.
    var filterByTenant = !string.IsNullOrEmpty(_options.TenantCode);

    await using var connection = new NpgsqlConnection(_options.ConnectionString);
    await connection.OpenAsync(cancellationToken);

    var sql = @"
        SELECT id, tenant_code, service_name, cluster_id, path_pattern,
               match, priority, status, is_global, is_deleted
        FROM tenant_routes
        WHERE is_deleted = false AND status = 1";
    if (filterByTenant)
    {
        sql += " AND tenant_code = @tenantCode";
    }
    sql += " ORDER BY tenant_code, service_name";

    await using var command = new NpgsqlCommand(sql, connection);
    if (filterByTenant)
    {
        // The tenant code is bound as a parameter, never concatenated into the SQL.
        command.Parameters.AddWithValue("@tenantCode", _options.TenantCode);
    }

    await using var reader = await command.ExecuteReaderAsync(cancellationToken);
    while (await reader.ReadAsync(cancellationToken))
    {
        // Ordinals follow the SELECT list above. Only "match" is read as nullable; the
        // other columns use typed getters and are assumed to be non-NULL in the schema.
        routes.Add(new GwTenantRouteModel
        {
            Id = reader.GetInt64(0),
            TenantCode = reader.GetString(1),
            ServiceName = reader.GetString(2),
            ClusterId = reader.GetString(3),
            PathPattern = reader.GetString(4),
            MatchJson = reader.IsDBNull(5) ? null : reader.GetString(5),
            Priority = reader.GetInt32(6),
            Status = reader.GetInt32(7),
            IsGlobal = reader.GetBoolean(8),
            IsDeleted = reader.GetBoolean(9)
        });
    }

    return routes;
}
/// <summary>
/// Loads the rows of <c>clusters</c> that have <c>is_deleted = false</c> and <c>status = 1</c>.
/// </summary>
/// <param name="cancellationToken">Token passed to the connection, command and reader calls.</param>
/// <returns>One <see cref="GwClusterModel"/> per matching row.</returns>
private async Task<List<GwClusterModel>> LoadClustersAsync(CancellationToken cancellationToken)
{
    var clusters = new List<GwClusterModel>();

    await using var connection = new NpgsqlConnection(_options.ConnectionString);
    await connection.OpenAsync(cancellationToken);

    const string sql = @"
        SELECT id, cluster_id, name, description, destinations, status, is_deleted
        FROM clusters
        WHERE is_deleted = false AND status = 1";

    await using var command = new NpgsqlCommand(sql, connection);
    await using var reader = await command.ExecuteReaderAsync(cancellationToken);
    while (await reader.ReadAsync(cancellationToken))
    {
        // Ordinals follow the SELECT list above. "description" and "destinations" are read
        // as nullable; the remaining columns are assumed to be non-NULL in the schema.
        clusters.Add(new GwClusterModel
        {
            Id = reader.GetString(0),
            ClusterId = reader.GetString(1),
            Name = reader.GetString(2),
            Description = reader.IsDBNull(3) ? null : reader.GetString(3),
            DestinationsJson = reader.IsDBNull(4) ? null : reader.GetString(4),
            Status = reader.GetInt32(5),
            IsDeleted = reader.GetBoolean(6)
        });
    }

    return clusters;
}
/// <summary>
/// Processes a single route: resolves its cluster, generates the Service YAML and either
/// writes it to the output directory or, in dry-run mode, only logs it.
/// </summary>
/// <param name="route">The route to migrate.</param>
/// <param name="clusters">All loaded clusters; the route's cluster is looked up by <c>ClusterId</c>.</param>
/// <param name="cancellationToken">Token passed to the file write.</param>
/// <returns>
/// An entry whose status is <c>Success</c>, <c>Skipped</c> (the referenced cluster was not
/// found) or <c>Failed</c> (an exception occurred; its message is stored in the entry).
/// </returns>
/// <remarks>This method does not throw; every exception is turned into a <c>Failed</c> entry.</remarks>
private async Task<MigrationEntry> ProcessRouteAsync(
    GwTenantRouteModel route,
    List<GwClusterModel> clusters,
    CancellationToken cancellationToken)
{
    // Start optimistic; the status is downgraded below when something goes wrong.
    var entry = new MigrationEntry
    {
        ServiceName = route.ServiceName,
        TenantCode = route.TenantCode,
        ClusterId = route.ClusterId,
        Status = MigrationStatus.Success
    };

    _logger.LogDebug($"处理路由: {route.ServiceName} (租户: {route.TenantCode})");

    try
    {
        // Find the cluster this route points at.
        var cluster = clusters.FirstOrDefault(c => c.ClusterId == route.ClusterId);
        if (cluster == null)
        {
            _logger.LogWarning($"路由 {route.ServiceName} 引用的集群 {route.ClusterId} 不存在,跳过");
            entry.Status = MigrationStatus.Skipped;
            entry.ErrorMessage = $"集群 {route.ClusterId} 不存在";
            return entry;
        }

        // Prefer the first destination with Status == 1, otherwise fall back to the first one.
        var destinations = cluster.GetDestinations();
        var destination = destinations.FirstOrDefault(d => d.Status == 1)
            ?? destinations.FirstOrDefault();
        if (destination == null)
        {
            // Not fatal: the Service is still generated without a concrete destination.
            _logger.LogWarning($"集群 {cluster.ClusterId} 没有可用的目标端点");
        }

        // Generate the Service YAML.
        var serviceYaml = GenerateServiceYaml(route, cluster, destination);

        // File name is "<tenant>-<service>.yaml", lower-cased.
        var fileName = $"{route.TenantCode}-{route.ServiceName}.yaml".ToLowerInvariant();
        var outputPath = Path.Combine(_options.OutputDir, fileName);
        entry.OutputPath = outputPath;

        if (_options.DryRun)
        {
            _logger.LogInformation($"[DRY-RUN] 将生成: {fileName}");
            _logger.LogDebug($"YAML 内容:\n{serviceYaml}");
        }
        else
        {
            await File.WriteAllTextAsync(outputPath, serviceYaml, cancellationToken);
            _logger.LogInformation($"已生成: {outputPath}");
        }
    }
    catch (Exception ex)
    {
        // One broken route must not abort the whole run; record the failure and move on.
        _logger.LogError($"处理路由 {route.ServiceName} 时出错: {ex.Message}");
        entry.Status = MigrationStatus.Failed;
        entry.ErrorMessage = ex.Message;
    }

    return entry;
}
/// <summary>
/// Builds the Kubernetes Service document for a route and serializes it to YAML.
/// </summary>
/// <param name="route">Route that supplies the name, tenant, host, path and priority.</param>
/// <param name="cluster">Cluster the route belongs to.</param>
/// <param name="destination">
/// Selected destination, or <c>null</c>; a <c>null</c> destination is labelled <c>"default"</c>.
/// </param>
/// <returns>The YAML text, prefixed with a <c>---</c> document separator line.</returns>
private string GenerateServiceYaml(
    GwTenantRouteModel route,
    GwClusterModel cluster,
    GwDestinationModel? destination)
{
    var serviceName = $"{route.TenantCode}-{route.ServiceName}".ToLowerInvariant();
    var path = route.GetPath();
    // Routes without their own host use the configured default host.
    var host = route.GetHost() ?? _options.DefaultHost;
    var destinationId = destination?.DestinationId ?? "default";

    var model = new K8sServiceModel
    {
        Metadata = new K8sMetadata
        {
            Name = serviceName,
            // NOTE(review): Kubernetes label values may only contain alphanumerics, '-', '_'
            // and '.'. A route path containing '/' looks like it would be rejected as a label
            // value - confirm how "app-router-prefix" is consumed before changing it.
            Labels = new Dictionary<string, string>
            {
                ["app-router-host"] = host,
                ["app-router-name"] = route.ServiceName,
                ["app-router-prefix"] = path,
                ["app-cluster-name"] = cluster.ClusterId,
                ["app-cluster-destination"] = destinationId,
                ["app-tenant"] = route.TenantCode,
                ["app-managed-by"] = "migration-tool"
            },
            // NOTE(review): annotation keys are "<prefix>/<name>" with a single '/'. The
            // first three keys below contain two slashes - verify that the API server
            // accepts them.
            Annotations = new Dictionary<string, string>
            {
                ["migration-tool/fengling.gateway/route-id"] = route.Id.ToString(),
                ["migration-tool/fengling.gateway/cluster-id"] = cluster.Id,
                ["migration-tool/fengling.gateway/priority"] = route.Priority.ToString(),
                ["migration-tool/timestamp"] = DateTime.UtcNow.ToString("O")
            }
        },
        Spec = new K8sSpec
        {
            Type = "ClusterIP",
            // Pods are selected by the lower-cased service name alone (no tenant prefix).
            Selector = new Dictionary<string, string>
            {
                ["app"] = route.ServiceName.ToLowerInvariant()
            },
            Ports =
            [
                new K8sPort
                {
                    Port = _options.ServicePort,
                    TargetPort = _options.TargetPort,
                    Name = "http",
                    Protocol = "TCP"
                }
            ]
        }
    };

    var yaml = _yamlSerializer.Serialize(model);

    // Prepend the YAML document separator.
    return $"---\n{yaml}";
}
/// <summary>
/// Checks the loaded routes and clusters for missing required fields and dangling references.
/// </summary>
/// <param name="routes">Routes to check.</param>
/// <param name="clusters">Clusters to check; also the set that route references are resolved against.</param>
/// <returns>
/// One <see cref="ValidationResult"/> per problem found, each with <c>IsValid = false</c>.
/// An empty list means no problem was found.
/// </returns>
private List<ValidationResult> ValidateData(
    List<GwTenantRouteModel> routes,
    List<GwClusterModel> clusters)
{
    var results = new List<ValidationResult>();

    // Records one failed check. Only failures are ever added to the list.
    void Fail(string entity, string message) =>
        results.Add(new ValidationResult { IsValid = false, Entity = entity, Message = message });

    var clusterIds = clusters.Select(c => c.ClusterId).ToHashSet();

    foreach (var route in routes)
    {
        var routeEntity = $"Route[{route.Id}]";

        // Required fields.
        if (string.IsNullOrWhiteSpace(route.ServiceName))
        {
            Fail(routeEntity, "ServiceName 不能为空");
        }

        if (string.IsNullOrWhiteSpace(route.TenantCode))
        {
            Fail(routeEntity, "TenantCode 不能为空");
        }

        if (string.IsNullOrWhiteSpace(route.ClusterId))
        {
            Fail(routeEntity, "ClusterId 不能为空");
        }
        else if (!clusterIds.Contains(route.ClusterId))
        {
            // The cluster reference is only checked when an id is actually present.
            Fail(routeEntity, $"引用的集群 '{route.ClusterId}' 不存在");
        }

        // Path.
        if (string.IsNullOrWhiteSpace(route.GetPath()))
        {
            Fail(routeEntity, "Path 不能为空");
        }
    }

    foreach (var cluster in clusters)
    {
        if (string.IsNullOrWhiteSpace(cluster.ClusterId))
        {
            // ClusterId is blank here, so the row id is used to identify the cluster.
            Fail($"Cluster[{cluster.Id}]", "ClusterId 不能为空");
        }

        var clusterEntity = $"Cluster[{cluster.ClusterId}]";
        var destinations = cluster.GetDestinations();
        if (destinations.Count == 0)
        {
            Fail(clusterEntity, "没有配置目标端点");
        }

        foreach (var dest in destinations)
        {
            if (string.IsNullOrWhiteSpace(dest.DestinationId))
            {
                Fail($"{clusterEntity}/Destination", "DestinationId 不能为空");
            }

            if (string.IsNullOrWhiteSpace(dest.Address))
            {
                Fail($"{clusterEntity}/Destination[{dest.DestinationId}]", "Address 不能为空");
            }
        }
    }

    return results;
}
/// <summary>
/// Serializes the report to indented JSON and writes it to disk, or only logs it in dry-run mode.
/// </summary>
/// <param name="report">The report to save.</param>
/// <param name="cancellationToken">Token passed to the file write.</param>
/// <remarks>
/// The target is <c>ReportPath</c> from the options when it is set; otherwise a timestamped
/// <c>migration-report-*.json</c> file in the output directory.
/// </remarks>
private async Task SaveReportAsync(MigrationReport report, CancellationToken cancellationToken)
{
    // A blank ReportPath counts as "not set": an empty path cannot be written to, and
    // falling back to the default location is the useful behavior.
    var reportPath = string.IsNullOrWhiteSpace(_options.ReportPath)
        ? Path.Combine(_options.OutputDir, $"migration-report-{DateTime.UtcNow:yyyyMMdd-HHmmss}.json")
        : _options.ReportPath;

    var options = new JsonSerializerOptions
    {
        WriteIndented = true,
        // Relaxed escaping keeps non-ASCII text (e.g. the Chinese messages) readable in the file.
        Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping
    };
    var json = JsonSerializer.Serialize(report, options);

    if (_options.DryRun)
    {
        _logger.LogInformation($"[DRY-RUN] 将生成报告: {reportPath}");
        _logger.LogDebug($"报告内容:\n{json}");
        return;
    }

    // A custom ReportPath may point into a directory that does not exist yet; the output
    // directory created by the migration run only covers the default location.
    var reportDirectory = Path.GetDirectoryName(Path.GetFullPath(reportPath));
    if (!string.IsNullOrEmpty(reportDirectory))
    {
        Directory.CreateDirectory(reportDirectory);
    }

    await File.WriteAllTextAsync(reportPath, json, cancellationToken);
    _logger.LogInformation($"已生成报告: {reportPath}");
}
/// <summary>
/// Returns a copy of the connection string that is safe to log: a non-empty password is
/// replaced with <c>***</c>.
/// </summary>
/// <param name="connectionString">The raw connection string.</param>
/// <returns>
/// The masked connection string; <c>[empty]</c> for a null or empty input;
/// <c>[invalid connection string]</c> when the input cannot be parsed.
/// </returns>
private static string MaskConnectionString(string connectionString)
{
    if (string.IsNullOrEmpty(connectionString))
    {
        return "[empty]";
    }

    try
    {
        var parsed = new NpgsqlConnectionStringBuilder(connectionString);
        var hasPassword = !string.IsNullOrEmpty(parsed.Password);
        if (hasPassword)
        {
            parsed.Password = "***";
        }

        return parsed.ToString();
    }
    catch
    {
        // Best effort only: never let a malformed string break logging, and never echo it back.
        return "[invalid connection string]";
    }
}
}
/// <summary>
/// Outcome of a single validation check.
/// </summary>
public class ValidationResult
{
    /// <summary>Whether the check passed. Defaults to <c>false</c>.</summary>
    public bool IsValid { get; set; }

    /// <summary>Identifier of the checked item, e.g. <c>Route[42]</c>. Defaults to an empty string.</summary>
    public string Entity { get; set; } = "";

    /// <summary>Human-readable description of the finding. Defaults to an empty string.</summary>
    public string Message { get; set; } = "";
}