- Implement RedisCounterService for rate limiting - Implement RuleLoader with timer refresh - Implement RiskEvaluator for local rule evaluation - Implement SamplingService for CAP events - Implement CapEventPublisher for async event publishing - Implement FailoverStrategy for Redis failure handling - Add configuration classes and DI extensions - Add unit tests (9 tests) - Add NuGet publishing script
132 lines
3.5 KiB
C#
132 lines
3.5 KiB
C#
using Fengling.RiskControl.Domain.Aggregates.RiskRules;
|
|
using Fengling.RiskControl.Configuration;
|
|
using Microsoft.Extensions.Hosting;
|
|
using Microsoft.Extensions.Logging;
|
|
using StackExchange.Redis;
|
|
using System.Text.Json;
|
|
|
|
namespace Fengling.RiskControl.Rules;
|
|
|
|
/// <summary>
/// Contract for a component that supplies risk rules to the evaluator and
/// signals when its rule set has been reloaded.
/// </summary>
public interface IRuleLoader
{
    /// <summary>
    /// Asynchronously obtains the list of active <see cref="RiskRule"/> instances.
    /// </summary>
    /// <returns>A task that completes with the active rules.</returns>
    Task<List<RiskRule>> GetActiveRulesAsync();

    /// <summary>
    /// Asynchronously refreshes the rule set held by the loader.
    /// </summary>
    /// <returns>A task that completes when the refresh attempt has finished.</returns>
    Task RefreshRulesAsync();

    /// <summary>
    /// Notifies subscribers that the rule set has changed. The event carries no
    /// payload; handlers are expected to re-query the loader for current rules.
    /// </summary>
    event EventHandler? RulesChanged;
}
|
|
|
|
/// <summary>
/// Loads risk rules from a Redis list, keeps the active ones in memory sorted by
/// <c>Priority</c>, and re-reads the list on a timer while the host is running.
/// </summary>
/// <remarks>
/// The cached list is replaced wholesale on every successful load and is never
/// mutated in place; readers receive a copy taken under <c>_lock</c>.
/// </remarks>
public class RedisRuleLoader : IRuleLoader, IHostedService
{
    // One shared instance: System.Text.Json caches its type metadata per options
    // object, so allocating a new options object per rule would rebuild that
    // cache on every deserialization.
    private static readonly JsonSerializerOptions RuleSerializerOptions = new()
    {
        PropertyNameCaseInsensitive = true
    };

    private readonly IConnectionMultiplexer _redis;
    private readonly RiskControlClientOptions _options;
    private readonly ILogger<RedisRuleLoader> _logger;
    private readonly object _lock = new();

    // Guarded by _lock. Only ever swapped for a freshly built list.
    private List<RiskRule> _cachedRules = new();

    // True once at least one load has completed successfully. Distinguishes
    // "never loaded" from "loaded, and Redis currently holds no active rules".
    private volatile bool _rulesLoaded;

    private Timer? _refreshTimer;

    /// <summary>Redis key of the list that holds the serialized rules.</summary>
    private string RulesKey => $"{_options.Redis.KeyPrefix}rules:active";

    /// <summary>
    /// Raised after every successful load, whether or not the rule set actually
    /// differs from the previous one.
    /// </summary>
    public event EventHandler? RulesChanged;

    /// <summary>Creates the loader with its Redis connection, options and logger.</summary>
    /// <param name="redis">Connection used to obtain the Redis database.</param>
    /// <param name="options">Supplies the Redis key prefix and the refresh interval.</param>
    /// <param name="logger">Destination for diagnostic messages.</param>
    public RedisRuleLoader(
        IConnectionMultiplexer redis,
        RiskControlClientOptions options,
        ILogger<RedisRuleLoader> logger)
    {
        _redis = redis;
        _options = options;
        _logger = logger;
    }

    /// <summary>
    /// Kicks off the initial rule load in the background and starts the periodic
    /// refresh timer. Does not wait for the initial load to finish.
    /// </summary>
    /// <param name="cancellationToken">Not used.</param>
    /// <returns>A completed task.</returns>
    public Task StartAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("RuleLoader starting, loading rules from Redis...");

