using Fengling.Member.Domain.Aggregates.PointsModel; using Fengling.Member.Domain.Events.Points; using Fengling.Member.Infrastructure; using Microsoft.Extensions.Logging; namespace Fengling.Member.Application.Services; /// /// 积分处理服务实现 /// 使用分布式锁 + Redis 缓存实现高性能积分处理 /// public class PointsProcessingService : IPointsProcessingService { private readonly IPointsAccountCache _cache; private readonly ICodeDistributedLock _lock; private readonly IPointsHistoryRepository _historyRepository; private readonly IMediator _mediator; private readonly ITenantAccessor _tenantAccessor; private readonly ILogger _logger; public PointsProcessingService( IPointsAccountCache cache, ICodeDistributedLock distributedLock, IPointsHistoryRepository historyRepository, IMediator mediator, ITenantAccessor tenantAccessor, ILogger logger) { _cache = cache; _lock = distributedLock; _historyRepository = historyRepository; _mediator = mediator; _tenantAccessor = tenantAccessor; _logger = logger; } public async Task ProcessAsync(PointsProcessRequest request, CancellationToken ct = default) { // 1. 幂等性检查 - 检查码是否已处理 if (await _cache.IsCodeProcessedAsync(request.CodeId)) { _logger.LogWarning("Code {CodeId} already processed", request.CodeId); return PointsProcessResult.Failed("码已处理", "CODE_ALREADY_PROCESSED"); } // 2. 获取分布式锁 var lockResult = await _lock.AcquireAsync(request.CodeId); if (!lockResult.Success) { _logger.LogWarning("Failed to acquire lock for code {CodeId}: {Error}", request.CodeId, lockResult.ErrorMessage); return PointsProcessResult.Failed("处理中,请稍后重试", "LOCK_FAILED"); } try { // 3. 双重检查幂等性(在获取锁后再次检查) if (await _cache.IsCodeProcessedAsync(request.CodeId)) { return PointsProcessResult.Failed("码已处理", "CODE_ALREADY_PROCESSED"); } // 4. 从缓存获取账户,不存在则创建 var account = await _cache.GetAsync(request.MemberId); var tenantId = _tenantAccessor.GetTenantId() ?? 0; if (account == null) { account = new PointsAccountCacheModel { MemberId = request.MemberId, TenantId = tenantId, TotalPoints = 0, FrozenPoints = 0 }; } // 5. 处理积分 int newTotal; if (request.IsAddition) { newTotal = await _cache.AddPointsAsync(request.MemberId, request.Points); } else { var deductResult = await _cache.DeductPointsAsync(request.MemberId, request.Points); if (!deductResult.Success) { return PointsProcessResult.Failed("积分不足", "INSUFFICIENT_POINTS"); } newTotal = deductResult.RemainingPoints; } // 6. 标记码已处理 await _cache.MarkCodeProcessedAsync(request.CodeId); // 7. 发布领域事件(异步持久化到数据库) var domainEvent = new PointsChangedEvent( 0, // AccountId - 缓存中暂无ID,事件消费者处理 request.MemberId, tenantId, request.IsAddition ? request.Points : -request.Points, newTotal, request.TransactionType, request.SourceId, request.Remark); await _mediator.Publish(domainEvent, ct); _logger.LogInformation( "Points processed successfully: MemberId={MemberId}, CodeId={CodeId}, Points={Points}, IsAddition={IsAddition}, NewTotal={Total}", request.MemberId, request.CodeId, request.Points, request.IsAddition, newTotal); return PointsProcessResult.Succeeded(request.Points, newTotal); } catch (Exception ex) { _logger.LogError(ex, "Error processing points for member {MemberId}, code {CodeId}", request.MemberId, request.CodeId); return PointsProcessResult.Failed("处理失败", "PROCESS_ERROR"); } finally { // 8. 释放锁 await _lock.ReleaseAsync(request.CodeId, lockResult.LockValue!); } } public async Task GetBalanceAsync(long memberId) { return await _cache.GetAsync(memberId); } }