fengling-gateway/src/yarpgateway/Services/RedisConnectionManager.cs
movingsam da4f03502a refactor: reorganize project structure into yarpgateway folder
- Move YarpGateway and all source files to src/yarpgateway/
- Keep Fengling.Gateway.Plugin.Abstractions at src/ level
- Fix duplicate project reference in YarpGateway.slnx
- Update solution paths and test project references
- Add ProjectReference from YarpGateway to abstractions
2026-03-01 17:47:48 +08:00

140 lines
4.5 KiB
C#

using StackExchange.Redis;
using System.Diagnostics;
using Microsoft.Extensions.Logging;
using YarpGateway.Config;
namespace YarpGateway.Services;
public interface IRedisConnectionManager
{
IConnectionMultiplexer GetConnection();
Task<IDisposable> AcquireLockAsync(string key, TimeSpan? expiry = null);
Task<T> ExecuteInLockAsync<T>(string key, Func<Task<T>> func, TimeSpan? expiry = null);
}
public class RedisConnectionManager : IRedisConnectionManager
{
private readonly Lazy<IConnectionMultiplexer> _lazyConnection;
private readonly RedisConfig _config;
private readonly ILogger<RedisConnectionManager> _logger;
public RedisConnectionManager(RedisConfig config, ILogger<RedisConnectionManager> logger)
{
_config = config;
_logger = logger;
_lazyConnection = new Lazy<IConnectionMultiplexer>(() =>
{
var configuration = ConfigurationOptions.Parse(_config.ConnectionString);
configuration.AbortOnConnectFail = false;
configuration.ConnectRetry = 3;
configuration.ConnectTimeout = 5000;
configuration.SyncTimeout = 3000;
configuration.DefaultDatabase = _config.Database;
var connection = ConnectionMultiplexer.Connect(configuration);
connection.ConnectionRestored += (sender, e) =>
{
_logger.LogInformation("Redis connection restored");
};
connection.ConnectionFailed += (sender, e) =>
{
_logger.LogError(e.Exception, "Redis connection failed");
};
// 脱敏连接字符串中的密码
_logger.LogInformation("Connected to Redis at {Host}", configuration.EndPoints.FirstOrDefault()?.ToString() ?? "unknown");
return connection;
});
}
public IConnectionMultiplexer GetConnection()
{
return _lazyConnection.Value;
}
public async Task<IDisposable> AcquireLockAsync(string key, TimeSpan? expiry = null)
{
var expiryTime = expiry ?? TimeSpan.FromSeconds(10);
var redis = GetConnection();
var db = redis.GetDatabase();
var lockKey = $"lock:{_config.InstanceName}:{key}";
var lockValue = Environment.MachineName + ":" + Process.GetCurrentProcess().Id;
var acquired = await db.StringSetAsync(lockKey, lockValue, expiryTime, When.NotExists);
if (!acquired)
{
var backoff = TimeSpan.FromMilliseconds(100);
var retryCount = 0;
const int maxRetries = 50;
while (!acquired && retryCount < maxRetries)
{
await Task.Delay(backoff);
acquired = await db.StringSetAsync(lockKey, lockValue, expiryTime, When.NotExists);
retryCount++;
if (retryCount < 10)
{
backoff = TimeSpan.FromMilliseconds(100 * (retryCount + 1));
}
}
if (!acquired)
{
throw new TimeoutException($"Failed to acquire lock for key: {lockKey}");
}
}
return new RedisLock(db, lockKey, lockValue, _logger);
}
public async Task<T> ExecuteInLockAsync<T>(string key, Func<Task<T>> func, TimeSpan? expiry = null)
{
using var @lock = await AcquireLockAsync(key, expiry);
return await func();
}
private class RedisLock : IDisposable
{
private readonly IDatabase _db;
private readonly string _key;
private readonly string _value;
private readonly ILogger _logger;
private bool _disposed;
public RedisLock(IDatabase db, string key, string value, ILogger logger)
{
_db = db;
_key = key;
_value = value;
_logger = logger;
}
public void Dispose()
{
if (_disposed) return;
try
{
var script = @"
if redis.call('GET', KEYS[1]) == ARGV[1] then
return redis.call('DEL', KEYS[1])
else
return 0
end";
_db.ScriptEvaluate(script, new RedisKey[] { _key }, new RedisValue[] { _value });
_logger.LogDebug("Released lock for key: {Key}", _key);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to release lock for key: {Key}", _key);
}
finally
{
_disposed = true;
}
}
}
}