IMPL-6: 集成 ConfigNotificationService 通知网关
All checks were successful
Build and Push Docker / build (push) Successful in 2m39s
All checks were successful
Build and Push Docker / build (push) Successful in 2m39s
- 在 Confirm 方法中添加事务包装,确保数据库写入的原子性 - 注入 ConsoleDbContext 用于事务管理 - 添加详细的日志记录: - 配置确认开始/结束 - 数据库写入成功 - 事务提交/回滚 - NOTIFY 发送成功/失败 - 缓存移除成功/失败 - 事务处理逻辑:先提交数据库事务,再发送 NOTIFY - NOTIFY 失败不影响数据一致性(仅记录错误)
This commit is contained in:
parent
83085c6dea
commit
5dc9dc5979
@ -1,10 +1,12 @@
|
|||||||
using System.Security.Claims;
|
using System.Security.Claims;
|
||||||
using System.Text.Json;
|
using System.Text.Json;
|
||||||
using System.Text.Json.Nodes;
|
using System.Text.Json.Nodes;
|
||||||
|
using Fengling.Console.Data;
|
||||||
using Fengling.Console.Models.Dtos;
|
using Fengling.Console.Models.Dtos;
|
||||||
using Fengling.Console.Models.Entities;
|
using Fengling.Console.Models.Entities;
|
||||||
using Fengling.Console.Models.K8s;
|
using Fengling.Console.Models.K8s;
|
||||||
using Fengling.Platform.Domain.AggregatesModel.GatewayAggregate;
|
using Fengling.Platform.Domain.AggregatesModel.GatewayAggregate;
|
||||||
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
|
||||||
namespace Fengling.Console.Controllers;
|
namespace Fengling.Console.Controllers;
|
||||||
|
|
||||||
@ -21,6 +23,7 @@ public class PendingConfigController : ControllerBase
|
|||||||
private readonly INotificationService _notificationService;
|
private readonly INotificationService _notificationService;
|
||||||
private readonly IClusterStore _clusterStore;
|
private readonly IClusterStore _clusterStore;
|
||||||
private readonly IRouteStore _routeStore;
|
private readonly IRouteStore _routeStore;
|
||||||
|
private readonly ConsoleDbContext _dbContext;
|
||||||
private readonly ILogger<PendingConfigController> _logger;
|
private readonly ILogger<PendingConfigController> _logger;
|
||||||
|
|
||||||
public PendingConfigController(
|
public PendingConfigController(
|
||||||
@ -28,12 +31,14 @@ public class PendingConfigController : ControllerBase
|
|||||||
INotificationService notificationService,
|
INotificationService notificationService,
|
||||||
IClusterStore clusterStore,
|
IClusterStore clusterStore,
|
||||||
IRouteStore routeStore,
|
IRouteStore routeStore,
|
||||||
|
ConsoleDbContext dbContext,
|
||||||
ILogger<PendingConfigController> logger)
|
ILogger<PendingConfigController> logger)
|
||||||
{
|
{
|
||||||
_pendingConfigCache = pendingConfigCache;
|
_pendingConfigCache = pendingConfigCache;
|
||||||
_notificationService = notificationService;
|
_notificationService = notificationService;
|
||||||
_clusterStore = clusterStore;
|
_clusterStore = clusterStore;
|
||||||
_routeStore = routeStore;
|
_routeStore = routeStore;
|
||||||
|
_dbContext = dbContext;
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -188,11 +193,14 @@ public class PendingConfigController : ControllerBase
|
|||||||
[ProducesResponseType(typeof(object), StatusCodes.Status500InternalServerError)]
|
[ProducesResponseType(typeof(object), StatusCodes.Status500InternalServerError)]
|
||||||
public async Task<ActionResult> Confirm(string id, [FromBody] ConfirmPendingConfigRequest? request)
|
public async Task<ActionResult> Confirm(string id, [FromBody] ConfirmPendingConfigRequest? request)
|
||||||
{
|
{
|
||||||
|
_logger.LogInformation("Starting config confirmation process for ConfigId: {ConfigId}", id);
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
var config = FindConfigById(id);
|
var config = FindConfigById(id);
|
||||||
if (config == null)
|
if (config == null)
|
||||||
{
|
{
|
||||||
|
_logger.LogWarning("Pending config not found: {ConfigId}", id);
|
||||||
return NotFound(new { message = $"Pending config {id} not found" });
|
return NotFound(new { message = $"Pending config {id} not found" });
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -204,27 +212,62 @@ public class PendingConfigController : ControllerBase
|
|||||||
|
|
||||||
if (configModel == null)
|
if (configModel == null)
|
||||||
{
|
{
|
||||||
|
_logger.LogError("Invalid config format for ConfigId: {ConfigId}", id);
|
||||||
return BadRequest(new { message = "Invalid config format" });
|
return BadRequest(new { message = "Invalid config format" });
|
||||||
}
|
}
|
||||||
|
|
||||||
// 获取当前用户
|
// 获取当前用户
|
||||||
var confirmedBy = User.FindFirst(ClaimTypes.Name)?.Value ?? User.Identity?.Name ?? "system";
|
var confirmedBy = User.FindFirst(ClaimTypes.Name)?.Value ?? User.Identity?.Name ?? "system";
|
||||||
|
_logger.LogInformation("Config {ConfigId} confirmation initiated by user: {ConfirmedBy}", id, confirmedBy);
|
||||||
|
|
||||||
// 写入数据库
|
// 使用事务包装数据库写入和通知
|
||||||
await SaveConfigToDatabaseAsync(configModel);
|
await using var transaction = await _dbContext.Database.BeginTransactionAsync();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
// 写入数据库
|
||||||
|
await SaveConfigToDatabaseAsync(configModel);
|
||||||
|
_logger.LogInformation("Database write completed successfully for ConfigId: {ConfigId}", id);
|
||||||
|
|
||||||
// 发送 PostgreSQL NOTIFY
|
// 提交事务
|
||||||
await _notificationService.PublishConfigChangeAsync(
|
await transaction.CommitAsync();
|
||||||
"route",
|
_logger.LogInformation("Database transaction committed for ConfigId: {ConfigId}", id);
|
||||||
"create",
|
}
|
||||||
new { configId = id, serviceName = config.ServiceName, clusterId = config.ClusterId }
|
catch (Exception ex)
|
||||||
);
|
{
|
||||||
|
await transaction.RollbackAsync();
|
||||||
|
_logger.LogError(ex, "Database transaction failed and rolled back for ConfigId: {ConfigId}", id);
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 发送 PostgreSQL NOTIFY(在事务外,因为 NOTIFY 使用独立连接)
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await _notificationService.PublishConfigChangeAsync(
|
||||||
|
"route",
|
||||||
|
"create",
|
||||||
|
new { configId = id, serviceName = config.ServiceName, clusterId = config.ClusterId }
|
||||||
|
);
|
||||||
|
_logger.LogInformation("NOTIFY sent successfully for ConfigId: {ConfigId}, Channel: gateway_config_changed", id);
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
// NOTIFY 失败不应影响已提交的数据,仅记录错误
|
||||||
|
_logger.LogError(ex, "Failed to send NOTIFY for ConfigId: {ConfigId}, but database changes are already committed", id);
|
||||||
|
}
|
||||||
|
|
||||||
// 从缓存中移除
|
// 从缓存中移除
|
||||||
_pendingConfigCache.Remove(config.SourceId);
|
var removed = _pendingConfigCache.Remove(config.SourceId);
|
||||||
|
if (removed)
|
||||||
|
{
|
||||||
|
_logger.LogInformation("Config removed from cache successfully: {ConfigId}, SourceId: {SourceId}", id, config.SourceId);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
_logger.LogWarning("Failed to remove config from cache: {ConfigId}, SourceId: {SourceId}", id, config.SourceId);
|
||||||
|
}
|
||||||
|
|
||||||
_logger.LogInformation(
|
_logger.LogInformation(
|
||||||
"Pending config {ConfigId} confirmed by {ConfirmedBy}. Service: {ServiceName}",
|
"Pending config {ConfigId} confirmed successfully by {ConfirmedBy}. Service: {ServiceName}",
|
||||||
id, confirmedBy, config.ServiceName);
|
id, confirmedBy, config.ServiceName);
|
||||||
|
|
||||||
return Ok(new { message = "Config confirmed successfully", configId = id });
|
return Ok(new { message = "Config confirmed successfully", configId = id });
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user