13 KiB
YARP 网关外部集成文档
1. PostgreSQL 数据库集成
概述
PostgreSQL 作为主数据库,存储网关配置数据,包括租户、路由、服务实例等信息。
连接配置
配置位置: src/appsettings.json
{
"ConnectionStrings": {
"DefaultConnection": "Host=81.68.223.70;Port=15432;Database=fengling_gateway;Username=movingsam;Password=***"
}
}
DbContext 配置
文件: src/Data/GatewayDbContext.cs
// 注册 DbContext 工厂
builder.Services.AddDbContextFactory<GatewayDbContext>(options =>
options.UseNpgsql(builder.Configuration.GetConnectionString("DefaultConnection"))
);
数据模型
| 实体 | 表名 | 用途 |
|---|---|---|
GwTenant |
Tenants | 租户信息 |
GwTenantRoute |
TenantRoutes | 租户路由配置 |
GwServiceInstance |
ServiceInstances | 服务实例(集群节点) |
GwPendingServiceDiscovery |
PendingServiceDiscoveries | K8s 待处理服务发现 |
配置变更通知机制
文件: src/Config/ConfigNotifyChannel.cs
使用 PostgreSQL LISTEN/NOTIFY 机制实现配置变更实时通知:
// 发送通知(在 DbContext.SaveChangesAsync 中触发)
await using var cmd = new NpgsqlCommand($"NOTIFY {ConfigNotifyChannel.GatewayConfigChanged}", connection);
// 监听通知(在 PgSqlConfigChangeListener 中)
cmd.CommandText = $"LISTEN {ConfigNotifyChannel.GatewayConfigChanged}";
监听服务: src/Services/PgSqlConfigChangeListener.cs
- 监听 PostgreSQL NOTIFY 通道
- 检测配置版本变更
- 触发路由/集群配置热更新
- 提供 5 分钟兜底轮询机制
2. Redis 集成
概述
Redis 用于分布式锁、路由缓存同步,确保多实例网关的配置一致性。
连接配置
配置位置: src/Config/RedisConfig.cs
public class RedisConfig
{
public string ConnectionString { get; set; } = "81.68.223.70:16379,password=***";
public int Database { get; set; } = 0;
public string InstanceName { get; set; } = "YarpGateway";
}
连接管理器
文件: src/Services/RedisConnectionManager.cs
// 注册 Redis 连接
builder.Services.AddSingleton<IConnectionMultiplexer>(sp =>
{
var config = sp.GetRequiredService<RedisConfig>();
var connectionOptions = ConfigurationOptions.Parse(config.ConnectionString);
connectionOptions.AbortOnConnectFail = false;
connectionOptions.ConnectRetry = 3;
connectionOptions.ConnectTimeout = 5000;
connectionOptions.SyncTimeout = 3000;
connectionOptions.DefaultDatabase = config.Database;
return ConnectionMultiplexer.Connect(connectionOptions);
});
分布式锁实现
接口: IRedisConnectionManager
public interface IRedisConnectionManager
{
IConnectionMultiplexer GetConnection();
Task<IDisposable> AcquireLockAsync(string key, TimeSpan? expiry = null);
Task<T> ExecuteInLockAsync<T>(string key, Func<Task<T>> func, TimeSpan? expiry = null);
}
锁机制特性:
- 基于键值对的分布式锁
- 自动过期时间(默认 10 秒)
- 指数退避重试策略
- Lua 脚本安全释放锁
3. Kubernetes 服务发现集成
概述
通过自定义的 Fengling.ServiceDiscovery 包实现 Kubernetes 服务自动发现,将 K8s Service 自动注册为网关后端服务。
配置
文件: src/Program.cs
// 添加 Kubernetes 服务发现
var useInClusterConfig = builder.Configuration.GetValue<bool>("ServiceDiscovery:UseInClusterConfig", true);
builder.Services.AddKubernetesServiceDiscovery(options =>
{
options.LabelSelector = "app.kubernetes.io/managed-by=yarp";
options.UseInClusterConfig = useInClusterConfig;
});
builder.Services.AddServiceDiscovery();
依赖包
| 包名 | 用途 |
|---|---|
Fengling.ServiceDiscovery.Core |
服务发现核心接口 |
Fengling.ServiceDiscovery.Kubernetes |
Kubernetes 实现 |
Fengling.ServiceDiscovery.Static |
静态配置实现 |
后台同步服务
文件: src/Services/KubernetesPendingSyncService.cs
public class KubernetesPendingSyncService : BackgroundService
{
private readonly TimeSpan _syncInterval = TimeSpan.FromSeconds(30);
private readonly TimeSpan _staleThreshold = TimeSpan.FromHours(24);
// 同步 K8s 服务到数据库待处理表
}
同步逻辑:
- 每 30 秒从 K8s API 获取服务列表
- 对比数据库中的待处理服务记录
- 新增/更新/清理过期服务
- 标记不再存在的 K8s 服务
待处理服务数据模型
文件: src/Models/GwPendingServiceDiscovery.cs
public class GwPendingServiceDiscovery
{
public string K8sServiceName { get; set; } // K8s Service 名称
public string K8sNamespace { get; set; } // K8s 命名空间
public string K8sClusterIP { get; set; } // ClusterIP
public string DiscoveredPorts { get; set; } // JSON 序列化的端口列表
public string Labels { get; set; } // K8s 标签
public string AssignedClusterId { get; set; } // 分配的集群 ID
public int Status { get; set; } // 状态
}
4. JWT 认证集成
概述
网关解析 JWT Token,提取租户和用户信息,转换为下游服务可用的 HTTP 头。
配置
文件: src/Config/JwtConfig.cs
public class JwtConfig
{
public string Authority { get; set; } = string.Empty; // 认证服务器地址
public string Audience { get; set; } = string.Empty; // 受众
public bool ValidateIssuer { get; set; } = true; // 验证签发者
public bool ValidateAudience { get; set; } = true; // 验证受众
}
配置示例 (src/appsettings.json):
{
"Jwt": {
"Authority": "https://your-auth-server.com",
"Audience": "fengling-gateway",
"ValidateIssuer": true,
"ValidateAudience": true
}
}
JWT 转换中间件
文件: src/Middleware/JwtTransformMiddleware.cs
public async Task InvokeAsync(HttpContext context)
{
var authHeader = context.Request.Headers["Authorization"].FirstOrDefault();
if (!string.IsNullOrEmpty(authHeader) && authHeader.StartsWith("Bearer "))
{
var token = authHeader.Substring("Bearer ".Length).Trim();
var jwtToken = jwtHandler.ReadJwtToken(token);
// 提取声明并转换为 HTTP 头
var tenantId = jwtToken.Claims.FirstOrDefault(c => c.Type == "tenant")?.Value;
var userId = jwtToken.Claims.FirstOrDefault(c => c.Type == ClaimTypes.NameIdentifier)?.Value;
context.Request.Headers["X-Tenant-Id"] = tenantId;
context.Request.Headers["X-User-Id"] = userId;
context.Request.Headers["X-User-Name"] = userName;
context.Request.Headers["X-Roles"] = string.Join(",", roles);
}
await _next(context);
}
JWT 声明到 HTTP 头映射
| JWT 声明类型 | HTTP 头 | 说明 |
|---|---|---|
tenant |
X-Tenant-Id |
租户标识 |
ClaimTypes.NameIdentifier |
X-User-Id |
用户 ID |
ClaimTypes.Name |
X-User-Name |
用户名 |
ClaimTypes.Role |
X-Roles |
角色列表(逗号分隔) |
5. 外部 API 和服务连接
CORS 配置
文件: src/appsettings.json
{
"Cors": {
"AllowedOrigins": [
"http://localhost:5173",
"http://127.0.0.1:5173",
"http://localhost:5174"
],
"AllowAnyOrigin": false
}
}
健康检查端点
文件: src/Program.cs
app.MapGet("/health", () => Results.Ok(new {
status = "healthy",
timestamp = DateTime.UtcNow
}));
下游服务健康检查
文件: src/Config/DatabaseClusterConfigProvider.cs
HealthCheck = new HealthCheckConfig
{
Active = new ActiveHealthCheckConfig
{
Enabled = true,
Interval = TimeSpan.FromSeconds(30),
Timeout = TimeSpan.FromSeconds(5),
Path = "/health"
}
}
动态代理配置
文件: src/DynamicProxy/DynamicProxyConfigProvider.cs
实现 IProxyConfigProvider 接口,从数据库动态加载路由和集群配置:
public class DynamicProxyConfigProvider : IProxyConfigProvider
{
public IProxyConfig GetConfig() => _config;
public void UpdateConfig()
{
var routes = _routeProvider.GetRoutes();
var clusters = _clusterProvider.GetClusters();
_config = new InMemoryProxyConfig(routes, clusters, ...);
}
}
6. 集成架构图
┌─────────────────────────────────────────────────────────────┐
│ 客户端请求 │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ YARP Gateway │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 中间件管道 │ │
│ │ CORS → JWT转换 → 租户路由 → Controllers → Proxy │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────────────────────┐ │
│ │RouteCache│ │ConfigProv│ │LoadBalancingPolicy │ │
│ └────┬─────┘ └────┬─────┘ └──────────────────────────┘ │
└───────┼─────────────┼────────────────────────────────────────┘
│ │
┌───────────────┼─────────────┼───────────────┐
│ │ │ │
▼ ▼ ▼ ▼
┌───────────────┐ ┌───────────┐ ┌───────────┐ ┌───────────────────┐
│ PostgreSQL │ │ Redis │ │ K8s │ │ Auth Server │
│ │ │ │ │ API │ │ (JWT) │
│ - 租户配置 │ │ - 分布式锁│ │ │ │ │
│ - 路由配置 │ │ - 缓存 │ │ - Service │ │ - Token 签发 │
│ - 服务实例 │ │ │ │ - Pod │ │ - 声明信息 │
│ - NOTIFY机制 │ │ │ │ │ │ │
└───────────────┘ └───────────┘ └───────────┘ └───────────────────┘
│
│ LISTEN/NOTIFY
▼
┌───────────────────────────────────────────────────────┐
│ 配置变更监听器 │
│ PgSqlConfigChangeListener + FallbackPolling │
└───────────────────────────────────────────────────────┘
7. 配置热更新流程
数据库配置变更
│
▼
DbContext.SaveChangesAsync()
│
▼
NOTIFY gateway_config_changed
│
▼
PgSqlConfigChangeListener.OnNotification()
│
▼
RouteCache.ReloadAsync()
│
▼
DynamicProxyConfigProvider.UpdateConfig()
│
▼
YARP 配置生效(无需重启)
兜底机制: 每 5 分钟检查版本号,防止 NOTIFY 丢失导致配置不一致。