- 在MarketingCode聚合中新增品类ID和品类名称字段,完善产品信息结构 - 迁移生成营销码命令,支持传入品类ID和品类名称参数 - 积分发放失败时发送积分获得失败通知集成事件 - 新增通知发送及积分失败通知的集成事件处理器,使用SSE推送通知 - 在积分相关集成事件处理器中添加发送积分变动通知功能 - 移除Notification聚合,相关数据库表删除 - 新增分页结果类型PagedResult,支持营销码查询分页返回 - 营销码查询支持分页参数,返回分页结果数据
238 lines
9.8 KiB
Python
238 lines
9.8 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
CAP消息队列诊断测试
|
||
验证积分事件是否正确发布和消费
|
||
"""
|
||
|
||
import requests
|
||
import json
|
||
import time
|
||
from datetime import datetime
|
||
|
||
BASE_URL = "http://localhost:5511"
|
||
|
||
def log(msg, status="INFO"):
|
||
timestamp = datetime.now().strftime("%H:%M:%S")
|
||
icons = {"PASS": "✅", "FAIL": "❌", "INFO": "ℹ️", "WARN": "⚠️"}
|
||
print(f"[{timestamp}] {icons.get(status, 'ℹ️')} {msg}")
|
||
|
||
def diagnose_cap_messaging():
|
||
"""诊断CAP消息队列问题"""
|
||
log("=== CAP消息队列诊断测试 ===", "INFO")
|
||
|
||
# 1. 登录获取管理员权限
|
||
log("1. 获取管理员权限...", "INFO")
|
||
admin_login_data = {
|
||
"email": "admin@example.com",
|
||
"password": "Admin123!"
|
||
}
|
||
|
||
try:
|
||
# 尝试管理员登录
|
||
admin_resp = requests.post(
|
||
f"{BASE_URL}/api/admins/login",
|
||
json=admin_login_data,
|
||
timeout=10
|
||
)
|
||
|
||
if admin_resp.status_code == 200:
|
||
admin_token = admin_resp.json()['data']['token']
|
||
log("✅ 管理员登录成功", "PASS")
|
||
else:
|
||
# 使用已知的管理员token
|
||
admin_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJodHRwOi8vc2NoZW1hcy54bWxzb2FwLm9yZy93cy8yMDA1LzA1L2lkZW50aXR5L2NsYWltcy9uYW1laWRlbnRpZmllciI6IjAxOWM0YWM5LTA5ZTgtNzFhOS04YzdmLWIwNDU5YjYwMjQ0MCIsImh0dHA6Ly9zY2hlbWFzLnhtbHNvYXAub3JnL3dzLzIwMDUvMDUvaWRlbnRpdHkvY2xhaW1zL25hbWUiOiJhZG1pbiIsImh0dHA6Ly9zY2hlbWFzLm1pY3Jvc29mdC5jb20vd3MvMjAwOC8wNi9pZGVudGl0eS9jbGFpbXMvcm9sZSI6IkFkbWluIiwiYWRtaW5faWQiOiIwMTljNGFjOS0wOWU4LTcxYTktOGM3Zi1iMDQ1OWI2MDI0NDAiLCJleHAiOjE3Nzg5OTg5OTUsImlzcyI6IkZlbmdsaW5nQmFja2VuZCIsImF1ZCI6IkZlbmdsaW5nQmFja2VuZCJ9.Mzk2Nzg4NTQy"
|
||
log("⚠️ 使用预设管理员token", "WARN")
|
||
|
||
admin_headers = {"Authorization": f"Bearer {admin_token}"}
|
||
|
||
# 2. 检查CAP Dashboard状态
|
||
log("2. 检查CAP Dashboard...", "INFO")
|
||
try:
|
||
cap_resp = requests.get(f"{BASE_URL}/cap", timeout=5)
|
||
if cap_resp.status_code == 200:
|
||
log("✅ CAP Dashboard可访问", "PASS")
|
||
else:
|
||
log(f"⚠️ CAP Dashboard返回: {cap_resp.status_code}", "WARN")
|
||
except Exception as e:
|
||
log(f"⚠️ CAP Dashboard不可访问: {e}", "WARN")
|
||
|
||
# 3. 检查Redis连接状态
|
||
log("3. 检查Redis配置...", "INFO")
|
||
try:
|
||
# 通过API检查Redis状态(如果有相关端点)
|
||
health_resp = requests.get(f"{BASE_URL}/health", timeout=5)
|
||
if health_resp.status_code == 200:
|
||
log("✅ 应用健康检查通过", "PASS")
|
||
else:
|
||
log(f"⚠️ 健康检查: {health_resp.status_code}", "WARN")
|
||
except Exception as e:
|
||
log(f"⚠️ 健康检查失败: {e}", "WARN")
|
||
|
||
# 4. 测试积分事件流程
|
||
log("4. 测试积分事件完整流程...", "INFO")
|
||
|
||
# 4.1 登录普通用户
|
||
user_login_data = {
|
||
"phone": "15921072307",
|
||
"password": "Sl52788542"
|
||
}
|
||
|
||
user_resp = requests.post(
|
||
f"{BASE_URL}/api/members/login",
|
||
json=user_login_data,
|
||
timeout=10
|
||
)
|
||
|
||
if user_resp.status_code == 200:
|
||
user_token = user_resp.json()['data']['token']
|
||
member_id = user_resp.json()['data']['memberId']
|
||
user_headers = {"Authorization": f"Bearer {user_token}"}
|
||
log(f"✅ 用户登录成功 - Member ID: {member_id}", "PASS")
|
||
else:
|
||
log(f"❌ 用户登录失败: {user_resp.status_code}", "FAIL")
|
||
return
|
||
|
||
# 4.2 获取可用营销码
|
||
log("4.2 获取可用营销码...", "INFO")
|
||
codes_resp = requests.get(
|
||
f"{BASE_URL}/api/admin/marketing-codes?batchNo=001&pageSize=3&pageNumber=1",
|
||
headers=admin_headers
|
||
)
|
||
|
||
if codes_resp.status_code == 200:
|
||
codes_data = codes_resp.json()
|
||
available_codes = [
|
||
item for item in codes_data.get('data', {}).get('items', [])
|
||
if not item.get('isUsed', True)
|
||
]
|
||
|
||
if available_codes:
|
||
test_code = available_codes[0]['code']
|
||
log(f"✅ 找到可用营销码: {test_code}", "PASS")
|
||
else:
|
||
log("❌ 没有可用的营销码", "FAIL")
|
||
return
|
||
else:
|
||
log(f"❌ 获取营销码失败: {codes_resp.status_code}", "FAIL")
|
||
return
|
||
|
||
# 4.3 执行扫码操作
|
||
log("4.3 执行扫码操作...", "INFO")
|
||
scan_data = {"code": test_code}
|
||
scan_resp = requests.post(
|
||
f"{BASE_URL}/api/marketing-codes/scan",
|
||
json=scan_data,
|
||
headers=user_headers,
|
||
timeout=10
|
||
)
|
||
|
||
if scan_resp.status_code == 200:
|
||
scan_result = scan_resp.json()
|
||
message = scan_result['data']['message']
|
||
log(f"✅ 扫码成功: {message}", "PASS")
|
||
else:
|
||
log(f"❌ 扫码失败: {scan_resp.status_code}", "FAIL")
|
||
return
|
||
|
||
# 4.4 等待并检查数据库中的积分交易记录
|
||
log("4.4 检查积分交易记录...", "INFO")
|
||
time.sleep(5) # 等待事件处理
|
||
|
||
# 这里需要一个API来查询积分交易记录
|
||
# 暂时跳过,直接测试通知
|
||
|
||
# 4.5 建立SSE连接测试通知接收
|
||
log("4.5 测试SSE通知接收...", "INFO")
|
||
notifications_received = []
|
||
|
||
try:
|
||
sse_resp = requests.get(
|
||
f"{BASE_URL}/api/notifications/sse",
|
||
headers=user_headers,
|
||
stream=True,
|
||
timeout=30
|
||
)
|
||
|
||
if sse_resp.status_code == 200:
|
||
log("✅ SSE连接建立成功", "PASS")
|
||
|
||
# 读取几条消息进行测试
|
||
message_count = 0
|
||
for line in sse_resp.iter_lines():
|
||
if line and message_count < 10: # 限制读取消息数量
|
||
try:
|
||
line_str = line.decode('utf-8')
|
||
if line_str.startswith('data: '):
|
||
data_str = line_str[6:]
|
||
if data_str.strip():
|
||
notification = json.loads(data_str)
|
||
notifications_received.append(notification)
|
||
notification_type = notification.get('type', 'unknown')
|
||
title = notification.get('title', '')
|
||
log(f" 收到通知 [{notification_type}]: {title}", "INFO")
|
||
message_count += 1
|
||
|
||
# 如果收到积分通知,测试成功
|
||
if '积分' in title or '积分' in notification.get('message', ''):
|
||
log("✅ 成功收到积分相关通知!", "PASS")
|
||
break
|
||
|
||
except Exception as e:
|
||
log(f" 解析消息失败: {e}", "WARN")
|
||
|
||
else:
|
||
log(f"❌ SSE连接失败: {sse_resp.status_code}", "FAIL")
|
||
|
||
except Exception as e:
|
||
log(f"❌ SSE测试异常: {e}", "FAIL")
|
||
|
||
# 5. 分析结果
|
||
log("5. 诊断结果分析...", "INFO")
|
||
|
||
# 检查是否收到连接和心跳消息
|
||
connection_msgs = [n for n in notifications_received if n.get('type') == 'connection']
|
||
heartbeat_msgs = [n for n in notifications_received if n.get('type') == 'heartbeat']
|
||
points_msgs = [n for n in notifications_received if '积分' in n.get('title', '') or '积分' in n.get('message', '')]
|
||
|
||
log(f" 连接消息: {len(connection_msgs)} 条", "INFO")
|
||
log(f" 心跳消息: {len(heartbeat_msgs)} 条", "INFO")
|
||
log(f" 积分消息: {len(points_msgs)} 条", "INFO")
|
||
|
||
# 诊断结论
|
||
log("=== 诊断结论 ===", "INFO")
|
||
|
||
if len(connection_msgs) > 0:
|
||
log("✅ SSE基础连接功能正常", "PASS")
|
||
else:
|
||
log("❌ SSE连接存在问题", "FAIL")
|
||
|
||
if len(heartbeat_msgs) > 0:
|
||
log("✅ 心跳机制正常工作", "PASS")
|
||
else:
|
||
log("⚠️ 心跳机制可能有问题", "WARN")
|
||
|
||
if len(points_msgs) > 0:
|
||
log("✅ 积分通知推送正常", "PASS")
|
||
log("🎉 CAP消息队列工作正常", "PASS")
|
||
else:
|
||
log("❌ 积分通知未推送 - CAP消息处理可能存在问题", "FAIL")
|
||
log(" 可能原因:", "INFO")
|
||
log(" 1. CAP订阅者未正确注册", "INFO")
|
||
log(" 2. Redis连接问题", "INFO")
|
||
log(" 3. 积分事件未正确发布", "INFO")
|
||
log(" 4. 通知事件处理器有问题", "INFO")
|
||
|
||
# 6. 建议的修复方向
|
||
log("=== 修复建议 ===", "INFO")
|
||
if len(points_msgs) == 0:
|
||
log("🔧 建议检查:", "INFO")
|
||
log(" 1. 确认PointsEarnedIntegrationEventHandler是否正确注册", "INFO")
|
||
log(" 2. 检查CAP Dashboard (/cap) 查看消息状态", "INFO")
|
||
log(" 3. 验证Redis连接是否正常", "INFO")
|
||
log(" 4. 检查SendNotificationIntegrationEvent是否被正确处理", "INFO")
|
||
|
||
except Exception as e:
|
||
log(f"❌ 诊断过程中出现异常: {e}", "FAIL")
|
||
|
||
if __name__ == "__main__":
|
||
diagnose_cap_messaging() |