fengling-gateway/tools/MigrationTool/MigrationService.cs
movingsam 52eba07097 feat: add MigrationTool for gateway config migration (IMPL-7)
- Create MigrationTool console app for exporting DB config to K8s YAML
- Support dry-run mode and validation
- Add Npgsql and YamlDotNet dependencies
2026-03-08 00:35:04 +08:00

522 lines
17 KiB
C#
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using System.Text.Encodings.Web;
using System.Text.Json;
using MigrationTool.Models;
using Npgsql;
using YamlDotNet.Serialization;
using YamlDotNet.Serialization.NamingConventions;
namespace MigrationTool;
/// <summary>
/// Migration service: reads gateway configuration from the database and generates Kubernetes Service YAML.
/// </summary>
public class MigrationService
{
private readonly MigrationOptions _options;
private readonly ILogger _logger;
private readonly ISerializer _yamlSerializer;
/// <summary>
/// Creates a migration service bound to the given options and logger.
/// </summary>
/// <param name="options">Migration settings read by every step of the run.</param>
/// <param name="logger">Logger that receives progress, warning and error messages.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="options"/> or <paramref name="logger"/> is <see langword="null"/>.
/// </exception>
public MigrationService(MigrationOptions options, ILogger logger)
{
    // Fail fast here rather than with a NullReferenceException on first use.
    ArgumentNullException.ThrowIfNull(options);
    ArgumentNullException.ThrowIfNull(logger);

    _options = options;
    _logger = logger;

    // camelCase keys; members whose value is null are left out of the YAML.
    _yamlSerializer = new SerializerBuilder()
        .WithNamingConvention(CamelCaseNamingConvention.Instance)
        .ConfigureDefaultValuesHandling(DefaultValuesHandling.OmitNull)
        .Build();
}
/// <summary>
/// Runs the migration: loads routes and clusters from the database, optionally
/// validates them, processes every route and, when enabled, saves a report.
/// </summary>
/// <remarks>
/// Validation problems are only logged as warnings; they do not stop the run.
/// Exceptions raised while loading or processing are logged and rethrown.
/// When cancellation is requested between routes the loop stops and the
/// partial report is still returned (and saved, if report generation is on).
/// </remarks>
/// <param name="cancellationToken">Token used to stop the run early.</param>
/// <returns>The report describing every processed route and the run's counters.</returns>
public async Task<MigrationReport> MigrateAsync(CancellationToken cancellationToken = default)
{
    var report = new MigrationReport
    {
        StartTime = DateTime.UtcNow,
        Entries = []
    };

    _logger.LogInformation("开始迁移任务...");
    _logger.LogInformation($"连接字符串: {MaskConnectionString(_options.ConnectionString)}");
    _logger.LogInformation($"输出目录: {Path.GetFullPath(_options.OutputDir)}");
    _logger.LogInformation($"Dry-Run 模式: {_options.DryRun}");
    _logger.LogInformation($"验证模式: {_options.Validate}");

    // Make sure the output directory exists (skipped in dry-run mode, which writes nothing).
    if (!_options.DryRun)
    {
        Directory.CreateDirectory(_options.OutputDir);
        _logger.LogInformation($"已创建输出目录: {Path.GetFullPath(_options.OutputDir)}");
    }

    try
    {
        // Read the configuration from the database.
        var routes = await LoadRoutesAsync(cancellationToken);
        var clusters = await LoadClustersAsync(cancellationToken);
        _logger.LogInformation($"从数据库加载了 {routes.Count} 条路由和 {clusters.Count} 个集群");
        report.TotalRoutes = routes.Count;

        // Check data integrity; findings are reported but do not abort the run.
        if (_options.Validate)
        {
            // Filter once instead of re-enumerating for Any/Count/Where.
            var failures = ValidateData(routes, clusters)
                .Where(r => !r.IsValid)
                .ToList();

            if (failures.Count > 0)
            {
                _logger.LogWarning($"数据验证发现 {failures.Count} 个问题");
                foreach (var failure in failures)
                {
                    _logger.LogWarning($"验证失败: {failure.Message}");
                }
            }
            else
            {
                _logger.LogInformation("数据验证通过");
            }
        }

        // Process each route in turn.
        foreach (var route in routes)
        {
            if (cancellationToken.IsCancellationRequested)
            {
                _logger.LogWarning("迁移任务已取消");
                break;
            }

            var entry = await ProcessRouteAsync(route, clusters, cancellationToken);
            report.Entries.Add(entry);

            switch (entry.Status)
            {
                case MigrationStatus.Success:
                    report.SuccessCount++;
                    break;
                case MigrationStatus.Failed:
                    report.FailedCount++;
                    break;
                case MigrationStatus.Skipped:
                    report.SkippedCount++;
                    break;
            }
        }
    }
    catch (Exception ex)
    {
        _logger.LogError($"迁移过程中发生错误: {ex.Message}");
        throw;
    }
    finally
    {
        report.EndTime = DateTime.UtcNow;
    }

    // Write the report.
    if (_options.GenerateReport)
    {
        // The loop above stops gracefully on cancellation, so the partial report
        // must still be written. Handing an already-cancelled token to the file
        // write would make it throw immediately and discard the report.
        var reportToken = cancellationToken.IsCancellationRequested
            ? CancellationToken.None
            : cancellationToken;
        await SaveReportAsync(report, reportToken);
    }

    return report;
}
/// <summary>
/// Loads the tenant routes that are not deleted and have status 1 from the
/// <c>tenant_routes</c> table.
/// </summary>
/// <remarks>
/// When a tenant code is configured in the options, only that tenant's routes
/// are selected. Rows are ordered by tenant code, then service name.
/// </remarks>
/// <param name="cancellationToken">Token passed to every database call.</param>
/// <returns>One model per selected row, in query order.</returns>
private async Task<List<GwTenantRouteModel>> LoadRoutesAsync(CancellationToken cancellationToken)
{
    const string selectActiveRoutes = @"
SELECT id, tenant_code, service_name, cluster_id, path_pattern,
match, priority, status, is_global, is_deleted
FROM tenant_routes
WHERE is_deleted = false AND status = 1";
    const string tenantFilter = " AND tenant_code = @tenantCode";
    const string ordering = " ORDER BY tenant_code, service_name";

    // Column positions follow the SELECT list above; "match" is the only
    // column read as nullable.
    static GwTenantRouteModel MapRow(System.Data.IDataRecord row) => new GwTenantRouteModel
    {
        Id = row.GetInt64(0),
        TenantCode = row.GetString(1),
        ServiceName = row.GetString(2),
        ClusterId = row.GetString(3),
        PathPattern = row.GetString(4),
        MatchJson = row.IsDBNull(5) ? null : row.GetString(5),
        Priority = row.GetInt32(6),
        Status = row.GetInt32(7),
        IsGlobal = row.GetBoolean(8),
        IsDeleted = row.GetBoolean(9)
    };

    await using var connection = new NpgsqlConnection(_options.ConnectionString);
    await connection.OpenAsync(cancellationToken);

    // The tenant restriction is optional; its value is always bound as a parameter.
    var tenantCode = _options.TenantCode;
    var sql = string.IsNullOrEmpty(tenantCode)
        ? selectActiveRoutes + ordering
        : selectActiveRoutes + tenantFilter + ordering;

    await using var command = new NpgsqlCommand(sql, connection);
    if (!string.IsNullOrEmpty(tenantCode))
    {
        command.Parameters.AddWithValue("@tenantCode", tenantCode);
    }

    var loaded = new List<GwTenantRouteModel>();
    await using var reader = await command.ExecuteReaderAsync(cancellationToken);
    while (await reader.ReadAsync(cancellationToken))
    {
        loaded.Add(MapRow(reader));
    }

    return loaded;
}
/// <summary>
/// Loads the clusters that are not deleted and have status 1 from the
/// <c>clusters</c> table.
/// </summary>
/// <param name="cancellationToken">Token passed to every database call.</param>
/// <returns>One model per selected row, in the order the database returns them.</returns>
private async Task<List<GwClusterModel>> LoadClustersAsync(CancellationToken cancellationToken)
{
    const string selectActiveClusters = @"
SELECT id, cluster_id, name, description, destinations, status, is_deleted
FROM clusters
WHERE is_deleted = false AND status = 1";

    // Column positions follow the SELECT list above; description and
    // destinations are the columns read as nullable.
    static GwClusterModel MapRow(System.Data.IDataRecord row) => new GwClusterModel
    {
        Id = row.GetString(0),
        ClusterId = row.GetString(1),
        Name = row.GetString(2),
        Description = row.IsDBNull(3) ? null : row.GetString(3),
        DestinationsJson = row.IsDBNull(4) ? null : row.GetString(4),
        Status = row.GetInt32(5),
        IsDeleted = row.GetBoolean(6)
    };

    await using var connection = new NpgsqlConnection(_options.ConnectionString);
    await connection.OpenAsync(cancellationToken);

    await using var command = new NpgsqlCommand(selectActiveClusters, connection);
    await using var reader = await command.ExecuteReaderAsync(cancellationToken);

    var loaded = new List<GwClusterModel>();
    while (await reader.ReadAsync(cancellationToken))
    {
        loaded.Add(MapRow(reader));
    }

    return loaded;
}
/// <summary>
/// Processes a single route: resolves its cluster, renders the Service YAML
/// and writes it to the output directory (or only logs it in dry-run mode).
/// </summary>
/// <param name="route">The route to migrate.</param>
/// <param name="clusters">All loaded clusters; searched by <c>ClusterId</c>.</param>
/// <param name="cancellationToken">Token passed to the file write.</param>
/// <returns>
/// An entry with status Success, Skipped (the referenced cluster was not
/// found) or Failed (an exception occurred; its message is stored in
/// <c>ErrorMessage</c>). Exceptions are recorded on the entry, not rethrown.
/// </returns>
private async Task<MigrationEntry> ProcessRouteAsync(
    GwTenantRouteModel route,
    List<GwClusterModel> clusters,
    CancellationToken cancellationToken)
{
    var entry = new MigrationEntry
    {
        ServiceName = route.ServiceName,
        TenantCode = route.TenantCode,
        ClusterId = route.ClusterId,
        Status = MigrationStatus.Success
    };

    _logger.LogDebug($"处理路由: {route.ServiceName} (租户: {route.TenantCode})");

    try
    {
        // Find the cluster this route points at.
        var cluster = clusters.FirstOrDefault(c => c.ClusterId == route.ClusterId);
        if (cluster == null)
        {
            _logger.LogWarning($"路由 {route.ServiceName} 引用的集群 {route.ClusterId} 不存在,跳过");
            entry.Status = MigrationStatus.Skipped;
            entry.ErrorMessage = $"集群 {route.ClusterId} 不存在";
            return entry;
        }

        // Prefer a destination with status 1, otherwise fall back to the first one.
        var destinations = cluster.GetDestinations();
        var destination = destinations.FirstOrDefault(d => d.Status == 1)
            ?? destinations.FirstOrDefault();
        if (destination == null)
        {
            // Only a warning: generation still proceeds without a destination.
            _logger.LogWarning($"集群 {cluster.ClusterId} 没有可用的目标端点");
        }

        // Render the Service YAML.
        var serviceYaml = GenerateServiceYaml(route, cluster, destination);

        // Tenant code and service name come straight from the database. A path
        // separator or a rooted value would let Path.Combine resolve outside
        // OutputDir, so the name is sanitized first. Names without such
        // characters are unchanged.
        var baseName = $"{route.TenantCode}-{route.ServiceName}".ToLowerInvariant();
        var fileName = ToSafeFileName(baseName) + ".yaml";
        var outputPath = Path.Combine(_options.OutputDir, fileName);
        entry.OutputPath = outputPath;

        if (_options.DryRun)
        {
            _logger.LogInformation($"[DRY-RUN] 将生成: {fileName}");
            _logger.LogDebug($"YAML 内容:\n{serviceYaml}");
        }
        else
        {
            await File.WriteAllTextAsync(outputPath, serviceYaml, cancellationToken);
            _logger.LogInformation($"已生成: {outputPath}");
        }
    }
    catch (Exception ex)
    {
        _logger.LogError($"处理路由 {route.ServiceName} 时出错: {ex.Message}");
        entry.Status = MigrationStatus.Failed;
        entry.ErrorMessage = ex.Message;
    }

    return entry;
}

/// <summary>
/// Replaces every character that is invalid in a file name, plus both path
/// separators, with '-'.
/// </summary>
private static string ToSafeFileName(string name)
{
    var invalid = Path.GetInvalidFileNameChars();
    var chars = name.ToCharArray();

    for (var i = 0; i < chars.Length; i++)
    {
        var c = chars[i];

        // '/' and '\\' are checked explicitly because the platform's invalid
        // set is not guaranteed to contain both.
        if (c == '/' || c == '\\' || Array.IndexOf(invalid, c) >= 0)
        {
            chars[i] = '-';
        }
    }

    return new string(chars);
}
/// <summary>
/// Builds the Kubernetes Service manifest for a route and serializes it to YAML.
/// </summary>
/// <param name="route">Route that supplies the name, host, path, tenant and priority.</param>
/// <param name="cluster">Cluster the route targets.</param>
/// <param name="destination">
/// Destination picked for the cluster, or <see langword="null"/>; a null
/// destination is labelled <c>default</c>.
/// </param>
/// <returns>The YAML text, prefixed with a <c>---</c> document separator line.</returns>
private string GenerateServiceYaml(
    GwTenantRouteModel route,
    GwClusterModel cluster,
    GwDestinationModel? destination)
{
    var serviceName = $"{route.TenantCode}-{route.ServiceName}".ToLowerInvariant();
    var path = route.GetPath();
    var host = route.GetHost() ?? _options.DefaultHost;
    var destinationId = destination?.DestinationId ?? "default";

    // The manifest is machine-readable, so numbers are formatted with the
    // invariant culture rather than whatever culture the tool runs under.
    var routeId = FormattableString.Invariant($"{route.Id}");
    var priority = FormattableString.Invariant($"{route.Priority}");

    var model = new K8sServiceModel
    {
        Metadata = new K8sMetadata
        {
            Name = serviceName,
            // NOTE(review): Kubernetes restricts label values to at most 63
            // characters of [A-Za-z0-9._-]. A path such as "/api" would likely
            // be rejected on apply — confirm how the consumer expects the prefix.
            Labels = new Dictionary<string, string>
            {
                ["app-router-host"] = host,
                ["app-router-name"] = route.ServiceName,
                ["app-router-prefix"] = path,
                ["app-cluster-name"] = cluster.ClusterId,
                ["app-cluster-destination"] = destinationId,
                ["app-tenant"] = route.TenantCode,
                ["app-managed-by"] = "migration-tool"
            },
            // NOTE(review): Kubernetes annotation keys allow a single '/'
            // (prefix/name). The keys with two slashes below look invalid —
            // confirm with whoever consumes them before renaming.
            Annotations = new Dictionary<string, string>
            {
                ["migration-tool/fengling.gateway/route-id"] = routeId,
                ["migration-tool/fengling.gateway/cluster-id"] = cluster.Id,
                ["migration-tool/fengling.gateway/priority"] = priority,
                // "O" is the round-trip format and does not depend on culture.
                ["migration-tool/timestamp"] = DateTime.UtcNow.ToString("O")
            }
        },
        Spec = new K8sSpec
        {
            Type = "ClusterIP",
            Selector = new Dictionary<string, string>
            {
                ["app"] = route.ServiceName.ToLowerInvariant()
            },
            Ports =
            [
                new K8sPort
                {
                    Port = _options.ServicePort,
                    TargetPort = _options.TargetPort,
                    Name = "http",
                    Protocol = "TCP"
                }
            ]
        }
    };

    var yaml = _yamlSerializer.Serialize(model);

    // Prepend the YAML document separator.
    return $"---\n{yaml}";
}
/// <summary>
/// Checks the loaded routes and clusters for missing required values and
/// dangling cluster references.
/// </summary>
/// <param name="routes">Routes to check.</param>
/// <param name="clusters">Clusters to check; also the set routes may reference.</param>
/// <returns>
/// One result per problem found, each with <c>IsValid</c> set to
/// <see langword="false"/>; an empty list when nothing is wrong.
/// </returns>
private List<ValidationResult> ValidateData(
    List<GwTenantRouteModel> routes,
    List<GwClusterModel> clusters)
{
    var issues = new List<ValidationResult>();

    // Records one failed check.
    void Fail(string entity, string message) =>
        issues.Add(new ValidationResult
        {
            IsValid = false,
            Entity = entity,
            Message = message
        });

    var knownClusterIds = clusters.Select(c => c.ClusterId).ToHashSet();

    foreach (var route in routes)
    {
        var routeRef = $"Route[{route.Id}]";

        // Required fields.
        if (string.IsNullOrWhiteSpace(route.ServiceName))
        {
            Fail(routeRef, "ServiceName 不能为空");
        }

        if (string.IsNullOrWhiteSpace(route.TenantCode))
        {
            Fail(routeRef, "TenantCode 不能为空");
        }

        // A blank cluster id is reported as missing; a non-blank one must exist.
        if (string.IsNullOrWhiteSpace(route.ClusterId))
        {
            Fail(routeRef, "ClusterId 不能为空");
        }
        else if (!knownClusterIds.Contains(route.ClusterId))
        {
            Fail(routeRef, $"引用的集群 '{route.ClusterId}' 不存在");
        }

        // The resolved path must not be blank.
        if (string.IsNullOrWhiteSpace(route.GetPath()))
        {
            Fail(routeRef, "Path 不能为空");
        }
    }

    foreach (var cluster in clusters)
    {
        // The cluster id is blank in this case, so the row id identifies the entity.
        if (string.IsNullOrWhiteSpace(cluster.ClusterId))
        {
            Fail($"Cluster[{cluster.Id}]", "ClusterId 不能为空");
        }

        var destinations = cluster.GetDestinations();
        if (destinations.Count == 0)
        {
            Fail($"Cluster[{cluster.ClusterId}]", "没有配置目标端点");
        }

        foreach (var dest in destinations)
        {
            if (string.IsNullOrWhiteSpace(dest.DestinationId))
            {
                Fail($"Cluster[{cluster.ClusterId}]/Destination", "DestinationId 不能为空");
            }

            if (string.IsNullOrWhiteSpace(dest.Address))
            {
                Fail(
                    $"Cluster[{cluster.ClusterId}]/Destination[{dest.DestinationId}]",
                    "Address 不能为空");
            }
        }
    }

    return issues;
}
/// <summary>
/// Serializes the migration report to indented JSON and writes it to disk,
/// or only logs it in dry-run mode.
/// </summary>
/// <remarks>
/// The file goes to the configured report path. When none is configured it
/// goes to <c>migration-report-{UTC timestamp}.json</c> in the output directory.
/// </remarks>
/// <param name="report">The report to persist.</param>
/// <param name="cancellationToken">Token passed to the file write.</param>
private async Task SaveReportAsync(MigrationReport report, CancellationToken cancellationToken)
{
    // Treat an empty or blank configured path the same as "not configured".
    var configuredPath = _options.ReportPath;
    string reportPath;
    if (string.IsNullOrWhiteSpace(configuredPath))
    {
        // Invariant culture keeps the name on the Gregorian calendar
        // regardless of the culture the tool runs under.
        var timestamp = DateTime.UtcNow.ToString(
            "yyyyMMdd-HHmmss",
            System.Globalization.CultureInfo.InvariantCulture);
        reportPath = Path.Combine(_options.OutputDir, $"migration-report-{timestamp}.json");
    }
    else
    {
        reportPath = configuredPath;
    }

    var options = new JsonSerializerOptions
    {
        WriteIndented = true,
        // Relaxed escaping keeps non-ASCII text readable instead of \uXXXX escapes.
        Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping
    };
    var json = JsonSerializer.Serialize(report, options);

    if (_options.DryRun)
    {
        _logger.LogInformation($"[DRY-RUN] 将生成报告: {reportPath}");
        _logger.LogDebug($"报告内容:\n{json}");
        return;
    }

    // A custom report path may point at a directory that does not exist yet.
    var reportDirectory = Path.GetDirectoryName(Path.GetFullPath(reportPath));
    if (!string.IsNullOrEmpty(reportDirectory))
    {
        Directory.CreateDirectory(reportDirectory);
    }

    await File.WriteAllTextAsync(reportPath, json, cancellationToken);
    _logger.LogInformation($"已生成报告: {reportPath}");
}
/// <summary>
/// Returns a copy of the connection string that is safe to log: a non-empty
/// password is replaced with <c>***</c>.
/// </summary>
/// <param name="connectionString">The raw connection string.</param>
/// <returns>
/// <c>[empty]</c> for a null or empty input, <c>[invalid connection string]</c>
/// when the input cannot be processed, otherwise the rebuilt connection string.
/// </returns>
private static string MaskConnectionString(string connectionString)
{
    if (string.IsNullOrEmpty(connectionString))
    {
        return "[empty]";
    }

    try
    {
        var parsed = new NpgsqlConnectionStringBuilder(connectionString);

        var hasPassword = !string.IsNullOrEmpty(parsed.Password);
        if (hasPassword)
        {
            parsed.Password = "***";
        }

        return parsed.ToString();
    }
    catch
    {
        // Best effort: never let a malformed string break logging or leak into it.
        return "[invalid connection string]";
    }
}
}
/// <summary>
/// Outcome of a single validation check.
/// </summary>
public class ValidationResult
{
    /// <summary>Whether the check passed; <see langword="false"/> by default.</summary>
    public bool IsValid { get; set; }

    /// <summary>Label of the checked entity, e.g. <c>Route[42]</c>; empty by default.</summary>
    public string Entity { get; set; } = "";

    /// <summary>Description of the finding; empty by default.</summary>
    public string Message { get; set; } = "";
}