        // Fire-and-forget through RefreshRulesAsync rather than LoadRulesAsync:
        // the former logs and swallows failures, so a Redis outage at startup
        // cannot surface as an unobserved task exception.
        _ = RefreshRulesAsync();

        var interval = TimeSpan.FromSeconds(_options.Evaluation.RuleRefreshIntervalSeconds);

        // Release any timer left over from a previous StartAsync call.
        _refreshTimer?.Dispose();
        _refreshTimer = new Timer(
            state => { _ = RefreshRulesAsync(); },
            null,
            interval,
            interval);

        return Task.CompletedTask;
    }

    /// <summary>Stops the periodic refresh by disposing the timer.</summary>
    /// <param name="cancellationToken">Not used.</param>
    /// <returns>A completed task.</returns>
    public Task StopAsync(CancellationToken cancellationToken)
    {
        _refreshTimer?.Dispose();
        _refreshTimer = null;
        return Task.CompletedTask;
    }

    /// <summary>
    /// Returns a copy of the currently cached rules without contacting Redis.
    /// </summary>
    /// <returns>A new list; modifying it does not affect the cache.</returns>
    public List<RiskRule> GetActiveRules()
    {
        lock (_lock)
        {
            return _cachedRules.ToList();
        }
    }

    /// <summary>
    /// Returns a copy of the cached rules, loading them from Redis first if no
    /// load has succeeded yet.
    /// </summary>
    /// <remarks>
    /// Once a load has succeeded, an empty rule set is served from the cache like
    /// any other and is only re-read by the refresh timer or an explicit
    /// <see cref="RefreshRulesAsync"/> call.
    /// </remarks>
    /// <returns>A new list of the active rules ordered by <c>Priority</c>.</returns>
    /// <exception cref="RedisException">The on-demand load failed in Redis.</exception>
    /// <exception cref="JsonException">A stored rule could not be deserialized.</exception>
    public async Task<List<RiskRule>> GetActiveRulesAsync()
    {
        if (!_rulesLoaded)
        {
            await LoadRulesAsync().ConfigureAwait(false);
        }

        return GetActiveRules();
    }

    /// <summary>
    /// Reloads the rules from Redis. Any failure is logged and swallowed, leaving
    /// the previously cached rules in place.
    /// </summary>
    /// <returns>A task that completes when the attempt has finished.</returns>
    public async Task RefreshRulesAsync()
    {
        try
        {
            await LoadRulesAsync().ConfigureAwait(false);

            int count;
            lock (_lock)
            {
                count = _cachedRules.Count;
            }

            _logger.LogDebug("Rules refreshed successfully, count: {Count}", count);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to refresh rules from Redis");
        }
    }

    /// <summary>
    /// Reads the whole rule list from Redis, keeps the entries that deserialize to
    /// an active rule, sorts them by <c>Priority</c>, swaps them into the cache and
    /// raises <see cref="RulesChanged"/>.
    /// </summary>
    /// <exception cref="RedisException">Logged here and rethrown.</exception>
    /// <exception cref="JsonException">
    /// Propagates unlogged when an entry is not valid JSON for a rule; the cache is
    /// left untouched in that case.
    /// </exception>
    private async Task LoadRulesAsync()
    {
        var db = _redis.GetDatabase();

        try
        {
            var values = await db.ListRangeAsync(RulesKey).ConfigureAwait(false);
            var rules = new List<RiskRule>(values.Length);

            foreach (var value in values)
            {
                if (!value.HasValue)
                {
                    continue;
                }

                var rule = JsonSerializer.Deserialize<RiskRule>(value.ToString(), RuleSerializerOptions);
                if (rule is { IsActive: true })
                {
                    rules.Add(rule);
                }
            }

            rules = rules.OrderBy(r => r.Priority).ToList();

            lock (_lock)
            {
                _cachedRules = rules;
            }

            _rulesLoaded = true;

            RulesChanged?.Invoke(this, EventArgs.Empty);
        }
        catch (RedisException ex)
        {
            _logger.LogError(ex, "Failed to load rules from Redis");
            throw;
        }
    }
}
|